Integrating Spring Batch and MongoDB for ETL Over NoSQL : Page 2

Step-by-step instructions for running an ETL batch job with Spring Batch and MongoDB.



Step 2a: Configuring the JOB-REPOSITORY.xml file

The Spring Batch framework requires a job repository to store details of the application as well as other information related to jobs and steps. This repository can either be created in a database or held in memory. I will use a memory-based job repository in this example.

JOB-REPOSITORY.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.SimpleJobRepository">
        <constructor-arg>
            <bean class="org.springframework.batch.core.repository.dao.MapJobInstanceDao"/>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.batch.core.repository.dao.MapJobExecutionDao"/>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.batch.core.repository.dao.MapStepExecutionDao"/>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.batch.core.repository.dao.MapExecutionContextDao"/>
        </constructor-arg>
    </bean>

    <bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

    <bean id="jobRepository-transactionManager"
          class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
</beans>

Step 2b: Configuring job.xml to load the data from a single file to MongoDB collection (table)



First, I define the job XML (FileToMongoTableJob.xml in my example). In this file, I configure a FlatFileItemReader, a class from the Spring Batch framework, and set its resource property to the path of the input file. Here the resource value is file:d:\data\employee.csv, i.e., the location of the input file employee.csv. I also define the delimiter, in my case a comma, through the DelimitedLineTokenizer class. Then I define my own class, EmployeeFieldSetMapper, which implements the Spring Batch framework's FieldSetMapper interface; it binds the values of each parsed line to the fields of the Employee object. If any calculation or processing is involved, I can handle it in my EmployeeProcessor class, which implements Spring Batch's ItemProcessor interface.
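The article does not list the custom Java classes, so here is a rough, self-contained sketch of the binding logic that EmployeeFieldSetMapper performs. The real class would implement Spring Batch's FieldSetMapper and read typed values from a FieldSet; this plain-Java version parses one comma-delimited line directly. The field names follow the names attribute in the job XML; the Employee field types are assumptions.

```java
// Hypothetical plain-Java sketch of the EmployeeFieldSetMapper binding logic.
// The real class implements org.springframework.batch.item.file.mapping.FieldSetMapper
// and receives an already-tokenized FieldSet from the DelimitedLineTokenizer.
public class Employee {
    public long id;
    public String name;
    public String city;
    public String designation;
    public int joiningYear;
    public int terminationYear;

    // Binds one line of the form id,name,city,designation,joiningYear,terminationYear
    // to an Employee object, mirroring what the FieldSetMapper does per record.
    public static Employee fromCsvLine(String line) {
        String[] f = line.split(",");
        Employee e = new Employee();
        e.id = Long.parseLong(f[0].trim());
        e.name = f[1].trim();
        e.city = f[2].trim();
        e.designation = f[3].trim();
        e.joiningYear = Integer.parseInt(f[4].trim());
        e.terminationYear = Integer.parseInt(f[5].trim());
        return e;
    }
}
```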

After this, I specify the MongoDB details: the hostname where the database is installed and the port number. I access the database through a MongoTemplate, which takes a reference to those connection details via the bean id (mongo) as a constructor argument. To the MongoTemplate I also pass a second argument, the name of the database I will work with inside MongoDB, which in this case is "new." Then I define my own class, MongoDBItemWriter, which implements Spring Batch's ItemWriter interface. This class uses the MongoTemplate to write to the database.
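As a rough illustration of what the MongoDBItemWriter does per chunk, the sketch below converts each record into a MongoDB-style key-value document and accumulates it in an in-memory list. This is only a stand-in: the real class implements Spring Batch's ItemWriter and delegates each item to the injected MongoTemplate; the document keys and the in-memory collection here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the MongoDBItemWriter's write step. The real class
// implements org.springframework.batch.item.ItemWriter and saves each item
// through the injected MongoTemplate; here an in-memory list stands in for
// the MongoDB collection.
public class EmployeeDocumentWriter {
    private final List<Map<String, Object>> collection = new ArrayList<>();

    // Receives one chunk of records (up to commit-interval items) per call,
    // as ItemWriter.write does, and converts each into a document.
    public void write(List<String[]> items) {
        for (String[] fields : items) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("_id", Long.parseLong(fields[0]));
            doc.put("name", fields[1]);
            doc.put("city", fields[2]);
            doc.put("designation", fields[3]);
            collection.add(doc);
        }
    }

    public int size() {
        return collection.size();
    }
}
```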

Next, I specify the DynamicJobParameters class, which implements the JobParametersIncrementer interface from Spring Batch. It works as an incrementer for the job, supplying fresh job parameters on each launch so that every run is treated as a new job instance.
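A minimal sketch of what such an incrementer does is shown below, using a plain Map in place of Spring Batch's JobParameters type. The real class implements JobParametersIncrementer, whose getNext(JobParameters) method returns the parameters for the next launch; the "run.id" key here is an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the DynamicJobParameters incrementer logic. The real
// class implements org.springframework.batch.core.JobParametersIncrementer;
// a plain Map stands in for JobParameters, and "run.id" is an illustrative key.
public class RunIdIncrementer {
    // Returns the parameters for the next launch: the previous parameters
    // with a counter bumped by one (starting at 1 when there are none).
    public Map<String, Object> getNext(Map<String, Object> params) {
        long last = (params == null) ? 0L : (Long) params.getOrDefault("run.id", 0L);
        Map<String, Object> next = (params == null) ? new HashMap<>() : new HashMap<>(params);
        next.put("run.id", last + 1);
        return next;
    }
}
```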

Finally, I specify my batch job with the batch:step and batch:tasklet details. The batch job here is employeeProcessorJob, which contains a single step holding a tasklet; the tasklet's batch:chunk reads from the employeeFileItemReader, and I also specify the processor and item writer details there.

FileToMongoTableJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/data/mongo
           http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd">

    <beans:import resource="JOB-REPOSITORY.xml"/>

    <bean id="employeeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:d:\data\employee.csv"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value=","/>
                        <property name="names" value="id,name,city,designation,joiningYear,terminationYear"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.infosys.springbatch.mongo.example.EmployeeFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="employeeProcessor" class="com.infosys.springbatch.mongo.example.EmployeeProcessor"/>

    <mongo:mongo id="mongo" host="localhost" port="27017"/>

    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <constructor-arg ref="mongo"/>
        <constructor-arg name="databaseName" value="new"/>
    </bean>

    <bean id="mongoDBItemWriter" class="com.infosys.springbatch.mongo.example.MongoDBItemWriter">
        <property name="mongoTemplate" ref="mongoTemplate"/>
    </bean>

    <bean id="dynamicJobParameters" class="com.infosys.springbatch.mongo.example.DynamicJobParameters"/>

    <batch:job id="employeeProcessorJob" job-repository="jobRepository" incrementer="dynamicJobParameters">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="jobRepository-transactionManager">
                <batch:chunk reader="employeeFileItemReader" processor="employeeProcessor"
                             writer="mongoDBItemWriter" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>

The above job description reads from a single file and inserts the records into a MongoDB collection.

Step 2c: Configuring job.xml to load the data from multiple files to MongoDB collection (table)

Next, I'll look at a job description that reads from multiple files and inserts into a collection, through MultipleFileToMongoTableJob.xml. This job description is mostly the same as the previous one, with a few differences. When defining the employeeFileItemReader, I also declare its scope as step. Because the FlatFileItemReader runs in multiple step executions to read multiple files, its resource is not a single fixed file; the resource value is given as #{stepExecutionContext[fileName]} and resolved at runtime. The employeeProcessor is also declared with step scope.

Next, I define the details for the PartitionStep, a class in the Spring Batch framework, and give this bean the name step1:master. On the PartitionStep I set two properties: one is a reference to the jobRepository, and the other is the stepExecutionSplitter, which refers to the SimpleStepExecutionSplitter class in the Spring Batch framework. This class in turn takes two references: one is the jobRepository, and the other is the step details.

Another constructor argument is the MultiResourcePartitioner class, which is also part of the Spring Batch framework. This class reads the multiple files matching the given resource pattern. Here the resources value is file:d:/data/inputFiles/employeePart*.csv, which means that from the mentioned location I read all the file parts (employeePart0.csv, employeePart1.csv, employeePart2.csv and so on).
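Conceptually, the partitioner builds one step execution context per matched file, each exposing the file under the key "fileName" so that the reader's #{stepExecutionContext[fileName]} expression resolves to it. The sketch below shows that idea with plain collections; it is an illustration of the behavior, not Spring Batch's implementation, and the hard-coded file names are stand-ins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of what MultiResourcePartitioner produces: one execution
// context per matched file, keyed "partition0", "partition1", ... with each
// context holding its file under "fileName". A Map of Maps stands in for
// Spring Batch's Map<String, ExecutionContext>.
public class FilePartitioner {
    public static Map<String, Map<String, String>> partition(String... files) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int i = 0;
        for (String file : files) {
            Map<String, String> ctx = new LinkedHashMap<>();
            ctx.put("fileName", file); // what #{stepExecutionContext[fileName]} reads
            partitions.put("partition" + i++, ctx);
        }
        return partitions;
    }
}
```

Each partition is then handed to a separate step execution, which is how several file parts get processed concurrently.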

Under step1:master, I also define another property, partitionHandler, which refers to the TaskExecutorPartitionHandler class in the Spring Batch framework. This class takes three properties: taskExecutor, step and gridSize. Then I define the step details, which hold the task in the form of a tasklet; inside it I specify the reader, processor and writer. Finally, I give the job description under file_partition_Job, where I reference the step details.

MultipleFileToMongoTableJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:aop="http://www.springframework.org/schema/aop"
             xmlns:tx="http://www.springframework.org/schema/tx"
             xmlns:p="http://www.springframework.org/schema/p"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:mongo="http://www.springframework.org/schema/data/mongo"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                 http://www.springframework.org/schema/batch
                 http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
                 http://www.springframework.org/schema/aop
                 http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
                 http://www.springframework.org/schema/tx
                 http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                 http://www.springframework.org/schema/data/mongo
                 http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd">

    <beans:import resource="JOB-REPOSITORY.xml"/>

    <beans:bean id="employeeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <beans:property name="resource" value="#{stepExecutionContext[fileName]}"/>
        <beans:property name="strict" value="false"/>
        <beans:property name="lineMapper">
            <beans:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <beans:property name="lineTokenizer">
                    <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <beans:property name="delimiter" value=","/>
                        <beans:property name="names" value="id,name,city,designation,joiningYear,terminationYear"/>
                    </beans:bean>
                </beans:property>
                <beans:property name="fieldSetMapper">
                    <beans:bean class="com.infosys.springbatch.mongo.example.EmployeeFieldSetMapper"/>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="employeeProcessor" class="com.infosys.springbatch.mongo.example.EmployeeProcessor" scope="step"/>

    <mongo:mongo id="mongo" host="localhost" port="27017"/>

    <beans:bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <beans:constructor-arg ref="mongo"/>
        <beans:constructor-arg name="databaseName" value="new"/>
    </beans:bean>

    <beans:bean id="mongoDBItemWriter" class="com.infosys.springbatch.mongo.example.MongoDBItemWriter">
        <beans:property name="mongoTemplate" ref="mongoTemplate"/>
    </beans:bean>

    <beans:bean name="step1:master" class="org.springframework.batch.core.partition.support.PartitionStep">
        <beans:property name="jobRepository" ref="jobRepository"/>
        <beans:property name="stepExecutionSplitter">
            <beans:bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
                <beans:constructor-arg ref="jobRepository"/>
                <beans:constructor-arg ref="step1"/>
                <beans:constructor-arg>
                    <beans:bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
                        <beans:property name="resources" value="file:d:/data/inputFiles/employeePart*.csv"/>
                    </beans:bean>
                </beans:constructor-arg>
            </beans:bean>
        </beans:property>
        <beans:property name="partitionHandler">
            <beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
                <beans:property name="taskExecutor" ref="asyncTaskExecutor"/>
                <beans:property name="step" ref="step1"/>
                <beans:property name="gridSize" value="3"/>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <step id="step1">
        <tasklet job-repository="jobRepository" transaction-manager="jobRepository-transactionManager">
            <chunk reader="employeeFileItemReader" processor="employeeProcessor"
                   writer="mongoDBItemWriter" commit-interval="50"/>
        </tasklet>
    </step>

    <beans:bean id="dynamicJobParameters" class="com.infosys.springbatch.mongo.example.DynamicJobParameters"/>

    <job id="file_partition_Job" job-repository="jobRepository" incrementer="dynamicJobParameters">
        <step id="fileProcessStep" parent="step1:master"/>
    </job>
</beans:beans>


