Using Advanced Hadoop MapReduce Features

Basic MapReduce programming covers the workflow, but not what actually happens inside the MapReduce framework. This article explains how data moves through the MapReduce architecture and which API calls do the actual processing. It also discusses customization techniques and function overriding for application-specific needs.

Introduction

The advanced MapReduce features cover execution and lower-level details. For everyday MapReduce programming, knowing the APIs and how to use them is enough to write applications, but understanding MapReduce's inner workings is essential for knowing what really happens and for customizing it with confidence.

Custom Types (Data)

The Hadoop MapReduce framework always passes typed data to user-provided Mappers and Reducers. The data that flows through Mappers and Reducers is stored in Java objects.

Writable Interface: The Writable interface is one of the most important interfaces in Hadoop. Objects that can be marshaled to and from files and across the network implement this interface, and Hadoop uses it to transmit data in serialized form. Some of the classes that implement the Writable interface are listed below:

  1. Text (stores String data)
  2. LongWritable
  3. FloatWritable
  4. IntWritable
  5. BooleanWritable

Custom data types can also be created by implementing the Writable interface. Hadoop can transmit any custom data type (whatever fits your requirement) that implements Writable.

The Writable interface, shown below, has two methods: readFields and write. The first method, readFields, initializes the object's fields from the data contained in the 'in' binary stream. The second method, write, serializes the object to the binary stream 'out', from which it can later be reconstructed. The most important contract of the entire process is that the fields must be read in exactly the same order as they are written.

Listing 1: Showing Writable interface

public interface Writable {
  void readFields(DataInput in) throws IOException;
  void write(DataOutput out) throws IOException;
}
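As a brief illustration, here is a minimal sketch of a custom value type, a hypothetical Point3D class holding three coordinates (the class name and fields are invented for this example). It serializes and deserializes its fields in the same fixed order, as the Writable contract requires.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical custom value type holding three float coordinates.
public class Point3D implements Writable {

    private float x;
    private float y;
    private float z;

    public Point3D() { }                       // Hadoop needs a no-arg constructor

    public Point3D(float x, float y, float z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the fields in a fixed order...
        out.writeFloat(x);
        out.writeFloat(y);
        out.writeFloat(z);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // ...and read them back in exactly the same order.
        x = in.readFloat();
        y = in.readFloat();
        z = in.readFloat();
    }

    @Override
    public String toString() {
        return x + "," + y + "," + z;
    }
}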

Custom Types (Key)

In the previous section we discussed custom data types that meet application-specific data requirements; those types can serve only as values. Now we will discuss custom key types. In Hadoop MapReduce, the Reducer processes keys in sorted order, so a custom key type must implement the WritableComparable interface. Key types should also implement hashCode().

The following shows the WritableComparable interface. It represents a Writable that is also Comparable.

Listing 2: Showing WritableComparable interface

public interface WritableComparable<T>
    extends Writable, Comparable<T>
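Below is a minimal sketch of a custom key type, a hypothetical TimestampKey wrapping a single long. It implements compareTo() to define the sort order the Reducer sees and hashCode() so keys are partitioned consistently; the class name and field are invented for illustration.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical custom key type wrapping a single long timestamp.
public class TimestampKey implements WritableComparable<TimestampKey> {

    private long timestamp;

    public TimestampKey() { }

    public TimestampKey(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
    }

    @Override
    public int compareTo(TimestampKey other) {
        // Defines the sort order in which the Reducer receives keys.
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public int hashCode() {
        // Used by the default HashPartitioner to choose a Reducer.
        return (int) (timestamp ^ (timestamp >>> 32));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TimestampKey
                && ((TimestampKey) o).timestamp == timestamp;
    }
}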

How to Use Custom Types

We have already discussed the custom value and key types that Hadoop can process. Now we explore how Hadoop is told about them. The JobConf object (which defines the job) has two methods, setOutputKeyClass() and setOutputValueClass(), that control the key and value data types of the job's output. If the Mapper emits intermediate types that differ from these, JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods can be used to declare the types the Reducer expects as input.
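The following sketch shows how these methods might be wired together on the old org.apache.hadoop.mapred API, reusing the hypothetical TimestampKey and Point3D types from the earlier sketches; the driver class name is a placeholder.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class TypeConfigExample {

    public static JobConf configure() {
        // Old (org.apache.hadoop.mapred) API; class names below are placeholders.
        JobConf conf = new JobConf(TypeConfigExample.class);

        // Final output types written by the Reducer.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        // Intermediate types emitted by the Mapper, when they differ
        // from the final output types above.
        conf.setMapOutputKeyClass(TimestampKey.class);   // hypothetical key type
        conf.setMapOutputValueClass(Point3D.class);      // hypothetical value type

        return conf;
    }
}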

Faster Performance

The default sorting process is relatively slow: it first reads the key from the stream, deserializes it (using the readFields() method), and only then calls the compareTo() method of the key class. The faster approach is to determine the ordering of keys by examining their serialized byte streams directly, without parsing the data at all. To implement this faster comparison mechanism, extend the WritableComparator class with a comparator specific to your data type. The following is the class declaration.

Listing 3: Showing WritableComparator class

public class WritableComparator
    extends Object
    implements RawComparator
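As an illustration, a raw comparator for the hypothetical TimestampKey sketched earlier might compare the serialized 8-byte long directly, without deserializing either key. This is a sketch under that assumption, not Hadoop's own implementation.

import org.apache.hadoop.io.WritableComparator;

// Raw comparator for the hypothetical TimestampKey: compares the serialized
// 8-byte long in place instead of deserializing both keys.
public class TimestampKeyComparator extends WritableComparator {

    public TimestampKeyComparator() {
        super(TimestampKey.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // readLong() is a helper inherited from WritableComparator.
        long t1 = readLong(b1, s1);
        long t2 = readLong(b2, s2);
        return Long.compare(t1, t2);
    }
}

The comparator can then be registered with WritableComparator.define(TimestampKey.class, new TimestampKeyComparator()), typically from a static initializer in the key class (as Hadoop's built-in types do), or configured on the job with JobConf.setOutputKeyComparatorClass().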

Custom data and key types let us use higher-level data structures within the Hadoop framework. In practical Hadoop applications, custom data types are among the most common requirements, and a custom raw comparator on top of them can provide a significant performance improvement.

Input Formats

The InputFormat is one of the most important interfaces; it defines the input specification of a MapReduce job. Hadoop offers different InputFormat implementations for interpreting various types of input data. The most common, and the default, is TextInputFormat, which reads lines from a text file. Similarly, SequenceFileInputFormat reads binary file formats.

The fundamental task of an InputFormat is to read the data from the input file. A custom InputFormat implementation is also possible, as your application requires. For the default TextInputFormat implementation, the key is the byte offset of the line and the value is the content of the line terminated by the newline ('\n') character. For a custom implementation, the separator can be any character and the InputFormat will parse accordingly.

The other function of InputFormat is to split the input file (data source) into fragments that become the input to map tasks. These fragments/splits are encapsulated in instances of the InputSplit interface. The input data source can be anything, such as a database table, an XML file, or some other file, so the split is performed according to the application's requirements. The most important point is that the split operation should be fast and efficient.

After the files are split, reading from the individual splits becomes the critical step. The RecordReader is responsible for reading data from the splits, and it must handle the fact that splits do not always end neatly at the end of a line: the RecordReader reads to the end of the current line even when that crosses the theoretical end of its split. This behavior is important to avoid losing records that cross InputSplit boundaries.

  • Custom InputFormat: In basic applications an existing InputFormat is used directly, but for custom reading the best approach is to subclass FileInputFormat. This abstract class provides the functionality to manipulate files as the application requires. For custom parsing, the getRecordReader() method must be overridden so that it returns an instance of RecordReader, which is then responsible for reading and parsing (see the sketch after this list).
  • Alternate Source (Data): The InputFormat describes two things: how data is presented to the Mapper and where the data comes from. Most implementations are based on FileInputFormat, where the data source is the local file system or HDFS (Hadoop Distributed File System). For other types of data sources, a custom implementation of InputFormat is required. For example, a NoSQL database such as HBase provides TableInputFormat for reading data from database tables. So the data source can be anything that a custom implementation can handle.
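The following sketch shows the shape of such a subclass on the old org.apache.hadoop.mapred API. For brevity it simply delegates to Hadoop's LineRecordReader; a real custom format would return its own RecordReader that parses records as the application requires. The class name is invented for this example.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical custom input format built on FileInputFormat (old mapred API).
public class CustomLineInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Delegate to the stock line reader; replace this with a custom
        // RecordReader to parse records with a different separator or layout.
        return new LineRecordReader(job, (FileSplit) split);
    }
}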

Output Formats

The OutputFormat is responsible for the write operation. We have already discussed that the InputFormat and RecordReader interfaces handle reading data into a MapReduce program. After the data is processed, the write operation to permanent storage is managed by the OutputFormat and RecordWriter interfaces. The default format is TextOutputFormat, which writes key/value pairs as strings to the output file. Another output format is SequenceFileOutputFormat, which keeps the data in binary form. All of these classes use the write() and readFields() methods of the Writable classes.

To write data in a custom format, the OutputFormat implementation must be customized. The FileOutputFormat abstract class is extended to make this customization, and the JobConf.setOutputFormat() method is then called with the custom format class.
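For example, a job can be switched from the default TextOutputFormat to a binary format (or to a custom FileOutputFormat subclass) roughly as follows; the class name and output path below are placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class OutputFormatConfigExample {

    public static void configureOutput(JobConf conf) {
        // Switch the job from the default TextOutputFormat to a binary format;
        // a custom FileOutputFormat subclass would be plugged in the same way.
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        // Placeholder output directory for this sketch.
        FileOutputFormat.setOutputPath(conf, new Path("/tmp/job-output"));
    }
}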

Data Partitioning

Partitioning is the process that determines which Reducer instance receives which intermediate key/value pair. Each Mapper determines the destination Reducer for all of its output key/value pairs. The most important point is that for any given key, the destination partition is the same regardless of which Mapper produced it. For performance reasons, Mappers never communicate with one another about the partition of a particular key.

The Partitioner interface is used by the Hadoop system to determine the destination partition for a key/value pair. The number of partitions must match the number of reduce tasks, and the MapReduce framework determines the number of partitions when a job starts.

The following is the signature of Partitioner interface.

Listing 4: Showing Partitioner interface

public interface Partitioner<K2, V2>
    extends JobConfigurable
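As a sketch, a custom partitioner for the hypothetical TimestampKey from the earlier examples might look like the following; it routes each pair based only on the key, so every occurrence of a key lands in the same partition.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Hypothetical partitioner for TimestampKey/Text pairs (old mapred API).
public class TimestampPartitioner implements Partitioner<TimestampKey, Text> {

    @Override
    public void configure(JobConf job) {
        // No job-specific configuration needed for this sketch.
    }

    @Override
    public int getPartition(TimestampKey key, Text value, int numPartitions) {
        // The same key always maps to the same partition, regardless of
        // which Mapper produced it.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

The job would then register the class with JobConf.setPartitionerClass() and set the number of reduce tasks, and hence partitions, with JobConf.setNumReduceTasks().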

Conclusion

In this discussion we have covered the most important Hadoop MapReduce features and explained why they are useful for customization. In practical MapReduce applications, the default implementations of these APIs are often not enough; custom features built on the exposed APIs can have a significant impact. All of these customizations are straightforward once the concepts are clear. We hope this article helps you understand the advanced features and how to implement them.


About the Author

Kaushik Pal is a technical architect with 15 years of experience in enterprise application and product development. He has expertise in web technologies, architecture and design, Java/J2EE, open source, and big data technologies.
