Wednesday, December 20, 2023

Shuffle And Sort Phases in Hadoop MapReduce

When you run a MapReduce job and the mappers start producing output, a lot of processing is done internally by the Hadoop framework before the reducers get their input. The Hadoop framework also guarantees that the map output is sorted by keys. This whole internal processing of sorting the map output and transferring it to the reducers is known as the shuffle phase in the Hadoop framework.

The tasks done internally by the Hadoop framework within the shuffle phase are as follows-

  1. Data from the mappers is partitioned as per the number of reducers (a minimal driver sketch showing how the number of reducers is set follows this list).
  2. Data is also sorted by keys within each partition.
  3. Map output is written to disk as a number of temporary spill files.
  4. Once the map task is finished, all the spill files written to disk are merged to create a single file.
  5. Data from a particular partition (from all the mappers) is transferred to the reducer that is supposed to process that particular partition.
  6. If the data transferred to a reducer exceeds the memory limit, it is copied to disk.
  7. Once a reducer has got its portion of data from all the mappers, the data is merged again, while still maintaining the sort order of keys, to create the reduce task input.
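
To make step 1 concrete, here is a minimal driver sketch, assuming Hadoop's predefined TokenCounterMapper and IntSumReducer classes for a word-count style job; the job name and input/output paths are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class ShuffleDemoDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "shuffle demo");
    job.setJarByClass(ShuffleDemoDriver.class);

    // Predefined mapper/reducer: mapper emits (word, 1), reducer sums the counts
    job.setMapperClass(TokenCounterMapper.class);
    job.setReducerClass(IntSumReducer.class);

    // Two reduce tasks, so every mapper's output is split into two partitions
    job.setNumReduceTasks(2);
    // HashPartitioner is the default partitioner; set explicitly here only for clarity
    job.setPartitionerClass(HashPartitioner.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}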

As you can see, some of the shuffle phase tasks happen on the nodes where mappers are running and some of them on the nodes where reducers are running.

Shuffle phase process at the Mapper side

When the map task starts producing output, it is not written directly to disk; instead, the map output is kept in a memory buffer (100 MB by default). The buffer size is configurable using the parameter mapreduce.task.io.sort.mb.

When that data is spilled from memory to disk is controlled by the configuration parameter mapreduce.map.sort.spill.percent (the default is 80% of the memory buffer). Once this threshold is reached, a background thread begins to spill the contents to disk.
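
Both of these map-side settings can also be overridden per job through the Configuration API, as in the following sketch; the values here are just examples, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapSideShuffleConfig {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // In-memory sort buffer for map output, in MB (default 100)
    conf.setInt("mapreduce.task.io.sort.mb", 256);

    // Fraction of the buffer at which the background spill to disk starts (default 0.80)
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);

    // The Configuration is then passed on when the job is created
    Job job = Job.getInstance(conf, "map side shuffle tuning demo");
    System.out.println(job.getConfiguration().get("mapreduce.task.io.sort.mb"));
  }
}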

Before being written to disk the Mapper output is sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. For example, let's say there are 4 mappers and 2 reducers for a MapReduce job. Then the output of all of these mappers is divided into 2 partitions, one for each reducer.
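
The division into partitions is done by the partitioner; by default that is HashPartitioner, which hashes the key and takes it modulo the number of reduce tasks. The class below is an illustrative re-statement of that logic for Text keys, not the actual Hadoop source.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner mirroring HashPartitioner's behaviour for Text keys
public class DemoHashPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    // With 2 reducers this returns 0 or 1, so the output of every mapper
    // ends up split into the same two partitions
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}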

[Figure: Shuffle phase in Hadoop]

If a Combiner is configured for the job, it is also executed at this point in order to reduce the size of the data written to disk.
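
A Combiner is something you register on the job yourself; a sum-style reducer can usually double as one because addition is associative and commutative. A minimal sketch using Hadoop's predefined IntSumReducer:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class CombinerSetup {
  // Illustrative helper: registering a combiner lets partial sums be computed
  // on the map side before the data is spilled and transferred
  public static void addCombiner(Job job) {
    job.setCombinerClass(IntSumReducer.class);
  }
}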

This process of keeping data in memory until the threshold is reached, partitioning and sorting it, and creating a new spill file on disk every time the threshold is reached, is repeated until all the records for the particular map task are processed. Before the map task finishes, all these spill files are merged, keeping the data partitioned and sorted by keys within each partition, to create a single merged file.
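
How many of these spill files are merged together in one go is governed by mapreduce.task.io.sort.factor (default 10); with more spill files than that, the merge runs in several passes. A small illustrative configuration sketch, with the value chosen only as an example:

import org.apache.hadoop.conf.Configuration;

public class SpillMergeConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Maximum number of spill files (streams) merged at once while creating
    // the single sorted, partitioned map output file (default 10)
    conf.setInt("mapreduce.task.io.sort.factor", 50);
  }
}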

The following image illustrates the shuffle phase process at the Map end.

[Figure: Shuffle phase at the Map side]

Shuffle phase process at the Reducer side

By this time the Map output is ready and stored on the local disk of the node where the Map task was executed. Now the relevant partition of the output of all the mappers has to be transferred to the nodes where the reducers are running.

Reducers don't wait for all the map tasks to finish before they start copying the data; as soon as a Map task finishes, the data transfer from that node starts. For example, if there are 10 mappers running, the framework won't wait for all 10 of them to finish; the output of each map task is copied as soon as that task completes.

Data copied from the mappers is kept in a memory buffer at the reducer side too. The size of this buffer is configured using the following parameter.

mapreduce.reduce.shuffle.input.buffer.percent - The percentage of memory, relative to the maximum heap size as typically specified in mapreduce.reduce.java.opts, that can be allocated to storing map outputs during the shuffle. The default is 70%.
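
As a sketch, this reduce-side buffer can be tuned per job through the Configuration API; the related knob mapreduce.reduce.shuffle.parallelcopies (default 5), which sets how many map outputs are fetched in parallel, is shown too even though it isn't discussed above. Values are illustrative only.

import org.apache.hadoop.conf.Configuration;

public class ReduceSideShuffleConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Fraction of the reducer's heap used to buffer fetched map outputs (default 0.70)
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);

    // Number of parallel fetches of map output over HTTP (default 5)
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
  }
}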

When the buffer reaches a certain threshold, the map output data held in it is merged and written to disk.

This merging of map outputs is known as the sort phase. During this phase the framework groups the Reducer inputs by key, since different mappers may have produced the same key in their output.

The threshold for triggering the merge to disk is configured using the following parameter.

mapreduce.reduce.merge.inmem.thresholds - The number of sorted map outputs fetched into memory before being merged to disk. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk.
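
The same trigger can be set through the Configuration API, as sketched below. Note that in mapred-default.xml the key appears as mapreduce.reduce.merge.inmem.threshold (without the trailing 's'); the second, percentage-based trigger mapreduce.reduce.shuffle.merge.percent (default 0.66) is included as an assumption about a related setting rather than something covered above.

import org.apache.hadoop.conf.Configuration;

public class ReduceMergeConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Merge-to-disk trigger based on the count of in-memory map outputs (default 1000)
    conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);

    // Merge-to-disk trigger based on how full the shuffle buffer is (default 0.66)
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
  }
}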

The merged data, which is a combination of the data written to disk as well as the data still kept in memory, constitutes the input for the Reduce task.

[Figure: Shuffle and sort phases in MapReduce]

Points to note-

  1. The Mapper outputs are sorted and then partitioned per Reducer.
  2. The total number of partitions is the same as the number of reduce tasks for the job.
  3. Reducer has 3 primary phases: shuffle, sort and reduce.
  4. Input to the Reducer is the sorted output of the mappers.
  5. In shuffle phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
  6. In the sort phase the framework groups Reducer inputs by keys from different map outputs (a Reducer sketch illustrating points 4 and 6 follows this list).
  7. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.
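
To illustrate points 4 and 6, by the time reduce() runs, the shuffle and sort phases have already grouped together all the values that the different mappers produced for a key, and the keys arrive in sorted order. A minimal word-count style Reducer sketch:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative reducer: every call to reduce() gets one key together with all
// the values produced for that key by every mapper, already grouped and sorted
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}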

That's all for this topic Shuffle And Sort Phases in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. MapReduce Flow in YARN
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Speculative Execution in Hadoop
  4. Uber Mode in Hadoop
  5. Data Compression in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. Capacity Scheduler in YARN
  5. Java Multi-Threading Interview Questions
  6. ArrayBlockingQueue in Java Concurrency
  7. String Vs StringBuffer Vs StringBuilder in Java
  8. How to Convert a File to Byte Array