Integrating Spring Batch and MongoDB for ETL Over NoSQL

Step-by-step instructions for running an ETL batch job with Spring Batch and MongoDB.



Step 3: The class files referenced in the job XML definitions

Below is the Employee POJO, which holds the employee's attributes; the corresponding getter/setter methods are not shown here.

Employee.java

package com.infosys.springbatch.mongo.example;

import java.io.Serializable;

public class Employee implements Serializable
{
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String city;
    private String designation;
    private int joiningYear;
    private int terminationYear;
    private int tenure;

    // Getter/setter methods for each field are omitted here, as noted above.
}



The class below maps the FieldSet data read from the input file to the Employee attributes, creating an Employee object for each record.

EmployeeFieldSetMapper.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee>
{
    public Employee mapFieldSet(FieldSet fs)
    {
        if (fs == null)
        {
            return null;
        }

        // Copy each named field of the parsed record into a new Employee.
        Employee employee = new Employee();
        employee.setId(fs.readString("id"));
        employee.setName(fs.readString("name"));
        employee.setCity(fs.readString("city"));
        employee.setDesignation(fs.readString("designation"));
        employee.setJoiningYear(fs.readInt("joiningYear"));
        employee.setTerminationYear(fs.readInt("terminationYear"));
        return employee;
    }
}
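To see the mapper in isolation, the short sketch below (not part of the article's code) feeds it one hand-built record through Spring Batch's DefaultFieldSet; the sample values are illustrative assumptions.

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.file.transform.DefaultFieldSet;

public class EmployeeFieldSetMapperDemo
{
    public static void main(String[] args)
    {
        // Hypothetical sample record; the values line up with the names array.
        String[] values = { "1001", "John", "Bangalore", "Engineer", "2005", "2010" };
        String[] names  = { "id", "name", "city", "designation", "joiningYear", "terminationYear" };

        Employee employee = new EmployeeFieldSetMapper()
                .mapFieldSet(new DefaultFieldSet(values, names));
        System.out.println(employee.getName()); // prints "John"
    }
}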

The class below implements ItemProcessor, which applies whatever business logic is required to each Employee object.

EmployeeProcessor.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<Employee, Employee>
{
    public Employee process(Employee employee) throws Exception
    {
        if (employee == null)
        {
            return null;
        }

        // Derive the tenure from the termination and joining years.
        employee.setTenure(employee.getTerminationYear() - employee.getJoiningYear());
        return employee;
    }
}
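As a quick illustration (again not from the article itself), passing one Employee through the processor fills in the derived tenure field:

package com.infosys.springbatch.mongo.example;

public class EmployeeProcessorDemo
{
    public static void main(String[] args) throws Exception
    {
        Employee employee = new Employee();
        employee.setJoiningYear(2005);
        employee.setTerminationYear(2010);

        // The processor computes tenure = terminationYear - joiningYear.
        employee = new EmployeeProcessor().process(employee);
        System.out.println(employee.getTenure()); // prints 5
    }
}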

The class below implements ItemWriter, which writes the Employee objects to the MongoDB collection using the database details defined for the MongoTemplate in the job XML file.

MongoDBItemWriter.java

package com.infosys.springbatch.mongo.example;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;

public class MongoDBItemWriter implements ItemWriter<Employee>
{
    private static final Log log = LogFactory.getLog(MongoDBItemWriter.class);

    private MongoTemplate mongoTemplate;

    /**
     * @see ItemWriter#write(List)
     */
    public void write(List<? extends Employee> data) throws Exception
    {
        log.info(data);

        // MongoTemplate already implements MongoOperations, so no cast is needed.
        MongoOperations operations = mongoTemplate;

        // Create the target collection on first use, then insert the whole chunk.
        if (!operations.collectionExists("employee"))
        {
            operations.createCollection("employee");
        }
        operations.insertAll(data);
    }

    public void setMongoTemplate(MongoTemplate mongoTemplate)
    {
        this.mongoTemplate = mongoTemplate;
    }

    public MongoTemplate getMongoTemplate()
    {
        return mongoTemplate;
    }
}
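In the article the MongoTemplate is wired in the job XML, but for a standalone test it can also be built in code. The sketch below is an assumption, not the article's configuration: it presumes a local mongod on the default port and a database named "test".

package com.infosys.springbatch.mongo.example;

import java.util.Arrays;

import com.mongodb.Mongo;
import org.springframework.data.mongodb.core.MongoTemplate;

public class MongoDBItemWriterDemo
{
    public static void main(String[] args) throws Exception
    {
        // Assumed connection details; the article defines these in the job XML.
        MongoDBItemWriter writer = new MongoDBItemWriter();
        writer.setMongoTemplate(new MongoTemplate(new Mongo("localhost", 27017), "test"));

        Employee employee = new Employee();
        employee.setId("1001");
        employee.setName("John");
        writer.write(Arrays.asList(employee)); // inserts into the "employee" collection
    }
}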

The class below implements JobParametersIncrementer, which increments the run.id job parameter so that each launch is treated as a new job instance.

DynamicJobParameters.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;

public class DynamicJobParameters implements JobParametersIncrementer
{
    public JobParameters getNext(JobParameters parameters)
    {
        // First run: start the counter at 1.
        if (parameters == null || parameters.isEmpty())
        {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }

        // Subsequent runs: increment the stored run.id.
        long id = parameters.getLong("run.id", 1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}
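A quick check (not from the article) shows how successive calls advance the counter:

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.JobParameters;

public class DynamicJobParametersDemo
{
    public static void main(String[] args)
    {
        DynamicJobParameters incrementer = new DynamicJobParameters();

        JobParameters params = incrementer.getNext(null); // run.id = 1
        params = incrementer.getNext(params);             // run.id = 2
        System.out.println(params.getLong("run.id"));     // prints 2
    }
}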

Step 4: Executing the jobs defined in FileToMongoTableJob.xml and MultipleFileToMongoTableJob.xml

To run the jobs, create a run configuration for each one:

  • To load data from a single file into the MongoDB collection, create a run configuration whose main class is org.springframework.batch.core.launch.support.CommandLineJobRunner and whose arguments are the XML definition file, the job id declared in that XML, and the job parameter: FileToMongoTableJob.xml employeeProcessorJob employee.id=1001.
  • To load data from multiple files into the MongoDB collection, create a run configuration with the same main class and the analogous arguments: MultipleFileToMongoTableJob.xml file_partition_Job employee.id=1001. A programmatic equivalent is sketched after this list.
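Outside an IDE run configuration, the same launch can be driven from a small main class; the sketch below is an assumption rather than part of the article, and simply passes the identical three arguments to CommandLineJobRunner.

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.launch.support.CommandLineJobRunner;

public class RunSingleFileJob
{
    public static void main(String[] args) throws Exception
    {
        // Same three arguments as the run configuration described above.
        CommandLineJobRunner.main(new String[] {
            "FileToMongoTableJob.xml", // job definition XML on the classpath
            "employeeProcessorJob",    // job id declared in that XML
            "employee.id=1001"         // job parameter
        });
    }
}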

Conclusion

The main idea behind this article is to show the end-to-end integration of Spring Batch and MongoDB, so that you can leverage the strengths of both for ETL over NoSQL.



Ira Agrawal works as a Technical Manager with Infosys Labs, where she has worked on different aspects of distributed computing including various middleware and products based on DSM, SOA and virtualization technologies.