devxlogo

How to Write a MapReduce Program Using the Hadoop Framework and Java

How to Write a MapReduce Program Using the Hadoop Framework and Java

Big Data is a relatively new paradigm and processing data is the most important area on which to concentrate development efforts. This article will concentrate on the processing of Big Data using the Apache Hadoop framework and MapReduce programming. MapReduce can be defined as a special type of programming framework used to process huge amounts of data in a distributed framework, called commodity hardware.

Introduction

Hadoop MapReduce can be defined as a software programming framework used to process big volume of data (in terabyte level) in a parallel environment of clustered nodes. The cluster consists of thousands of nodes of commodity hardware. The processing is distributed, reliable and fault tolerant. A typical MapReduce job is performed according to the following steps:

  1. Split the data into independent chunks based on key-value pair. This is done by Map task in a parallel manner.
  2. The output of the Map job is sorted based on the key values
  3. The sorted output is the input to the Reduce job. And then it produces the final output to the processing and returns the result to the client.

MapReduce Framework

The Apache Hadoop MapReduce framework is written in Java. The framework consists of master-slave configuration. The master is known as JobTracker and the slaves are known as TaskTrackers. The master controls the task processed on the slaves (which are nothing but the nodes in a cluster). The computation is done on the slaves. So the compute and storages nodes are the same in a clustered environment. The concept is ‘ move the computation to the nodes where the data is stored’, and it makes the processing faster.

MapReduce Processing

The MapReduce framework model is very lightweight. So the cost of hardware is low compared with other frameworks. But at the same time, we should understand that the model works efficiently only in a distributed environment as the processing is done on nodes where the data resides. The other features like scalability, reliability and fault tolerance also works well on distributed environment.

MapReduce Implementation

Now it is time to discuss the implementation of the MapReduce model using the Java programming platform. The following are the different components of the entire end-to-end implementation.

  • The client programthat is the driver class and initiates the process
  • The Map functionthat performs the split using the key-value pair.
  • The Reduce functionthat aggregate the processed data and send the output back to the client.

Driver Class: The following is a driver class which binds the Map and Reduce function and starts the processing. This is the client program which initiates the process.

Listing 1: The client program (driver class) initiating the process

package com.mapreduce.devx;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** * @author kaushik pal *  * This is the main driver class to initiate the mapreduce * process. It sets the back ground for the entire process and  * Then starts it. */public class DevXDriver {		public static void main(String[] args) throws Exception {					// Initiate configuration			Configuration configx = new Configuration();		 		// Add resource files		configx.addResource(new Path("/user/hadoop/core-site.xml"));	    configx.addResource(new Path("/user/hadoop/hdfs-site.xml"));	              		// Create MapReduce job         Job devxmapjob = new Job(configx,"DevXDriver.class");        devxmapjob.setJarByClass(DevXDriver.class);               devxmapjob.setJobName("DevX MapReduce Job");                         	    // Set output kay and value class		devxmapjob.setOutputKeyClass(Text.class);		devxmapjob.setOutputValueClass(Text.class);		// Set Map class		devxmapjob.setMapperClass(DevXMap.class);						// Set Combiner class		devxmapjob.setCombinerClass(DevXReducer.class);  				// Set Reducer class		devxmapjob.setReducerClass(DevXReducer.class);     		// Set Map output key and value classes		devxmapjob.setMapOutputKeyClass(Text.class);		devxmapjob.setMapOutputValueClass(Text.class);       		// Set number of reducer tasks		devxmapjob.setNumReduceTasks(10);		// Set input and output format classes		devxmapjob.setInputFormatClass(TextInputFormat.class);		devxmapjob.setOutputFormatClass(TextOutputFormat.class);       		// Set input and output path		FileInputFormat.addInputPath(devxmapjob, new Path("/user/map_reduce/input/"));		FileOutputFormat.setOutputPath(devxmapjob,new Path("/user/map_reduce/output"));       				// Start MapReduce job		devxmapjob.waitForCompletion(true);	}}

Map Function

This is responsible for splitting the data based on the key-value pair. This is known as mapping of data.

Listing 2: This is a Map function splitting the data into chunks

package com.mapreduce.devx;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URI;import java.util.StringTokenizer;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * @author kaushik pal *  * This is the map process. It does the mapping for keyword-value pair. */public class DevXMap extends Mapper {		// Create Path, BufferedReader and Text variables	Path file_path;	BufferedReader buffer_reader;	Text tweet_values = new Text();	/**	 * @param key	 * @param value	 * @param context	 */	public void map(LongWritable key, Text value, Context context)  {				try{							// Create configuration for Map			Configuration map_config = new Configuration();			 		 	// Load Hadoop core files in configuration			map_config.addResource(new Path("/user/hadoop/core-site.xml"));	        map_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));		        	        // Create variables	        String searchkeyword = "";						// Open file from the file path			file_path=new Path("files/repository/keys.txt");            FileSystem file_system = FileSystem.get(URI.create("files/repository/keys.txt"),new Configuration());           			// Load buffer reader            buffer_reader=new BufferedReader(new InputStreamReader(file_system.open(file_path)));                                  while(buffer_reader.ready())            {            	            	searchkeyword=buffer_reader.readLine().trim();                                   }                        // Get key value            final Text key_value = new Text(searchkeyword);                                    // Check value and take decision            if(value == null)				{					return;				}             else{								StringTokenizer string_tokens = new StringTokenizer(value.toString(),",");					int count = 0;					while(string_tokens.hasMoreTokens()) {						count ++;						if(count 

Reduce Function

This is responsible for aggregating the data. The aggregation is done based on the key values. So after processing and sorting the aggregation is completed and sends the result back to the client program.

Listing 3: The Reduce function aggregates the processed data

package com.mapreduce.devx;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.RandomAccess;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * @author kaushik pal *  * This is the reducer function. It aggregates the output based on the  * sorting of key-value pairs. */public class DevXReducer extends Reducer {	 	 // Create variables for file path    Path positive_file_path;    Path negative_file_path;    Path output_file_path;    Path keyword_file_path;        // Create variables for buffer    BufferedReader positive_buff_reader;    BufferedReader negative_buff_reader;    BufferedReader keyword_buff_reader;         // Create variables for calculation    static Double total_record_count=new Double("0");	       static Double count_neg=new Double("0");    static Double count_pos=new Double("0");    static Double count_neu=new Double("0");       static Double percent_neg=new Double("0");    static Double percent_pos=new Double("0");    static Double percent_neu=new Double("0");    Pattern pattrn_matcher;    Matcher matcher_txt;    static int new_row=0;    FSDataOutputStream out_1st,out_2nd;	  	 /**	 * @param key	 * @param values	 * @param context	 * @throws IOException	 * @throws InterruptedException	 */    public void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException	    {          	    	// Create configuration for reducer    	Configuration reduce_config = new Configuration();    	        // Load hadoop config files    	reduce_config.addResource(new Path("/user/hadoop/core-site.xml"));        reduce_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));    	        // Create variables        String key_word = "";        String check_keyword=key_word;              	keyword_file_path=new Path("files/repository/keys.txt");        FileSystem file_system_read = FileSystem.get(URI.create("files/repository/keys.txt"),new Configuration());        keyword_buff_reader=new BufferedReader(new InputStreamReader(file_system_read.open(keyword_file_path)));                FileSystem get_filesys = FileSystem.get(reduce_config);		FileSystem get_filesys_posneg = FileSystem.get(reduce_config);                Path path_output = new Path("/user/sentiment_output_file.txt");        Path path_output_posneg = new Path("/user/posneg_output_file.txt");                // Get keyword        while(keyword_buff_reader.ready())        {        	key_word=keyword_buff_reader.readLine().trim();                }                 // Check file system        if (!get_filesys.exists(path_output)) {                                   out_1st = get_filesys.create(path_output);            out_2nd = get_filesys_posneg.create(path_output_posneg);                   }                              // Check keyword matching using positive and negative dictionaries        if(check_keyword.equals(key.toString().toLowerCase()))        {     	        	for(Text new_tweets:values)            {	            	// Load positive word dictionary				positive_file_path=new Path("/user/map_reduce/pos_words.txt");                FileSystem filesystem_one = FileSystem.get(URI.create("files/pos_words.txt"),new Configuration());                positive_buff_reader=new BufferedReader(new InputStreamReader(filesystem_one.open(positive_file_path)));                              // Load negative word disctinary                negative_file_path = new Path("/user/map_reduce/neg_words.txt");                FileSystem filesystem_two = FileSystem.get(URI.create("files/neg_words.txt"),new Configuration());                negative_buff_reader =new BufferedReader(new InputStreamReader(filesystem_two.open(negative_file_path)));                ++total_record_count;                boolean first_flag=false;                boolean second_flag=false;                String all_tweets=new_tweets.toString();                              String first_regex = "";                String second_regex = "";                while(positive_buff_reader.ready())                {                    first_regex=positive_buff_reader.readLine().trim();                    new_row++;                               pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);                    matcher_txt = pattrn_matcher.matcher(all_tweets);                    first_flag=matcher_txt.find();                    if(first_flag)                    {                      	out_2nd.writeBytes(all_tweets);                        context.write(new Text(first_regex),new Text(all_tweets));                                          break;                    }                       }                while(negative_buff_reader.ready())                {                    new_row++;                                        second_regex=negative_buff_reader.readLine().trim();                    pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);                    matcher_txt = pattrn_matcher.matcher(all_tweets);                    second_flag=matcher_txt.find();                    if(second_flag)                    {                    	                    	out_2nd.writeBytes(all_tweets);                        context.write(new Text(second_regex),new Text(all_tweets));                        break;                    }                }                if(first_flag&second_flag)                {                    ++count_neu;                }                else                {                    if(first_flag)                    {                        ++count_pos;                    }                    if(second_flag)                    {                        ++count_neg;                    }                    if(first_flag==false&second_flag==false)                    {                        ++count_neu;                    }                }                // Close buffers                negative_buff_reader.close();                positive_buff_reader.close();            }        	        	// Calculate percent values            percent_pos=count_pos/total_record_count*100;            percent_neg=count_neg/total_record_count*100;            percent_neu=count_neu/total_record_count*100;          try{        	  				// Write to the files        	    out_1st.writeBytes("
"+key_word);				out_1st.writeBytes(","+total_record_count);				out_1st.writeBytes(","+percent_neg);				out_1st.writeBytes(","+percent_pos);				out_1st.writeBytes(","+percent_neu);								// Close file systems				out_1st.close();				get_filesys.close();          }catch(Exception e){        	  e.printStackTrace();          }	        }            }}

Conclusion

This article discussed MapReduce processing using the Java programming environment. The different components such as the Map and Reduce functions perform the main task and return the output to the client. The processing performs efficiently on a distributed environment only - so set up the Apache Hadoop framework on a distributed environment to get the best result. Hope you have enjoyed the article and you will be able to implement it in your practical programming.

 

About the Author

Kaushik Pal is a technical architect with 15 years of experience in enterprise application and product development. He has expertise in web technologies, architecture/design, java/j2ee, Open source and big data technologies. You can find more of his work at www.techalpine.com and you can email him here.

devxblackblue

About Our Editorial Process

At DevX, we’re dedicated to tech entrepreneurship. Our team closely follows industry shifts, new products, AI breakthroughs, technology trends, and funding announcements. Articles undergo thorough editing to ensure accuracy and clarity, reflecting DevX’s style and supporting entrepreneurs in the tech sphere.

See our full editorial policy.

About Our Journalist