Sunday, December 24, 2023

Tips for improving MapReduce performance

In this post some of the tips for improving MapReduce job performance are given. Three areas where you can look for improving the MapReduce performance are.

  1. The Map task
  2. The Reduce task.
  3. Configuration of the shuffle and sort phase.
We’ll go through these areas to see what can be done to improve the over all MapReduce job performance.

Performance improvement steps for Map and Reduce tasks

1- Input data file for Map Task- If the input file is very large or there are a large number of small files both may dent the performance of Map tasks.

If the file is very large (in TBs) and the HDFS block size is default i.e. 128 MB that will result in a large number of input splits. Note that for each split a Map task is spawned so having a large number of input splits means a large number of Map tasks resulting in overhead in terms of starting Map processes. In such scenario increasing the HDFS block size to 256 MB or even 512 MB would mean increased input split size too.

That way each Map task would get more data to process resulting in creation of less map tasks and reduced overhead.

Same way if there are a large number of small files that also means a lot of time is spent on starting and stopping map tasks where as process time is very less. As a thumb rule a Map task should run for 1-3 minutes at least, if it is not then you should think of combining the input.

Your options are using a container file format like Sequence file or using the CombineFileInputFormat.

2- Is input data splittable- If input data is compressed and the compression format used is not splittable then only a single input split would be created for the whole compressed file, which means only one map task processing the whole file.
Ensure that the compression format used is splittable, you can use Bzip2 compression or LZO compression with indexing which are the splittable compression formats.

3- Using binary file formats- If you can use binary file format like Avro, SequenceFiles or Parquet that is more compact and gives better serialization and deserialization time compared to storing data as text.

4- Using native compression libraries- Using native libraries for compression and decompression outperforms the Java implementation counterparts.

5- How Many Reduces- Another step to improve MapReduce performance is how many reducers are you using for the job. As per the Hadoop documentation-
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

6- Using the correct partitioner- Map data partition for a specific reduce is done using a Partitioner. Default HashPartitioner partitions the data by calculating a hash using the map output key.

If that default partitioner doesn’t work well for your data set there will be more (key, value) pairs in some partitions where as other partitions will have less data resulting in reduce side data skew.

In such scenario you may think of implementing a custom partitioner.

Map and Reduce code related improvements

1- One of the most frequent mistake is to create new object every time as shown in the following code.

public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t"));
 // creating new object
 Text outValue = new Text(lineArray[0]);
 context.write(key, outValue);
}

Here a new outValue object is created for each (key,value) pair causing CPU overhead two times. At object creation time and having many objects for garbage collection. It is better to reuse object and have a code as following.

// create object outside map function
Text outValue = new Text();
public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t"));
 outValue.set(lineArray[0]);
 context.write(key, outValue);
}

2- Don’t store values in a data structure like List or Map in your Map or Reduce task. Storing objects like this is expensive and doing that for many (key,value) pairs will result in huge memory consumption.

Performance improvement steps to speed up shuffle and sort phase

Shuffling and sorting involves computations in terms of sorting keys and partitioning, I/O in terms of writing map outputs to local disk and network transfers in terms of data transfer from map nodes to reduce nodes. Thus optimizing shuffle and sort phase results in great improvements in the overall performance of the MapReduce job.

First step towards optimizing shuffle and sort phase is to reduce size of the Map output. Some of the things you can do for that are as follows.

  1. Filtering data- Filter data as much as you can at the Map side itself rather than carrying everything to the reducer and then writing logic to filter records. Also set only the required fields rather than having a big object.
  2. Using combiner- By using combiner you can also aggregate data at the map end resulting in less records as map output.
  3. Compressing Map output- Compressing map output reduces the map output size which means less data to transfer to the reduce nodes.
  4. Can it be a Mapper only job- If your job doesn’t need any aggregation then you can run a Mapper only job. That way you can do away with reducer and the whole shuffling and sorting.

Setting configuration parameter for optimizing shuffle phase

There are many configuration parameters too which you can set for optimizing shuffle phase for your MapReduce job.

  • mapreduce.task.io.sort.mb- Using this parameter you can set the total amount of buffer memory to use while sorting files. Default is 100 megabytes. By increasing this value you can keep more data in memory reducing the data written to local disk.
  • mapreduce.reduce.shuffle.input.buffer.percent- Using this parameter you can set the percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle. Default is 70%. Be increasing this percenage you can use more memory for storing data at the reduce side.

That's all for this topic Tips for improving MapReduce performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Distributed Cache in Hadoop MapReduce
  4. OutputCommitter in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Fair Scheduler in YARN
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. How to Handle Missing And Under Replicated Blocks in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps
  6. Core Java Basics Interview Questions
  7. Reflection in Java
  8. Invoking Getters And Setters Using Reflection - Java Program