Sunday, December 17, 2023

Chaining MapReduce Job in Hadoop

While processing data using MapReduce you may want to break the requirement into a series of tasks and run them as a chain of MapReduce jobs, rather than doing everything within one MapReduce job and making it more complex. Hadoop provides two predefined classes, ChainMapper and ChainReducer, for the purpose of chaining MapReduce jobs in Hadoop.


ChainMapper class in Hadoop

Using the ChainMapper class you can use multiple Mapper classes within a single map task. The Mapper classes are invoked in a chained fashion: the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

For adding Mapper classes to the chain, the ChainMapper.addMapper() method is used.
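As a rough sketch, assuming two hypothetical Mapper implementations FirstMapper and SecondMapper (they are not part of Hadoop), wiring them into the map chain inside the driver would look something like this:

// FirstMapper and SecondMapper are placeholder Mapper classes
Configuration map1Conf = new Configuration(false);
ChainMapper.addMapper(job, FirstMapper.class, LongWritable.class, Text.class,
    Text.class, Text.class, map1Conf);

Configuration map2Conf = new Configuration(false);
ChainMapper.addMapper(job, SecondMapper.class, Text.class, Text.class,
    Text.class, IntWritable.class, map2Conf);

Note how the input key/value classes given for SecondMapper (Text, Text) match the output key/value classes of FirstMapper.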

ChainReducer class in Hadoop

Using the predefined ChainReducer class in Hadoop you can chain multiple Mapper classes after a Reducer within the reduce task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion: the output of the Reducer becomes the input of the first Mapper, the output of the first Mapper becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

For setting the Reducer class of the chain, the ChainReducer.setReducer() method is used.

For adding a Mapper class after the Reducer, the ChainReducer.addMapper() method is used.
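A minimal sketch of the reduce side, assuming hypothetical SalesReducer and PostProcessMapper classes, could look like this:

// SalesReducer and PostProcessMapper are placeholder classes
Configuration reduceConf = new Configuration(false);
ChainReducer.setReducer(job, SalesReducer.class, Text.class, IntWritable.class,
    Text.class, IntWritable.class, reduceConf);

// PostProcessMapper runs on every record the reducer emits
ChainReducer.addMapper(job, PostProcessMapper.class, Text.class, IntWritable.class,
    IntWritable.class, Text.class, new Configuration(false));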

How to chain MapReduce jobs

Using the ChainMapper and the ChainReducer classes it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*].

In a chained MapReduce job you can have-

  • A chain of Mapper classes executed within the map task using ChainMapper.
  • A Reducer set using ChainReducer.
  • An optional chain of Mapper classes executed after the Reducer, added using ChainReducer.

Special care has to be taken when creating chains to ensure that the key/value types output by a Mapper are valid as input for the following Mapper in the chain. For example, in the code below FilterMapper is added with input key/value classes Text and Text because those are exactly the output classes of CollectionMapper.

Benefits of using a chained MapReduce job

  • When MapReduce jobs are chained, data from the intermediate mappers is kept in memory rather than being stored to disk, so the next mapper in the chain doesn't have to read its input from disk. The immediate benefit of this pattern is a dramatic reduction in disk IO.
  • It gives you a chance to break the problem into simpler tasks and execute them as a chain.

Chained MapReduce job example

Let’s take a simple example to show a chained MapReduce job in action. Here the input file has item, sales and zone columns in the below format (tab separated), and you have to get the total sales per item for zone-1.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3
    

For the sake of the example let’s say the first mapper reads all the records, and the second mapper filters them to keep only the records for zone-1. In the reducer you get the total sales for each item, and then a mapper chained after the reducer flips each record so that the key becomes the value and the value becomes the key. For that the InverseMapper class is used, which is a predefined mapper in Hadoop.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sales extends Configured implements Tool{
  // First Mapper
  public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{
    private Text item = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      //splitting record
      String[] salesArr = value.toString().split("\t");
      item.set(salesArr[0]);
      // Writing (sales,zone) as value
      context.write(item, new Text(salesArr[1] + "," + salesArr[2]));
    }
  }
    
  // Mapper 2
  public static class FilterMapper extends Mapper<Text, Text, Text, IntWritable>{
    public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {
    
      String[] recordArr = value.toString().split(",");
      // Filtering on zone
      if(recordArr[1].equals("zone-1")) {
          Integer sales = Integer.parseInt(recordArr[0]);
          context.write(key, new IntWritable(sales));
      }
    }
  }
    
  // Reduce function
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{      
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new Sales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    // Use the configuration set up by ToolRunner
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Sales");
    job.setJarByClass(getClass());
        
    // MapReduce chaining
    Configuration mapConf1 = new Configuration(false);
    ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class,
        Text.class, Text.class,  mapConf1);
        
    Configuration mapConf2 = new Configuration(false);
    ChainMapper.addMapper(job, FilterMapper.class, Text.class, Text.class,
        Text.class, IntWritable.class, mapConf2);
        
    Configuration reduceConf = new Configuration(false);        
    ChainReducer.setReducer(job, TotalSalesReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    // Mapper chained after the reducer: InverseMapper flips each (item, total)
    // pair so that the total sales becomes the key in the final output
    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
         
    // Output types of the map side chain (what FilterMapper emits)
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Final output types (what InverseMapper emits after the reducer)
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
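Once the jar is built you can run the job in the usual way, passing the input and output paths as arguments (the jar name and HDFS paths here are just placeholders):

hadoop jar saleschain.jar Sales /user/process/input /user/process/output

With the sample data shown above, only the record Item1 345 zone-1 survives the zone-1 filter, so after the reducer totals the sales and InverseMapper flips key and value, the output should contain a single line with 345 as the key and Item1 as the value (tab separated).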

That's all for this topic Chaining MapReduce Job in Hadoop. If you have any doubts or suggestions, please drop a comment. Thanks!



Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Write a Map Only Job in Hadoop MapReduce
  3. Data Locality in Hadoop
  4. How to Compress Intermediate Map Output in Hadoop
  5. Java Program to Write File in HDFS

You may also like-

  1. HDFS Commands Reference List
  2. How to Handle Missing And Under Replicated Blocks in HDFS
  3. What is SafeMode in Hadoop
  4. Parquet File Format in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. How to Run a Shell Script From Java Program
  7. Java Collections Interview Questions
  8. Best Practices For Exception Handling in Java