Integrating Spring Batch and MongoDB for ETL Over NoSQL

Step-by-step instructions for running an ETL batch job with Spring Batch and MongoDB.


Step 3: The class files referenced in the job XML definitions

Below is the Employee POJO class, which holds the attributes of an employee; the corresponding getter/setter methods are omitted here.


package com.infosys.springbatch.mongo.example;

import java.io.Serializable;

public class Employee implements Serializable
{
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String city;
    private String designation;
    private int joiningYear;
    private int terminationYear;
    private int tenure;

    // Getter/setter methods for each field are omitted here.
}

The class below maps the FieldSet data to the Employee attributes and creates an Employee object.


package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee>
{
    public Employee mapFieldSet(FieldSet fs)
    {
        if (fs == null)
        {
            return null;
        }

        Employee employee = new Employee();
        employee.setId(fs.readString("id"));
        employee.setName(fs.readString("name"));
        employee.setCity(fs.readString("city"));
        employee.setDesignation(fs.readString("designation"));
        employee.setJoiningYear(fs.readInt("joiningYear"));
        employee.setTerminationYear(fs.readInt("terminationYear"));
        return employee;
    }
}
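For context, this mapper is plugged into the flat-file reader configured in the job XML, which is not shown on this page. A rough programmatic sketch of that wiring is given below; the resource name and the column order are assumptions for illustration and must match the actual FileToMongoTableJob.xml.

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class EmployeeReaderFactory
{
    // Builds a reader equivalent to the XML-configured one; the file name
    // "employee.csv" and the column order are assumptions for illustration.
    public static FlatFileItemReader<Employee> createReader()
    {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"id", "name", "city",
                "designation", "joiningYear", "terminationYear"});

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());

        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
        reader.setResource(new ClassPathResource("employee.csv")); // assumed file name
        reader.setLineMapper(lineMapper);
        return reader;
    }
}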

The class below implements ItemProcessor, which applies any processing logic needed for the Employee object; here it computes the employee's tenure.


package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<Employee, Employee>
{
    public Employee process(Employee employee) throws Exception
    {
        if (employee == null)
        {
            return null;
        }

        // Derive the tenure from the joining and termination years
        employee.setTenure(employee.getTerminationYear() - employee.getJoiningYear());
        return employee;
    }
}
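As a quick illustration of the processor's logic, the snippet below runs process() on a hand-built Employee; the sample years are made up:

package com.infosys.springbatch.mongo.example;

public class EmployeeProcessorDemo
{
    public static void main(String[] args) throws Exception
    {
        Employee employee = new Employee();
        employee.setJoiningYear(2005);
        employee.setTerminationYear(2010);

        // process() sets tenure = terminationYear - joiningYear
        Employee processed = new EmployeeProcessor().process(employee);
        System.out.println(processed.getTenure()); // prints 5
    }
}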

This class implements ItemWriter, which writes the Employee objects to the MongoDB collection using the database details defined for the MongoTemplate in the job XML file.


package com.infosys.springbatch.mongo.example;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;

public class MongoDBItemWriter implements ItemWriter<Object>
{
    private static final Log log = LogFactory.getLog(MongoDBItemWriter.class);

    private MongoTemplate mongoTemplate;

    /**
     * @see ItemWriter#write(List)
     */
    @SuppressWarnings("unchecked")
    public void write(List<?> data) throws Exception
    {
        log.info(data);
        List<Employee> employeeList = (List<Employee>) data;

        // MongoTemplate implements MongoOperations, so no cast is needed
        MongoOperations operations = mongoTemplate;
        if (!operations.collectionExists("employee"))
        {
            operations.createCollection("employee");
        }
        operations.insertAll(employeeList);
    }

    public void setMongoTemplate(MongoTemplate mongoTemplate)
    {
        this.mongoTemplate = mongoTemplate;
    }

    public MongoTemplate getMongoTemplate()
    {
        return mongoTemplate;
    }
}
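The mongoTemplate that the writer depends on is declared as a bean in the job XML file rather than in code. For readers who want to exercise the writer standalone, a rough programmatic equivalent is sketched below; the host, port, and database name are assumptions, not the article's actual settings.

package com.infosys.springbatch.mongo.example;

import com.mongodb.MongoClient;
import org.springframework.data.mongodb.core.MongoTemplate;

public class MongoTemplateFactory
{
    // Host, port, and database name are assumptions; the real values come
    // from the mongoTemplate bean defined in the job XML file.
    public static MongoTemplate createTemplate() throws Exception
    {
        return new MongoTemplate(new MongoClient("localhost", 27017), "batchdb");
    }
}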

The class below implements JobParametersIncrementer, which increments the run.id job parameter so that each launch of the job is treated as a new job instance.


package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;

public class DynamicJobParameters implements JobParametersIncrementer
{
    public JobParameters getNext(JobParameters parameters)
    {
        if (parameters == null || parameters.isEmpty())
        {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }

        long id = parameters.getLong("run.id", 1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}
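CommandLineJobRunner can invoke this incrementer via its -next option, but it can also be exercised directly. The small check below shows run.id advancing across calls:

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.JobParameters;

public class DynamicJobParametersDemo
{
    public static void main(String[] args)
    {
        DynamicJobParameters incrementer = new DynamicJobParameters();

        // First call: no previous parameters, so run.id starts at 1
        JobParameters first = incrementer.getNext(null);
        System.out.println(first.getLong("run.id")); // 1

        // Subsequent calls increment run.id by one
        JobParameters second = incrementer.getNext(first);
        System.out.println(second.getLong("run.id")); // 2
    }
}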

Step 4: Executing the jobs defined in FileToMongoTableJob.xml and MultipleFileToMongoTableJob.xml

To run the jobs, I need to create a run configuration for each of them (a programmatic equivalent is sketched after the list):

  • To load data from a single file into the MongoDB collection, I create a run configuration whose main class is org.springframework.batch.core.launch.support.CommandLineJobRunner and whose arguments are the XML definition file, the job id declared in that file, and a job parameter: FileToMongoTableJob.xml employeeProcessorJob employee.id=1001.
  • To load data from multiple files into the MongoDB collection, I create a run configuration with the same main class and the analogous arguments: MultipleFileToMongoTableJob.xml file_partition_Job employee.id=1001.
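
The same launch can also be driven programmatically, since CommandLineJobRunner exposes a public main method. A minimal sketch for the single-file job follows; it assumes the XML definition file is on the classpath:

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.launch.support.CommandLineJobRunner;

public class JobLauncherDemo
{
    public static void main(String[] args) throws Exception
    {
        // Equivalent to the run configuration described above:
        // job definition file, job id, and a job parameter.
        // Note: CommandLineJobRunner exits the JVM when the job completes.
        CommandLineJobRunner.main(new String[] {
                "FileToMongoTableJob.xml",
                "employeeProcessorJob",
                "employee.id=1001"
        });
    }
}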


The main idea behind this article is to demonstrate the end-to-end integration of Spring Batch and MongoDB, so that batch ETL jobs can take advantage of both.

Ira Agrawal works as a Technical Manager with Infosys Labs, where she has worked on different aspects of distributed computing including various middleware and products based on DSM, SOA and virtualization technologies.