Integrating Spring Batch and MongoDB for ETL Over NoSQL

In today’s enterprises, applications are either interactive or run in batch mode. Interactive applications, such as Web applications, require user input. By contrast, applications that start once and end after completing their required jobs are called batch applications. They do not require frequent manual intervention and typically process huge amounts of data. They include any ETL tool that extracts, transforms and loads data through batch processes.

Through this article, I plan to showcase an ETL framework leveraging the advantages of Spring Batch and MongoDB, which gives us the flavor of batch load over the NoSQL databases. Here I give a step by step demonstration of integrating Spring Batch with MongoDB.

Why Batch Processing?

The main advantage of batch applications is that they do not require any manual intervention. As a result, they can be scheduled to run at times when resources aren’t being utilized. As an example, I’ll look at an ETL tool which runs in batch mode to analyze weblogs. Several logs need to be parsed on a daily basis to fetch the required useful information. The input files are extracted and processed to obtain the required information, and the output data gets loaded to a database. This whole process is carried out in batch mode.

Batch processes mainly deal with huge amounts of data where a series of programs runs to meet the required objective. These programs can run one after the other, or they can run in parallel to speed up the execution, depending on requirements. Batch processing allows sharing of the resources; these processes are executed primarily towards the end of day when costly resources would otherwise sit idle.

Why Spring Batch?

The Spring Batch framework is designed to cater to batch applications that run on a daily basis in enterprise organizations. It leverages the benefits of the Spring framework along with advanced services. Spring Batch is mainly used to process huge volumes of data. It offers good performance and scales through optimization and partitioning techniques. It also provides logging/tracing, transaction management, job processing statistics, job restart, step handling, and resource management. By using the Spring programming model, I can write the business logic and let the framework take care of the infrastructure.

Spring Batch includes three components: the batch Application, the batch Core (the execution environment) and the batch Infrastructure.

The Application component contains all the batch jobs and custom code written using Spring Batch.

The Core component contains the core runtime classes necessary to launch and control a batch job. It includes things such as a JobLauncher, Job, and Step implementations. Both Application and Core are built on top of a common infrastructure.

The Infrastructure contains readers, writers and services which are used both by the application and by the core framework itself. They include things like ItemReader and ItemWriter; in this article, MongoTemplate (which comes from Spring Data MongoDB rather than Spring Batch) plays a similar supporting role. To use the Spring Batch framework, I need only to configure and customize the XML files. All existing core services should be easy to replace or extend, without any impact to the infrastructure layer.

Why MongoDB?

RDBMSes have ruled the storage space for decades, so why do I suddenly need NoSQL?

In certain industries, storing and managing huge volumes of data became a challenge, and traditional RDBMSes could not cope with the needs. That is where NoSQL databases came in. As the name suggests, NoSQL databases do not rely on SQL as their primary query language, and they generally do not impose a fixed table schema. These databases typically store data as key-value pairs, big tables, documents, graphs, etc. Many are open source and distributed, and they scale out rather than up, unlike most relational databases. They take advantage of new nodes and were designed with low-cost hardware in mind. They aim for high scalability, easy replication, and fast queries and insertions.

MongoDB is one such NoSQL database; it is open source and document-oriented. Instead of storing data in tables, as a relational database does, MongoDB stores structured data as JSON-like documents with dynamic schemas. MongoDB supports ad-hoc queries, indexing, data replication and load balancing. It can also be used as a file system, and users can take advantage of its replication and load balancing to store files on multiple servers.

Spring Batch – MongoDB Integration

Now I’ll demonstrate integration of Spring Batch with MongoDB. First, I plan to upload a huge input data file to a MongoDB database.

For this there are multiple steps involved.

Step 1: Splitting the data file

As the input data file is pretty huge, I split it before loading. Loading the huge file sequentially would be very time consuming, so I split it into smaller parts. The file can be split using any file-splitting logic, and the parts can be distributed to different servers in the cluster so that they are loaded in parallel for faster execution.

Here is sample code for the FileSplitter, which takes the number of parts I want to create, the path of the data file, and the output folder where the split parts should be stored. Ideally, I assume that the number of parts will be the same as the number of servers present in the cluster.

First, it creates that many file objects under the output folder path and stores them in fileNamesList. Their corresponding BufferedWriter objects are created and stored in a Vector. Then I read the input file line by line and write the data to the different split files from fileNamesList, using their corresponding BufferedWriter objects from the Vector. After all the split files are created, I transfer them to the other servers using the FileTransfer class, placing them at the same location I gave as the output folder.

Here I have assumed that there are just two machines in the cluster and that two parts are created. One part remains on the server where I run the FileSplitter, and the other gets transferred to the machine whose details I give in the FileTransfer class. For now, I have hard coded the second server’s details in the FileTransfer class, but they could instead be read from a properties file. For example, if the main huge file is employee.txt, then the parts will be created in the output folder as employeePart1.txt and employeePart2.txt.

FileSplitter.java

public class FileSplitter {
    public static void main(String args[]) {
        int noParts = new Integer(args[0]); // no. of parts
        String inputFile = args[1]; // input file path
        String outputFolder = args[2]; // output folder path
        List fileNamesList = new ArrayList();
        FileInputStream fstream;
        fstream = new FileInputStream(inputFile);
        // Get the object of DataInputStream
        DataInputStream in = new DataInputStream(fstream);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String strLine;
        Vector vList = new Vector();
        for(int i=0; i
        [The rest of this listing is missing from the source.]

FileTransfer.java

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;

public class FileTransfer {
    // Copies fileName to outputFolder on the remote host over SFTP.
    public void transferFile(String fileName, String outputFolder)
            throws JSchException, SftpException {
        String username = "some_username";
        String host = "some_host_name_ip_address";
        String pwd = "some_pwd";

        JSch jsch = new JSch();
        Session session = jsch.getSession(username, host, 22);
        session.setPassword(pwd);

        // Disables host key verification; acceptable only in a demo setup.
        java.util.Properties config = new java.util.Properties();
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);
        session.connect();

        Channel channel = session.openChannel("sftp");
        channel.connect();
        ChannelSftp c = (ChannelSftp) channel;

        String fsrc = fileName, fdest = outputFolder;
        c.put(fsrc, fdest);

        c.disconnect();
        session.disconnect();
    }
}
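Since the splitting steps are only described in prose, here is a minimal sketch of how they could look with java.nio. It is not the original FileSplitter: the class name RoundRobinFileSplitter is hypothetical, the round-robin distribution of lines across parts is an assumption (the article does not say how lines are assigned), and the transfer step is left out. The part names follow the employeePart1.txt pattern mentioned above.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the article's original FileSplitter.
class RoundRobinFileSplitter {

    /**
     * Splits inputFile into noParts files inside outputFolder and returns their paths.
     * Assumption: lines are dealt out round-robin, and parts are named
     * <base>Part1<ext>, <base>Part2<ext>, and so on.
     */
    static List<Path> split(Path inputFile, Path outputFolder, int noParts) throws IOException {
        if (noParts < 1) {
            throw new IllegalArgumentException("noParts must be at least 1");
        }
        String fileName = inputFile.getFileName().toString();
        int dot = fileName.lastIndexOf('.');
        String base = dot < 0 ? fileName : fileName.substring(0, dot);
        String ext = dot < 0 ? "" : fileName.substring(dot);

        Files.createDirectories(outputFolder);
        List<Path> parts = new ArrayList<>();
        List<BufferedWriter> writers = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(inputFile, StandardCharsets.UTF_8)) {
            // One output file and one writer per part.
            for (int i = 0; i < noParts; i++) {
                Path part = outputFolder.resolve(base + "Part" + (i + 1) + ext);
                parts.add(part);
                writers.add(Files.newBufferedWriter(part, StandardCharsets.UTF_8));
            }
            // Deal the input lines out to the writers in turn.
            String line;
            long lineNo = 0;
            while ((line = reader.readLine()) != null) {
                BufferedWriter writer = writers.get((int) (lineNo++ % noParts));
                writer.write(line);
                writer.newLine();
            }
        } finally {
            // Close every writer that was opened, even if an error occurred.
            for (BufferedWriter writer : writers) {
                writer.close();
            }
        }
        return parts;
    }
}
```

Round-robin keeps the parts close to equal in line count without reading the file twice; splitting into contiguous ranges would work just as well if the order of lines within a part matters.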

This is just a sample FileSplitter. I can use several other available logics for splitting the files. Now I move on to my actual integration of Spring Batch with MongoDB to carry out the load process.

Step 2a: Configuring the job-repository.xml file

Spring Batch framework requires a job repository to store the details of the application and also other information related to job and steps. This repository can either be created in a database or held in memory. I will use the memory-based job repository in this example.

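JOB-REPOSITORY.xml

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

[The rest of this XML listing is missing from the source; only the schemaLocation attribute above survives.]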

Step 2b: Configuring job.xml to load the data from a single file to MongoDB collection (table)

First, I define the job XML file (FileToMongoTableJob.xml in my example). In this file, I specify the FlatFileItemReader, which is a class from the Spring Batch framework, and set its resource to the path of the input file. Here the resource value is file:d:/data/employee.csv, i.e., the location of the input file employee.csv. I also define the delimiter, which in my case is a comma, through the DelimitedLineTokenizer class. Then I define my own class, EmployeeFieldSetMapper, which implements the Spring Batch framework’s FieldSetMapper interface. This class binds the FieldSet values to the fields of the Employee object. If any calculation or processing is involved, I can handle it in my EmployeeProcessor class, which implements the ItemProcessor interface of the Spring Batch framework.
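To make the reader configuration more concrete, the sketch below mimics in plain Java what the tokenizer and the field set mapper do for a single line: split it on commas, name the fields, and bind them to an object. It deliberately does not use Spring Batch classes. The column order is an assumption inferred from the field names used in EmployeeFieldSetMapper, and LineMappingSketch and EmployeeRow are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; in the real job this work is done by
// Spring Batch's DelimitedLineTokenizer and the EmployeeFieldSetMapper.
class LineMappingSketch {

    // Assumed column order of employee.csv (not stated in the article).
    static final String[] NAMES = {
        "id", "name", "city", "designation", "joiningYear", "terminationYear"
    };

    // Hypothetical stand-in for the Employee POJO.
    static final class EmployeeRow {
        String id;
        String name;
        String city;
        String designation;
        int joiningYear;
        int terminationYear;
    }

    // Step 1: tokenize a comma-delimited line into name -> value pairs.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                "Expected " + NAMES.length + " fields but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    // Step 2: bind the named fields to a domain object.
    static EmployeeRow map(Map<String, String> fields) {
        EmployeeRow row = new EmployeeRow();
        row.id = fields.get("id");
        row.name = fields.get("name");
        row.city = fields.get("city");
        row.designation = fields.get("designation");
        row.joiningYear = Integer.parseInt(fields.get("joiningYear"));
        row.terminationYear = Integer.parseInt(fields.get("terminationYear"));
        return row;
    }
}
```

Note that a bare split on commas does not handle quoted fields that contain commas, which is one reason to rely on the framework's tokenizer in the actual job.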

After this, I specify the MongoDB details by giving the hostname where the database is installed and the port number. I access the database through the MongoTemplate, which takes as one argument a reference to those connection details through their id (i.e., Mongo). The other argument to the MongoTemplate is the name of the database I will work with inside MongoDB, which in this case is “new.” Now I define my own class, MongoDBItemWriter, which implements the ItemWriter interface in Spring Batch. This class is given the MongoTemplate so that it can reach the database.

Next, I specify the DynamicJobParameters class, which implements the JobParametersIncrementer interface from Spring Batch. This works as the incrementer for the job.

Finally, I specify my batch job, where I give the batch:step and batch:tasklet details. The batch job here is employeeProcessorJob, which contains a single step. That step holds a tasklet whose batch:chunk reads from the employeeFileItemReader. In the same chunk I also reference the processor and the item writer.
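The reader, processor and writer referenced in a batch:chunk cooperate in a chunk-oriented loop: items are read up to a configured chunk size (the commit-interval attribute in Spring Batch XML), each item is processed, and the results are handed to the writer as one list. The plain-Java sketch below illustrates only that control flow, under simplifying assumptions: it uses standard functional interfaces instead of Spring Batch types, ignores transactions and restart, and the class name ChunkLoopSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified illustration of chunk-oriented processing; not Spring Batch code.
class ChunkLoopSketch {

    /**
     * Reads up to chunkSize items at a time, processes each one, and passes
     * the non-null results to the writer as a single list.
     * Returns the number of chunks that were read.
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect one chunk of input items.
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: a null result filters the item out.
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // Write phase: the writer sees the whole chunk at once.
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            chunks++;
        }
        return chunks;
    }
}
```

Writing a whole chunk at once is what lets a writer such as MongoDBItemWriter insert many documents with a single call instead of one call per item.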

FileToMongoTableJob.xml

[The XML listing for this file is missing from the source.]

The above job description reads from a single file and inserts into a MongoDB collection.

Step 2c: Configuring job.xml to load the data from multiple files to MongoDB collection (table)

Next, I’ll look at a job description where I read from multiple files and insert into a collection through MultipleFileToMongoTableJob.xml. This job description is the same as the one above, with just a few differences. When defining the employeeFileItemReader, I also give its scope, which is step. Because the FlatFileItemReader will run in multiple steps to read from multiple files, its resource is not a single fixed file. Instead, the resource value is given as #{stepExecutionContext[fileName]}, which is resolved at runtime for each step execution. The employeeProcessor scope is also set to step.

Next, I define the details for the PartitionStep, which is a class inside the Spring Batch framework. Here I give the PartitionStep bean the name step1:master. In the PartitionStep, I set two properties: one is the reference to the jobRepository, and the other is the stepExecutionSplitter, which refers to the class SimpleStepExecutionSplitter in the Spring Batch framework. This class in turn takes two references: one is the jobRepository, and the other is the step details.

Another argument that goes into this is the MultiResourcePartitioner class, which again is part of the Spring Batch framework. This class reads the multiple files from the given resource. Here the value of the resource is file:d:/data/inputFiles/employeePart*.csv, which indicates that all the file parts at that location are read (employeePart0.csv, employeePart1.csv, employeePart2.csv and so on).
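The idea behind partitioning by resource can be sketched in plain Java: find every file that matches the pattern and create one partition per file, each carrying that file's location under the key fileName, which is the key the #{stepExecutionContext[fileName]} expression looks up. This is only an illustration of the concept, not Spring Batch's implementation; the class name FilePartitionSketch and the partition0, partition1 naming are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of one-partition-per-file; not Spring Batch code.
class FilePartitionSketch {

    /**
     * Returns one small "execution context" per file in folder that matches
     * the glob, keyed by an illustrative partition name.
     */
    static Map<String, Map<String, String>> partition(Path folder, String glob) {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(folder, glob)) {
            for (Path file : stream) {
                files.add(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Directory iteration order is unspecified, so sort for stable partition names.
        Collections.sort(files);

        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < files.size(); i++) {
            Map<String, String> context = new LinkedHashMap<>();
            // The step-scoped reader would resolve its resource from this entry.
            context.put("fileName", files.get(i).toUri().toString());
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}
```

Calling partition with the folder d:/data/inputFiles and the glob employeePart*.csv would yield one entry per split file, so the number of partitions follows the number of files rather than a fixed setting.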

Under the step1:master, I also define another property, partitionHandler, which refers to the class TaskExecutorPartitionHandler inside the Spring Batch framework. This class takes three properties: taskExecutor, step and the gridSize. Then I define the step details, which takes the details of the task in the form of tasklet. Inside the task I mention the reader, processor and writer details. Finally, I give the job description under file_partition_Job, where I give the reference of the step details.
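The role of the task executor in the partition handler can be pictured as a thread pool that runs the same step logic once per partition and waits for all of them to finish. The sketch below shows that pattern with java.util.concurrent only. It is a conceptual illustration rather than Spring Batch's implementation: ParallelPartitionSketch is a hypothetical name, and the threads parameter is simply the pool size chosen for the example, not the gridSize property.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Conceptual sketch of running one worker per partition; not Spring Batch code.
class ParallelPartitionSketch {

    /** Runs worker once per partition on a fixed pool and returns results in partition order. */
    static <P, R> List<R> runAll(List<P> partitions, Function<P, R> worker, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit every partition before waiting on any of them.
            List<Future<R>> futures = new ArrayList<>();
            for (P partition : partitions) {
                Callable<R> task = () -> worker.apply(partition);
                futures.add(pool.submit(task));
            }
            // Collect results; get() blocks until the partition has finished.
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for partitions", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the job described here, each partition would correspond to one split file, and the worker would be the read, process and write step applied to that file.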

MultipleFileToMongoTableJob.xml

[The XML listing for this file is missing from the source.]

Step 3: The class files used in defining the Jobs.xml

Below is the Employee POJO class, which holds the details/attributes of the employee with their corresponding getter/setter methods, which are not shown here.

Employee.java

package com.infosys.springbatch.mongo.example;

import java.io.Serializable;

public class Employee implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String city;
    private String designation;
    private int joiningYear;
    private int terminationYear;
    private int tenure;

    // Getters and setters omitted.
}

The class below maps the FieldSet data to the employee attributes and creates an Employee object.

EmployeeFieldSetMapper.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
    // Maps one tokenized line to an Employee, reading each field by name.
    public Employee mapFieldSet(FieldSet fs) {
        if (fs == null) {
            return null;
        }
        Employee employee = new Employee();
        employee.setId(fs.readString("id"));
        employee.setName(fs.readString("name"));
        employee.setCity(fs.readString("city"));
        employee.setDesignation(fs.readString("designation"));
        employee.setJoiningYear(fs.readInt("joiningYear"));
        employee.setTerminationYear(fs.readInt("terminationYear"));
        return employee;
    }
}

The class below implements the ItemProcessor interface and applies any processing logic to the Employee object; here it computes the tenure.

EmployeeProcessor.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
    public Employee process(Employee employee) throws Exception {
        if (employee == null) {
            return null;
        }
        // Derive the tenure from the joining and termination years.
        employee.setTenure(employee.getTerminationYear() - employee.getJoiningYear());
        employee.setName(employee.getName());
        return employee;
    }
}

The next class implements the ItemWriter interface and writes the Employee objects to the MongoDB collection, using the database details defined for the MongoTemplate in the job XML file.

MongoDBItemWriter.java

package com.infosys.springbatch.mongo.example;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;

public class MongoDBItemWriter implements ItemWriter<Employee> {
    private static final Log log = LogFactory.getLog(MongoDBItemWriter.class);
    private MongoTemplate mongoTemplate;

    /**
     * @see ItemWriter#write(List)
     */
    public void write(List<? extends Employee> data) throws Exception {
        log.info(data);
        MongoOperations operations = mongoTemplate;
        // Create the target collection on first use.
        if (!operations.collectionExists("employee")) {
            operations.createCollection("employee");
        }
        operations.insertAll(data);
    }

    public void setMongoTemplate(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public MongoTemplate getMongoTemplate() {
        return mongoTemplate;
    }
}

The class below implements the JobParametersIncrementer interface. It increments the run.id job parameter on each launch.

DynamicJobParameters.java

package com.infosys.springbatch.mongo.example;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;

public class DynamicJobParameters implements JobParametersIncrementer {
    public JobParameters getNext(JobParameters parameters) {
        if (parameters == null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id", 1L) + 1;
        parameters = new JobParametersBuilder().addLong("run.id", id).toJobParameters();
        return parameters;
    }
}

Step 4: Execution of the jobs mentioned in FileToMongoTableJob.xml and MultipleFileToMongoTableJob.xml

To run the jobs, I need to create run configurations for each of the jobs:

  • To load data from a single file into a MongoDB collection, I create a run configuration whose main class is org.springframework.batch.core.launch.support.CommandLineJobRunner and whose arguments are the XML definition file, the job id given in the XML, and a job parameter: FileToMongoTableJob.xml employeeProcessorJob employee.id=1001.
  • To load data from multiple files into a MongoDB collection, I create a run configuration with the same main class, org.springframework.batch.core.launch.support.CommandLineJobRunner, and the same kind of arguments: MultipleFileToMongoTableJob.xml file_partition_Job employee.id=1001.

Conclusion

The main idea behind this article is to show the end-to-end integration process between Spring Batch and MongoDB to leverage their benefits.
