Tuesday, December 19, 2023

Using Combiner in Hadoop MapReduce to Improve Performance

In this post we’ll see what is combiner in Hadoop and how combiner helps in speeding up the shuffle and sort phase in Hadoop MapReduce.


What is combiner in Hadoop

Generally in a MapReduce job, data is collated in the Map phase and later aggregated in reduce phase. By specifying a combiner function in MapReduce you can aggregate data at the Map phase also.

You can specify a combiner in your MapReduce driver using the following statement -

job.setCombinerClass(COMBINER_CLASS.class); 

Note that specifying combiner in your MapReduce job is optional.

How combiner helps in improving MapReduce performance

Once the Map tasks start producing output that data has to be stored in memory, partitioned as per the number of reducers, sorted on keys and then spilled to the disk.

Once the Map task is done the data partitions have to be sent to the reducers (on different nodes) working on specific partitions. As you can see this whole shuffle and sort process involves consuming memory, I/O and data transfer across network.

If you specify a combiner function in MapReduce, when the map output stored in memory is written to disk, combiner function is run on the data so that there is less data to be written to the disk (reducing I/O) which also results in less data being transferred to reducer nodes (reducing bandwidth).

For example– Suppose you have sales data of several items and you are trying to find the maximum sales number per item. For Item1 if following (key,value) pair are the output of Map-1 and Map-2.

Map-1
(Item1, 300)
(Item1, 250)
(Item1, 340)
(Item1, 450)
Map-2
(Item1, 120)
(Item1, 540)
(Item1, 290)
(Item1, 300)
Then the reduce function which gets data for this key (Item1) will receive all these (key, value) pairs as input after the shuffle phase.
    
[Item1,(300,250,340,450,120,540,290,300)]
 

Resulting in final output - (Item1, 540)

If you are using a combiner in MapReduce job and the reducer class itself is used as the combiner class  then combiner will be called for each map output.

Map-1 Combiner output

      (Item1, 450) 
    

Map-2 Combiner output

      (Item1, 540)
         

Input to Reducer - [Item1, (450, 540)]

Resulting in final output - (Item1, 540)

So you can see by using a combiner map output is reduced which means less data is written to disk and less data is transferred to reducer nodes.

How to write a Combiner function

For writing Combiner class you need to extend Reducer and implement the reduce method just like you do for writing the reducer. In fact in many cases reducer itself can be used as the Combiner.

The output key value types of combiner must be same as the output key value type of the mapper.

Combiner in Hadoop

Though it is not always possible to use the reducer as the combiner class, classic example of this constraint is calculation of average.

For example- If there are two maps with (key, value) pair as following

Map-1 (1,4,7) and Map-2 (8,9)

Then reduce function will calculate average as – (1+4+7+8+9)/5 = 29/5 = 5.8

where as with combiner where average will also be calculated per map output

Map-1 – (1+4+7)/3 = 12/3 = 4

Map-2 – (8+9)/2 = 17/2 = 8.5

So the average calculated at reduce side will be – (4+8.5)/2 = 12.5/2 = 6.25

Combiner with MapReduce example

Here is a example where combiner is specified while calculating maximum sales figure per item.

  
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxSales extends Configured implements Tool{
  // Mapper
  public static class MaxSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0]);
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
    
  // Reducer
  public static class MaxSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int maxSalesValue = Integer.MIN_VALUE;
      for(IntWritable val : values) {
        maxSalesValue = Math.max(maxSalesValue, val.get());
      }  
      result.set(maxSalesValue);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new MaxSales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "MaxSales");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxSalesMapper.class); 
    // Specifying combiner class
    job.setCombinerClass(MaxSalesReducer.class);
    job.setReducerClass(MaxSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

In the displayed counters for the MapReduce job you can see the reduction in number of records passed to reducer.

   
Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=57
  Input split bytes=103
  Combine input records=21
  Combine output records=4
  Reduce input groups=4
  Reduce shuffle bytes=57
  Reduce input records=4
  Reduce output records=4

For comparison here are the counters when the same MapReduce job is run without a Combiner class.

   
  Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=273
  Input split bytes=103
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=273
  Reduce input records=21
  Reduce output records=4
  Spilled Records=42
    

That's all for this topic Using Combiner in Hadoop MapReduce to Improve Performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. What Are Counters in Hadoop MapReduce
  3. MapReduce Flow in YARN
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. File Write in HDFS - Hadoop Framework Internal Steps
  3. How to Read And Write Parquet File in Hadoop
  4. Using Avro File With Hadoop MapReduce
  5. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  6. Installing Ubuntu Along With Windows
  7. CopyOnWriteArrayList in Java
  8. Creating a Maven Project in Eclipse