
How to Write a MapReduce Program Using the Hadoop Framework and Java

Kaushik Pal explores the processing of Big Data using the Apache Hadoop framework and MapReduce programming.



Big Data is a relatively new paradigm, and processing that data is the most important area on which to concentrate development efforts. This article concentrates on the processing of Big Data using the Apache Hadoop framework and MapReduce programming. MapReduce is a programming framework for processing huge amounts of data in a distributed fashion across a cluster of commodity hardware.

Introduction

Hadoop MapReduce is a software programming framework for processing large volumes of data (at the terabyte level) in parallel across a cluster of nodes. The cluster may consist of thousands of nodes of commodity hardware, and the processing is distributed, reliable and fault tolerant. A typical MapReduce job proceeds according to the following steps:

  1. The input data is split into independent chunks, and each chunk is processed by a Map task in parallel, producing intermediate key-value pairs.
  2. The framework sorts the output of the Map tasks based on the key values.
  3. The sorted output becomes the input to the Reduce tasks, which aggregate it into the final output and return the result to the client. (A minimal sketch of these three stages follows this list.)
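
To make these three stages concrete, here is a minimal word-count sketch written against the same org.apache.hadoop.mapreduce API used in the listings later in this article. The class names and the word-count task itself are illustrative only; they are not part of the sentiment-analysis example developed below.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    // Step 1: the Map task splits each input line into words and
    // emits an intermediate (word, 1) pair for every word it sees.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Steps 2 and 3: the framework sorts and groups the intermediate
    // pairs by key, so each reduce() call receives one word together
    // with all of its counts, which are aggregated into the final output.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}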

MapReduce Framework

The Apache Hadoop MapReduce framework is written in Java. The framework uses a master-slave configuration: the master is known as the JobTracker and the slaves are known as TaskTrackers. The master controls the tasks processed on the slaves, which are simply the nodes in the cluster, and the computation is performed on those slaves. As a result, the compute and storage nodes are the same in a clustered environment. The guiding principle is 'move the computation to the nodes where the data is stored', and this is what makes the processing faster.
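
As a rough sketch of how a client reaches this master-slave cluster: on Hadoop 1.x (the JobTracker generation described here), the client-side Configuration is pointed at the storage master (NameNode) and the compute master (JobTracker) through configuration properties. The host names and ports below are placeholders, not values from this article; in practice they normally come from core-site.xml and mapred-site.xml.

import org.apache.hadoop.conf.Configuration;

public class ClusterConfigSketch {

    // Builds a client-side Configuration that points at the cluster masters.
    // The addresses are hypothetical; in a real deployment they usually come
    // from the core-site.xml and mapred-site.xml files on the classpath.
    public static Configuration clusterConfig() {
        Configuration conf = new Configuration();

        // HDFS NameNode (the storage master) -- Hadoop 1.x property name
        conf.set("fs.default.name", "hdfs://namenode.example.com:9000");

        // MapReduce JobTracker (the compute master) -- Hadoop 1.x property name
        conf.set("mapred.job.tracker", "jobtracker.example.com:9001");

        return conf;
    }
}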

MapReduce Processing

The MapReduce framework model is very lightweight, so the cost of hardware is low compared with other frameworks. At the same time, we should understand that the model works efficiently only in a distributed environment, because the processing is done on the nodes where the data resides. Other features such as scalability, reliability and fault tolerance also work best in a distributed environment.

MapReduce Implementation

Now it is time to discuss the implementation of the MapReduce model using the Java programming platform. The following are the components of the entire end-to-end implementation:

  • The client program, the driver class that initiates the process.
  • The Map function, which performs the split using key-value pairs.
  • The Reduce function, which aggregates the processed data and sends the output back to the client.


Driver Class: The following driver class binds the Map and Reduce functions together and starts the processing. This is the client program that initiates the process.

Listing 1: The client program (driver class) initiating the process

package com.mapreduce.devx;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author kaushik pal
 * 
 * This is the main driver class that initiates the MapReduce
 * process. It sets up the configuration for the entire process
 * and then starts it.
 */
public class DevXDriver {	
	public static void main(String[] args) throws Exception {		
	
		// Initiate configuration	
		Configuration configx = new Configuration();
		 
		// Add resource files
		configx.addResource(new Path("/user/hadoop/core-site.xml"));
	    configx.addResource(new Path("/user/hadoop/hdfs-site.xml"));
	              
		// Create MapReduce job 
        Job devxmapjob = new Job(configx,"DevXDriver.class");
        devxmapjob.setJarByClass(DevXDriver.class);       
        devxmapjob.setJobName("DevX MapReduce Job");                  
       
		// Set output key and value classes
		devxmapjob.setOutputKeyClass(Text.class);
		devxmapjob.setOutputValueClass(Text.class);

		// Set Map class
		devxmapjob.setMapperClass(DevXMap.class);		
		
		// Set Combiner class
		devxmapjob.setCombinerClass(DevXReducer.class);  
		
		// Set Reducer class
		devxmapjob.setReducerClass(DevXReducer.class);     

		// Set Map output key and value classes
		devxmapjob.setMapOutputKeyClass(Text.class);
		devxmapjob.setMapOutputValueClass(Text.class);
       
		// Set number of reducer tasks
		devxmapjob.setNumReduceTasks(10);

		// Set input and output format classes
		devxmapjob.setInputFormatClass(TextInputFormat.class);
		devxmapjob.setOutputFormatClass(TextOutputFormat.class);
       
		// Set input and output path
		FileInputFormat.addInputPath(devxmapjob, new Path("/user/map_reduce/input/"));
		FileOutputFormat.setOutputPath(devxmapjob,new Path("/user/map_reduce/output"));       
		
		// Start MapReduce job
		devxmapjob.waitForCompletion(true);
	}
}
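
One note on the job construction above: the Job(Configuration, String) constructor used in Listing 1 is deprecated on Hadoop 2.x and later in favor of the Job.getInstance factory method. If you run this driver on a newer release, the job-creation lines can be replaced with the equivalent sketch below, which reuses the configx variable and the imports from Listing 1.

// Equivalent job creation on Hadoop 2.x and later, where the
// Job(Configuration, String) constructor is deprecated:
Job devxmapjob = Job.getInstance(configx, "DevX MapReduce Job");
devxmapjob.setJarByClass(DevXDriver.class);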

Map Function

The Map function is responsible for splitting the data into chunks based on key-value pairs; this is known as the mapping of the data. In this example, the mapper reads a search keyword from a file on HDFS and emits each comma-separated field of the input that contains that keyword.

Listing 2: This is a Map function splitting the data into chunks

package com.mapreduce.devx;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author kaushik pal
 * 
 * This is the map process. It performs the mapping of keyword-value pairs.
 */
public class DevXMap extends Mapper<LongWritable, Text, Text,Text> {
	
	// Create Path, BufferedReader and Text variables
	Path file_path;
	BufferedReader buffer_reader;
	Text tweet_values = new Text();

	/**
	 * @param key
	 * @param value
	 * @param context
	 */
	public void map(LongWritable key, Text value, Context context)  {		
		try{	
			
			// Create configuration for Map
			Configuration map_config = new Configuration();
			 
		 	// Load Hadoop core files in configuration
			map_config.addResource(new Path("/user/hadoop/core-site.xml"));
	        map_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));	
	        
	        // Create variables
	        String searchkeyword = "";
			
			// Open the keyword file using the configuration loaded above
			file_path=new Path("files/repository/keys.txt");
			FileSystem file_system = FileSystem.get(URI.create("files/repository/keys.txt"),map_config);
           
			// Load buffer reader
            buffer_reader=new BufferedReader(new InputStreamReader(file_system.open(file_path)));
                      
            // Read the search keyword (the last non-empty line wins)
            while(buffer_reader.ready())
            {
                searchkeyword=buffer_reader.readLine().trim();
            }
            buffer_reader.close();
            
            // Get key value
            final Text key_value = new Text(searchkeyword);            
            
            // Check value and take decision
            if(value == null)
				{
					return;
				} 
            else{			
					StringTokenizer string_tokens = new StringTokenizer(value.toString(),",");
					int count = 0;

					while(string_tokens.hasMoreTokens()) {
						String token = string_tokens.nextToken();
						count++;

						// Skip the first field (the record identifier)
						if(count <= 1)
							continue;

						String new_tweet_value = token.toLowerCase().trim().replaceAll("\\*","");
						
						if(new_tweet_value.contains(searchkeyword.toLowerCase().trim())) {									
							tweet_values.set(new_tweet_value);
							context.write(key_value,tweet_values);
						}			
					}
				}
			}
		catch(Exception e){					
			e.printStackTrace();		
		}
	}
}
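
As a concrete, hypothetical illustration of this mapper: if keys.txt contains the single keyword hadoop and an input record reads 101,learning hadoop today,nothing new here, the tokenizer splits the line on commas, the first field (the record identifier) is skipped, and each remaining field is lower-cased, trimmed and stripped of asterisks. Only the fields containing the keyword are emitted, so the mapper would write the single pair (hadoop, learning hadoop today) to the context.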

Reduce Function

The Reduce function is responsible for aggregating the data. The aggregation is done based on the key values: after the Map output has been processed and sorted, the reducer completes the aggregation and sends the result back to the client program.

Listing 3: The Reduce function aggregates the processed data

package com.mapreduce.devx;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.RandomAccess;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author kaushik pal
 * 
 * This is the reducer function. It aggregates the output based on the 
 * sorting of key-value pairs.
 */
public class DevXReducer extends Reducer<Text ,Text,Text,Text> 
{	 	
 // Create variables for file path
    Path positive_file_path;
    Path negative_file_path;
    Path output_file_path;
    Path keyword_file_path;
    
    // Create variables for buffer
    BufferedReader positive_buff_reader;
    BufferedReader negative_buff_reader;
    BufferedReader keyword_buff_reader; 
    
    // Create variables for calculation
    static Double total_record_count=new Double("0");	
   
    static Double count_neg=new Double("0");
    static Double count_pos=new Double("0");
    static Double count_neu=new Double("0");
   
    static Double percent_neg=new Double("0");
    static Double percent_pos=new Double("0");
    static Double percent_neu=new Double("0");

    Pattern pattrn_matcher;
    Matcher matcher_txt;
    static int new_row=0;
    FSDataOutputStream out_1st,out_2nd;	 

 
	 /**
	 * @param key
	 * @param values
	 * @param context
	 * @throws IOException
	 * @throws InterruptedException
	 */
    public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException
	    {          	
    	// Create configuration for reducer
    	Configuration reduce_config = new Configuration();
    	
        // Load hadoop config files
    	reduce_config.addResource(new Path("/user/hadoop/core-site.xml"));
        reduce_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));
    	
        // Variable for the search keyword read from the keyword file
        String key_word = "";
        
    	keyword_file_path=new Path("files/repository/keys.txt");
        FileSystem file_system_read = FileSystem.get(URI.create("files/repository/keys.txt"),new Configuration());
        keyword_buff_reader=new BufferedReader(new InputStreamReader(file_system_read.open(keyword_file_path)));
        
        FileSystem get_filesys = FileSystem.get(reduce_config);
		FileSystem get_filesys_posneg = FileSystem.get(reduce_config);
        
        Path path_output = new Path("/user/sentiment_output_file.txt");
        Path path_output_posneg = new Path("/user/posneg_output_file.txt");
        
        // Get the search keyword (the last non-empty line wins)
        while(keyword_buff_reader.ready())
        {
            key_word=keyword_buff_reader.readLine().trim();
        }
        keyword_buff_reader.close();
        
        // Check file system
        if (!get_filesys.exists(path_output)) {                       
            out_1st = get_filesys.create(path_output);
            out_2nd = get_filesys_posneg.create(path_output_posneg);
           
        }             
         
        // Process only the records whose key matches the search keyword
        if(key_word.equalsIgnoreCase(key.toString()))
        {     	
        	for(Text new_tweets:values)
            {	
            	// Load positive word dictionary
				positive_file_path=new Path("/user/map_reduce/pos_words.txt");
                FileSystem filesystem_one = FileSystem.get(URI.create("files/pos_words.txt"),new Configuration());
                positive_buff_reader=new BufferedReader(new InputStreamReader(filesystem_one.open(positive_file_path)));
              
                // Load negative word dictionary
                negative_file_path = new Path("/user/map_reduce/neg_words.txt");
                FileSystem filesystem_two = FileSystem.get(URI.create("files/neg_words.txt"),new Configuration());
                negative_buff_reader =new BufferedReader(new InputStreamReader(filesystem_two.open(negative_file_path)));

                ++total_record_count;

                boolean first_flag=false;
                boolean second_flag=false;

                String all_tweets=new_tweets.toString();              
                String first_regex = "";
                String second_regex = "";

                while(positive_buff_reader.ready())
                {
                    first_regex=positive_buff_reader.readLine().trim();
                    new_row++;           
                    pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    first_flag=matcher_txt.find();

                    if(first_flag)
                    {  
                    	out_2nd.writeBytes(all_tweets);
                        context.write(new Text(first_regex),new Text(all_tweets));                  
                        break;
                    }       
                }
                while(negative_buff_reader.ready())
                {
                    new_row++;                    
                    second_regex=negative_buff_reader.readLine().trim();
                    pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    second_flag=matcher_txt.find();
                    if(second_flag)
                    {                    	
                    	out_2nd.writeBytes(all_tweets);
                        context.write(new Text(second_regex),new Text(all_tweets));
                        break;
                    }

                }
                if(first_flag && second_flag)
                {
                    ++count_neu;
                }
                else
                {
                    if(first_flag)
                    {
                        ++count_pos;
                    }
                    if(second_flag)
                    {
                        ++count_neg;
                    }
                    if(!first_flag && !second_flag)
                    {
                        ++count_neu;
                    }
                }
                // Close buffers
                negative_buff_reader.close();
                positive_buff_reader.close();

            }
        	
        	// Calculate percent values
            percent_pos=count_pos/total_record_count*100;
            percent_neg=count_neg/total_record_count*100;
            percent_neu=count_neu/total_record_count*100;

          try{        	  
				// Write to the files
        	    out_1st.writeBytes("\n"+key_word);
				out_1st.writeBytes(","+total_record_count);
				out_1st.writeBytes(","+percent_neg);
				out_1st.writeBytes(","+percent_pos);
				out_1st.writeBytes(","+percent_neu);
				
				// Close file systems
				out_1st.close();
				get_filesys.close();
          }catch(Exception e){
        	  e.printStackTrace();
          }	
        }        
    }
}
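
To summarize what this reducer produces: for the records whose key matches the search keyword, each tweet is checked against the positive and negative word lists, the matching tweets are written to /user/posneg_output_file.txt, each matched dictionary word and tweet pair is also emitted through the normal MapReduce output via context.write, and a summary line of the form keyword,total_count,negative_percent,positive_percent,neutral_percent is written to /user/sentiment_output_file.txt.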

Conclusion

This article discussed MapReduce processing using the Java programming environment. The different components, such as the Map and Reduce functions, perform the main work and return the output to the client. The processing is efficient only in a distributed environment, so set up the Apache Hadoop framework on a distributed cluster to get the best results. I hope you have enjoyed the article and will be able to apply it in your own programming.

 

About the Author

Kaushik Pal is a technical architect with 15 years of experience in enterprise application and product development. He has expertise in web technologies, architecture and design, Java/J2EE, open source and big data technologies. You can find more of his work at www.techalpine.com.



   