
Processing Large Datasets in a Grid Environment

Distributed computing enables large-scale dataset processing in a grid environment. The core of the grid is an interconnected network of individual computers, each with its own processor and memory, which it uses to process its assigned tasks independently. Large datasets are generally split into equally sized blocks, each of which is stored on an individual computer in the grid.

Larger datasets can occupy a terabyte or more of disk space. To avoid a space crunch, the dataset blocks are compressed before being stored on each computer in the grid, and then decompressed on that computer only when they need to be processed. This compression/decompression process is vital to processing larger datasets.

In addition to the compression/decompression process, object conversion enables fast record writing, reading, and updating. By converting each record in the dataset block to object form and then compressing and storing the corresponding block file, you can boost performance when handling larger datasets. A protocol buffer is an effective way to handle this object conversion.

In this article, we provide an overview of our solution for handling a large dataset file in a grid environment and explain its benefits along the way. We use LZO compression/decompression and a protocol buffer based on Protobuf, the open source framework from Google. Our approach supports object serialization and deserialization by converting file records into serialized objects and vice versa.

LZO Compression and Decompression

In a distributed computing scenario, dataset blocks are stored across the network, replicated on each computer system in the cluster, for parallel processing. A job (a single process in distributed computing) is executed in parallel across the clustered or networked systems. The running job picks up the dataset block file on each computing system in the cluster. This process requires considerable disk space, which is where the LZO compression technique comes in. LZO is a lossless compression algorithm designed for very fast, memory-based decompression.

Twitter has a project that builds splittable LZO support for Hadoop. We created our proof of concept based on that Twitter/Hadoop project, but instead of using Hadoop to maintain the bulk data we used the Protobuf protocol buffer for fast data serialization and deserialization. The combination of a protocol buffer and LZO compression improved the scalability of large dataset processing for us.
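For reference, here is a minimal sketch of how a block file might be LZO-compressed with the LzopCodec from that Hadoop project. The class and configuration details are assumptions based on Hadoop's standard CompressionCodec interface (and the codec needs the hadoop-lzo native library at runtime); our own solution instead uses the lzocomdecomp.jar described later.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import com.hadoop.compression.lzo.LzopCodec;

public class LzoBlockCompressor {
    // Compresses one dataset block file into an .lzo file.
    public static void compressBlock(String inFile, String outFile) throws Exception {
        LzopCodec codec = new LzopCodec();
        codec.setConf(new Configuration());   // the codec requires a Hadoop Configuration
        try (FileInputStream in = new FileInputStream(inFile);
             OutputStream out = codec.createOutputStream(new FileOutputStream(outFile))) {
            byte[] buf = new byte[64 * 1024]; // 64KB read buffer
            int n;
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);         // bytes are LZO-compressed on the way out
            }
        }
    }
}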

Based on the Twitter/Hadoop project findings, here are some data points comparing LZO compression with the GZIP compression/decompression algorithm.

  1. The LZO compression ratio is lower than GZIP's. For example, GZIP compressed a 64KB file to 20KB (roughly 3.2:1), while LZO compressed the same file to 35KB (roughly 1.8:1).
  2. LZO decompression is faster than GZIP decompression, and LZO supports splittable files for decompression.
  3. LZO compression is lossless.

Processing Large Datasets in a Single-Node System

To process a large dataset in a single-node system, we opted for multithreading. Large dataset file records are converted to proto objects and stored in equally sized block files. We use LZO compression to compress each block file and store it on the system. For faster processing, we use n threads to process the block files, where n is based on the memory available in the single-node system. Figure 1 illustrates the compression flow for a large dataset file in a single-node system.

After the large dataset file is split into multiple block files, multiple proto object block files are created for parallel processing (based on the block size). These proto object block files are compressed using LZO compression and are used for parallel processing in both single- and multi-node systems.
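To make the compression flow concrete, here is a minimal sketch of how records might be split into equally sized proto object block files before compression. It uses the Employee and Emprecord proto classes defined later in the article; the block size, file naming, and the final compression step are assumptions for illustration, and in our solution the compression itself is handled by the LzoCompress class shown later.

import java.io.File;
import java.io.FileOutputStream;
import com.emp.EmpProtos.Employee;
import com.emp.EmpProtos.Emprecord;

public class BlockWriter {
    private static final int RECORDS_PER_BLOCK = 10000; // hypothetical block size

    // Splits the employee records into equally sized proto object block files.
    public static void writeBlocks(Iterable<Employee> records, File outDir) throws Exception {
        Emprecord.Builder block = Emprecord.newBuilder();
        int count = 0, blockNo = 0;
        for (Employee emp : records) {
            block.addEmp(emp);
            if (++count == RECORDS_PER_BLOCK) {
                flushBlock(block, new File(outDir, "block_" + blockNo++ + ".dat"));
                block = Emprecord.newBuilder();
                count = 0;
            }
        }
        if (count > 0) {
            flushBlock(block, new File(outDir, "block_" + blockNo + ".dat"));
        }
    }

    private static void flushBlock(Emprecord.Builder block, File file) throws Exception {
        try (FileOutputStream fout = new FileOutputStream(file)) {
            block.build().writeTo(fout);  // serialize the proto block to disk
        }
        // Each block file would then be LZO-compressed, e.g. via LzoCompress.
    }
}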

Our previous DevX article explains how to use a protocol buffer for object management.

Figure 2 illustrates the decompression flow.


Figure 2. Decompression Flow for a Large Dataset File

The compressed block file is decompressed and converted back to the proto object. For decompression, we used multithreading to decompress the compressed block file and read the proto object from an uncompressed file. Here are the steps for reading the LZO compressed block file for decompression.

  1. Check the available free memory in the single-node system.
  2. Calculate the possible thread count based on block size and free memory. Here is the code for determining the thread count.
    public static int getNoOfThreadUsage(int blockSize) {
        // Free heap memory divided by the block size gives the number of
        // blocks that can safely be decompressed in parallel.
        Runtime runtime = Runtime.getRuntime();
        return (int) (runtime.freeMemory() / blockSize);
    }
  3. Get the count of compressed block files, which is called the block count.
  4. Calculate the parallel thread limit for uncompressing the compressed block files.
    threadLimit = noThread < blockCount ? noThread : blockCount;
  5. Set the thread count value for parallel execution and increase it until it reaches the thread limit. In other words, uncompress the compressed block files batch by batch, based on the memory available in the single-node system; this helps avoid deadlocks and out-of-memory issues (see the sketch after these steps).
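Putting steps 1 through 5 together, here is a minimal sketch of how the decompression could be driven with a thread pool. The uncompressAndRead helper is hypothetical and stands in for the FileUncompress class described later; the thread-limit calculation follows the steps above.

import java.io.File;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockDecompressor {

    // Steps 1-2: thread count based on free memory and block size.
    public static int getNoOfThreadUsage(int blockSize) {
        Runtime runtime = Runtime.getRuntime();
        return (int) (runtime.freeMemory() / blockSize);
    }

    // Steps 3-5: uncompress the block files with at most threadLimit parallel threads.
    public static void uncompressAll(List<File> lzoBlocks, int blockSize) throws Exception {
        int noThread = getNoOfThreadUsage(blockSize);
        int blockCount = lzoBlocks.size();
        int threadLimit = noThread < blockCount ? noThread : blockCount;

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, threadLimit));
        for (File block : lzoBlocks) {
            pool.submit(() -> uncompressAndRead(block)); // one task per compressed block file
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    // Hypothetical helper: in our solution this work is done by the
    // FileUncompress class from lzocomdecomp.jar.
    private static void uncompressAndRead(File block) {
        // Decompress the .lzo block and parse the proto objects from it.
    }
}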

Putting It All Together: LZO Compression Use Case

This section walks through the code for an LZO compression use case of our solution. The sample use case program provides parallel processing of large dataset files in a single-node system. (Click here to download the source code.)

The following employee proto file is used for the proto object.

option java_package = "com.emp";
option java_outer_classname = "EmpProtos";

message Employee {
  required string name = 1;
  required int32 id = 2;     // Unique ID number for this employee
  optional string email = 3;
}

message Emprecord {
  repeated Employee emp = 1;
}

The following code snippet uses the generated proto Java classes for object management and writes the employee object to the output file in serialized form.

package com.emp;

import java.io.*;
import com.emp.EmpProtos.*;  // importing the Java proto classes created by the proto compiler

// Writing the employee record into the output file
public static void addEmployee(int id, String name, String email,
                               FileOutputStream fout) throws Exception {
    Emprecord.Builder recBuilder = Emprecord.newBuilder();
    Employee.Builder empBuilder = Employee.newBuilder();
    empBuilder.setName(name);
    empBuilder.setId(id);
    empBuilder.setEmail(email);
    recBuilder.addEmp(empBuilder.build());
    recBuilder.build().writeTo(fout);
}

The following code snippet reads the employee records from the input file.

// Reading the employee records from the file
public static void readEmprecord(FileInputStream fin) throws Exception {
    Emprecord empList = Emprecord.parseFrom(fin);
    System.out.println("== Employee List ==");
    if (empList != null) {
        for (Employee emp : empList.getEmpList()) {
            System.out.println(emp.getId() + " " + emp.getName() + " " + emp.getEmail());
        }
    }
}
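For completeness, here is a brief usage sketch that writes a few records and reads them back; the file name and sample values are assumptions for illustration.

// Write two employee records in serialized proto form.
try (FileOutputStream fout = new FileOutputStream("emp_block.dat")) {
    addEmployee(1, "Alice", "alice@example.com", fout);
    addEmployee(2, "Bob", "bob@example.com", fout);
}

// Read them back; concatenated Emprecord messages merge their repeated emp fields on parse.
try (FileInputStream fin = new FileInputStream("emp_block.dat")) {
    readEmprecord(fin);
}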

The following code snippet compresses a block of the proto object file. We use lzocomdecomp.jar to compress and decompress the proto object block files. The code shows the syntax for calling the LZO compression class, which produces a file with the .lzo extension, meaning it was compressed via LZO compression.

LzoCompress.compress( )

The following code snippet uncompresses the compressed proto object block file. We create an object of the class and pass it the compressed proto object file. This class supports threading, and you can customize it based on your proto object for reading from the block. The output file's extension is .unlzo.

FileUncompress( )

The Required JARs

The following are the supporting JAR files required for executing the sample use case. Click here to download the source code.

  • protobuf-java-.jar
  • protobuf-format-java-1.1.jar
  • lzocomdecomp.jar: This is our own JAR for LZO compression and decompression.
  • Sampleusecase_emp.jar: This JAR contains the multithreaded processing of the large dataset.

Here is the syntax for executing the sample use case program.

Empaddread   

The input file is comma separated in this format:

<id>,<name>,<email>

Here is the field format:

  • Id: integer
  • name: string
  • email: string