Wednesday, October 25, 2023

Tips for improving MapReduce performance

In this post some tips for improving MapReduce job performance are given. There are three areas where you can look to improve MapReduce performance:

  1. The Map task
  2. The Reduce task
  3. Configuration of the shuffle and sort phase

We’ll go through these areas to see what can be done to improve the overall MapReduce job performance.

Performance improvement steps for Map and Reduce tasks

1- Input data file for Map task- If the input file is very large, or if there are a large number of small files, either situation may dent the performance of Map tasks.

If the file is very large (in TBs) and the HDFS block size is the default, i.e. 128 MB, that will result in a large number of input splits. Note that a Map task is spawned for each split, so a large number of input splits means a large number of Map tasks, resulting in overhead in terms of starting Map processes. In such a scenario, increasing the HDFS block size to 256 MB or even 512 MB would increase the input split size too.

That way each Map task would get more data to process resulting in creation of less map tasks and reduced overhead.

In the same way, if there are a large number of small files, a lot of time is spent on starting and stopping map tasks whereas the actual processing time is very small. As a rule of thumb a Map task should run for at least 1-3 minutes; if it doesn't, you should think of combining the input.

Your options are using a container file format like SequenceFile or using CombineFileInputFormat.
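For example, here is a minimal driver-side sketch using CombineTextInputFormat (the concrete subclass of CombineFileInputFormat for text input); the 256 MB limit is only an illustrative value, not a recommendation.

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// In the driver - pack many small input files into fewer, larger splits
job.setInputFormatClass(CombineTextInputFormat.class);
// Upper bound on the size of a combined split, in bytes (256 MB here)
CombineTextInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);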

2- Is input data splittable- If input data is compressed and the compression format used is not splittable then only a single input split would be created for the whole compressed file, which means only one map task processing the whole file.
Ensure that the compression format used is splittable; you can use Bzip2 compression, or LZO compression with indexing, which are splittable compression formats.

3- Using binary file formats- If you can, use a binary file format like Avro, SequenceFile or Parquet; these are more compact and give better serialization and deserialization performance compared to storing data as text.

4- Using native compression libraries- Using native libraries for compression and decompression outperforms the corresponding Java implementations.

5- How Many Reduces- Another factor in MapReduce performance is the number of reducers you are using for the job. As per the Hadoop documentation-
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.
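As a sketch, with hypothetical cluster numbers (10 worker nodes, 8 containers per node), the 0.95 rule could be applied in the driver like this.

// Example values only - use the numbers for your own cluster
int nodes = 10;
int containersPerNode = 8;
int reduceTasks = (int) (0.95 * nodes * containersPerNode); // 76
job.setNumReduceTasks(reduceTasks);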

6- Using the correct partitioner- Assigning map output to a specific reducer's partition is done using a Partitioner. The default HashPartitioner partitions the data by calculating a hash of the map output key.

If the default partitioner doesn’t work well for your data set, some partitions will get more (key, value) pairs whereas other partitions will have less data, resulting in reduce side data skew.

In such a scenario you may think of implementing a custom partitioner.
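A minimal sketch of a custom partitioner is shown below; the routing rule for keys starting with "Item1" is purely hypothetical, just to show the idea of giving a known heavy key its own partition.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Hypothetical rule - send a known heavy key to its own partition
    if (key.toString().startsWith("Item1")) {
      return 0;
    }
    // Spread the remaining keys with the usual hash based partitioning
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

It is then registered in the driver with job.setPartitionerClass(CustomPartitioner.class);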

Map and Reduce code related improvements

1- One of the most frequent mistakes is to create a new object every time, as shown in the following code.

public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t");
 // creating new object
 Text outValue = new Text(lineArray[0]);
 context.write(key, outValue);
}

Here a new outValue object is created for each (key, value) pair, causing overhead twice: once at object creation time and again when the many short-lived objects have to be garbage collected. It is better to reuse the object and write the code as follows.

// create object outside map function
Text outValue = new Text();
public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t");
 outValue.set(lineArray[0]);
 context.write(key, outValue);
}

2- Don’t store values in a data structure like List or Map in your Map or Reduce task. Storing objects like this is expensive and doing that for many (key,value) pairs will result in huge memory consumption.

Performance improvement steps to speed up shuffle and sort phase

Shuffling and sorting involve computation (sorting keys and partitioning), I/O (writing map outputs to local disk) and network transfer (moving data from map nodes to reduce nodes). Thus optimizing the shuffle and sort phase results in great improvements in the overall performance of the MapReduce job.

The first step towards optimizing the shuffle and sort phase is to reduce the size of the Map output. Some of the things you can do for that are as follows.

  1. Filtering data- Filter data as much as you can at the Map side itself rather than carrying everything to the reducer and then writing logic to filter records there. Also set only the required fields rather than emitting a big object.
  2. Using a combiner- By using a combiner you can aggregate data at the map end, resulting in fewer records as map output.
  3. Compressing Map output- Compressing the map output reduces its size, which means less data to transfer to the reduce nodes (a driver-side sketch for options 2 and 3 is shown after this list).
  4. Can it be a Mapper only job- If your job doesn’t need any aggregation then you can run a Mapper only job. That way you can do away with the reducer and the whole shuffle and sort phase.
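As a rough driver-side sketch of options 2 and 3 (the combiner class name reuses the reducer from the MaxSales example later on this page, and Snappy is just one possible codec, assuming the native Snappy libraries are available on the cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

// Inside the driver, after the Job instance has been created
// Option 2 - aggregate at the map side (here the reducer is reused as the combiner)
job.setCombinerClass(MaxSalesReducer.class);
// Option 3 - compress the intermediate map output
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);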

Setting configuration parameters for optimizing shuffle phase

There are many configuration parameters too which you can set for optimizing shuffle phase for your MapReduce job.

  • mapreduce.task.io.sort.mb- Using this parameter you can set the total amount of buffer memory to use while sorting files. Default is 100 megabytes. By increasing this value you can keep more data in memory reducing the data written to local disk.
  • mapreduce.reduce.shuffle.input.buffer.percent- Using this parameter you can set the percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle. Default is 70%. By increasing this percentage you can use more memory for storing data at the reduce side (see the sketch after this list).
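A minimal sketch of setting these two parameters on the job configuration in the driver (the values shown are only examples, not recommendations):

Configuration conf = job.getConfiguration();
// Bigger in-memory sort buffer on the map side (default is 100 MB)
conf.setInt("mapreduce.task.io.sort.mb", 256);
// Let the reduce side use more of its heap for shuffled map output (default is 0.70)
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.80f);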

That's all for this topic Tips for improving MapReduce performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Distributed Cache in Hadoop MapReduce
  4. OutputCommitter in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Fair Scheduler in YARN
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. How to Handle Missing And Under Replicated Blocks in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps
  6. Core Java Basics Interview Questions
  7. Reflection in Java
  8. Invoking Getters And Setters Using Reflection - Java Program

Distributed Cache in Hadoop MapReduce

Sometimes when you are running a MapReduce job your Map task and (or) reduce task may require some extra data in terms of a file, a jar or a zipped file in order to do their processing. In such scenarios you can use Distributed cache in Hadoop MapReduce.

What is distributed cache

Distributed cache in Hadoop provides a mechanism to copy files, jars or archives to the nodes where map and reduce tasks are running. Initially the specified file is cached in HDFS; once the task is about to run, the NodeManager copies the cached file to the local disk of the node where the task is running.


How to use Distributed cache in Hadoop

Earlier DistributedCache class methods were used to add files to distributed cache but that whole class is deprecated now.

You need to use methods of the Job class to add files to the distributed cache; the methods that can be used are as follows.

  1. public void addCacheArchive(URI uri)- This method is used to add an archive to be localized. Archived file will be unarchived on the node where task is running.
  2. public void addCacheFile(URI uri)- This method is used to add a file to be localized.
  3. public void setCacheArchives(URI[] archives)- Set the given set of archives that need to be localized.
  4. public void setCacheFiles(URI[] files)- Set the given set of files that need to be localized.
  5. public void addFileToClassPath(Path file)- This method adds a file path to the current set of classpath entries. It adds the file to cache as well.
  6. public void addArchiveToClassPath(Path archive)- This method adds an archive path to the current set of classpath entries. It adds the archive to cache as well.

If you are using GenericOptionsParser and ToolRunner in MapReduce code then you can pass the files to be added to the distributed cache through command line too.

Another advantage you get by using GenericOptionsParser to add a file is that you can add a file from the local file system too. With the methods in the Job class, the file has to be in a shared file system, which is one of the differences between these two options of adding files to the distributed cache in Hadoop.
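For example, a ToolRunner based job could be launched with the -files generic option (the jar name and paths below are hypothetical); the #city fragment creates the symbolic link discussed later in this post.

hadoop jar /path/to/mapreduce-job.jar TotalSalesWithDC -files /local/path/City.properties#city /test/input /test/output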

If you are using Java API (Job class methods) to add file to distributed cache in Hadoop then you have to ensure that the file is copied to HDFS. Then you can use the relevant method (based on file, jar or archive).

For example - If you are adding a text file to distributed cache then you can use the following method call

job.addCacheFile(new URI("/test/input/abc.txt#abc")); 

Here #abc creates a symbolic link to the file and using this name (abc in this case) you can access the cached file in the task nodes.

If you are adding a jar to the class path then you can use the following method call

job.addFileToClassPath(new Path("/test/MyTestLib.jar")); 

If you are adding a .zip archive file to distributed cache then you can use the following method call.

job.addCacheArchive(new URI("/test/input/abc.zip")); 

Distributed cache with MapReduce example

Suppose you have data in the following format -

Item1 345 DEL
Item1 205 KOL
Item3 654 BLR
Item3 453 KOL
Item2 350 MUM
Item1 122 DEL
    
What is needed is to find the total sales per item per city, but in the output file you want the full name of the city in the following form, not the three letter city code.
    
Item1 Delhi  467
    
In this scenario you can add a properties file to the distributed cache which has the following form.

DEL=Delhi
KOL=Kolkata

In the reducer you can get this properties file from the distributed cache and replace the city code with the full name by referring to the properties file. Getting the file from the distributed cache and loading it into a Properties instance is done in the setup() method of the Reducer.

MapReduce code

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSalesWithDC extends Configured implements Tool{
  // Mapper
  public static class TotalSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0] + " " + stringArr[2]);             
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
  // Reducer
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private Properties cityProp = new Properties();
    private Text cityKey = new Text();
    private IntWritable result = new IntWritable();
    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
      // That's where file stored in distributed cache is used 
      InputStream iStream = new FileInputStream("./city");  
      //Loading properties
      cityProp.load(iStream);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;        
      String[] stringArr = key.toString().split(" ");
      // Getting the city name from prop file
      String city = cityProp.getProperty(stringArr[1].trim());
      cityKey.set(stringArr[0] + "\t"+ city);
      for (IntWritable val : values) {
        sum += val.get();
      }   
      result.set(sum);
      context.write(cityKey, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new TotalSalesWithDC(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception{
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "TotalSales");
    job.setJarByClass(getClass());
    job.setMapperClass(TotalSalesMapper.class); 
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Adding file to distributed cache
    job.addCacheFile(new URI("/test/input/City.properties#city"));
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Distributed Cache in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Shuffle And Sort Phases in Hadoop MapReduce
  2. What Are Counters in Hadoop MapReduce
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Data Compression in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. Sequence File in Hadoop
  4. YARN in Hadoop
  5. HDFS High Availability
  6. Java Collections Interview Questions
  7. Fail-Fast Vs Fail-Safe Iterator in Java
  8. Data Access in Spring Framework

Tuesday, October 24, 2023

Shuffle And Sort Phases in Hadoop MapReduce

When you run a MapReduce job and mappers start producing output, a lot of processing is done internally by the Hadoop framework before the reducers get their input. The Hadoop framework also guarantees that the map output is sorted by keys. This whole internal processing of sorting the map output and transferring it to the reducers is known as the shuffle phase in the Hadoop framework.

The tasks done internally by the Hadoop framework within the shuffle phase are as follows-

  1. Data from mappers is partitioned as per the number of reducers.
  2. Data is also sorted by keys within a partition.
  3. Output from the Maps is written to disk as many temporary files.
  4. Once the map task is finished all the files written to the disk are merged to create a single file.
  5. Data from a particular partition (from all mappers) is transferred to the reducer that is supposed to process that particular partition.
  6. If the data transferred to a reducer exceeds the memory limit then it is spilled to disk.
  7. Once the reducer has got its portion of data from all the mappers, the data is again merged, while still maintaining the sort order of keys, to create the reduce task input.

As you can see some of the shuffle phase tasks happen at the nodes where mappers are running and some of them at the nodes where reducers are running.

Shuffle phase process at mappers side

When the map task starts producing output it is not directly written to disk; instead there is a memory buffer (100 MB by default) where the map output is kept. This size is configurable and the parameter that is used is mapreduce.task.io.sort.mb.

When that data from memory is spilled to disk is controlled by the configuration parameter mapreduce.map.sort.spill.percent (default is 80% of the memory buffer). Once this threshold of 80% is reached, a thread will begin to spill the contents to disk in the background.

Before writing to the disk the Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. For example, let's say there are 4 mappers and 2 reducers for a MapReduce job. Then the output of all of these mappers will be divided into 2 partitions, one for each reducer.


If there is a Combiner, it is also executed at this point in order to reduce the size of the data written to the disk.

This process of keeping data in memory until the threshold is reached, partitioning and sorting, creating a new spill file every time the threshold is reached and writing data to the disk is repeated until all the records for the particular map task are processed. Before the Map task is finished all these spill files are merged, keeping the data partitioned and sorted by keys within each partition, to create a single merged file.

Following image illustrates the shuffle phase process at the Map end.

[Figure: Shuffle phase process at the Map end]

Shuffle phase process at Reducer side

By this time you have the Map output ready and stored on a local disk of the node where Map task was executed. Now the relevant partition of the output of all the mappers has to be transferred to the nodes where reducers are running.

Reducers don’t wait for all the map tasks to finish before starting to copy the data; as soon as a Map task is finished, data transfer from that node is started. For example if there are 10 mappers running, the framework won’t wait for all 10 mappers to finish before starting the map output transfer. As soon as a map task finishes, transfer of its data starts.

Data copied from mappers is kept in a memory buffer at the reducer side too. The size of the buffer is configured using the following parameter.

mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory- relative to the maximum heapsize as typically specified in mapreduce.reduce.java.opts- that can be allocated to storing map outputs during the shuffle. Default is 70%.

When the buffer reaches a certain threshold map output data is merged and written to disk.

This merging of Map outputs is known as sort phase. During this phase the framework groups Reducer inputs by keys since different mappers may have produced the same key as output.

The threshold for triggering the merge to disk is configured using the following parameter.

mapreduce.reduce.merge.inmem.threshold- The number of sorted map outputs fetched into memory before being merged to disk. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk.

The merged file, which is the combination of data written to the disk as well as the data still kept in memory constitutes the input for Reduce task.

[Figure: Shuffle and sort phases in Hadoop MapReduce]

Points to note-

  1. The Mapper outputs are sorted and then partitioned per Reducer.
  2. The total number of partitions is the same as the number of reduce tasks for the job.
  3. Reducer has 3 primary phases: shuffle, sort and reduce.
  4. Input to the Reducer is the sorted output of the mappers.
  5. In shuffle phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
  6. In sort phase the framework groups Reducer inputs by keys from different map outputs.
  7. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

That's all for this topic Shuffle And Sort Phases in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. MapReduce Flow in YARN
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Speculative Execution in Hadoop
  4. Uber Mode in Hadoop
  5. Data Compression in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. Capacity Scheduler in YARN
  5. Java Multi-Threading Interview Questions
  6. ArrayBlockingQueue in Java Concurrency
  7. String Vs StringBuffer Vs StringBuilder in Java
  8. How to Convert a File to Byte Array

Monday, October 23, 2023

Using Combiner in Hadoop MapReduce to Improve Performance

In this post we’ll see what a combiner is in Hadoop and how it helps in speeding up the shuffle and sort phase in Hadoop MapReduce.


What is combiner in Hadoop

Generally in a MapReduce job, data is collected in the Map phase and later aggregated in the Reduce phase. By specifying a combiner function in MapReduce you can aggregate data in the Map phase also.

You can specify a combiner in your MapReduce driver using the following statement -

job.setCombinerClass(COMBINER_CLASS.class); 

Note that specifying combiner in your MapReduce job is optional.

How combiner helps in improving MapReduce performance

Once the Map tasks start producing output that data has to be stored in memory, partitioned as per the number of reducers, sorted on keys and then spilled to the disk.

Once the Map task is done the data partitions have to be sent to the reducers (on different nodes) working on specific partitions. As you can see this whole shuffle and sort process involves consuming memory, I/O and data transfer across network.

If you specify a combiner function in MapReduce then, when the map output stored in memory is written to disk, the combiner function is run on the data so that there is less data to be written to the disk (reducing I/O), which also results in less data being transferred to the reducer nodes (reducing bandwidth usage).

For example– Suppose you have sales data of several items and you are trying to find the maximum sales number per item. For Item1, suppose the following (key, value) pairs are the output of Map-1 and Map-2.

Map-1
(Item1, 300)
(Item1, 250)
(Item1, 340)
(Item1, 450)

Map-2
(Item1, 120)
(Item1, 540)
(Item1, 290)
(Item1, 300)

Then the reduce function which gets data for this key (Item1) will receive all these (key, value) pairs as input after the shuffle phase.
    
[Item1,(300,250,340,450,120,540,290,300)]
 

Resulting in final output - (Item1, 540)

If you are using a combiner in the MapReduce job and the reducer class itself is used as the combiner class, then the combiner will be called for the output of each map.

Map-1 Combiner output

      (Item1, 450) 
    

Map-2 Combiner output

      (Item1, 540)
         

Input to Reducer - [Item1, (450, 540)]

Resulting in final output - (Item1, 540)

So you can see by using a combiner map output is reduced which means less data is written to disk and less data is transferred to reducer nodes.

How to write a Combiner function

For writing a Combiner class you need to extend Reducer and implement the reduce method, just like you do for writing the reducer. In fact, in many cases the reducer itself can be used as the Combiner.

The output key and value types of the combiner must be the same as the output key and value types of the mapper.


It is not always possible to use the reducer as the combiner class though; the classic example of this constraint is the calculation of an average.

For example- If there are two maps with (key, value) pair as following

Map-1 (1,4,7) and Map-2 (8,9)

Then reduce function will calculate average as – (1+4+7+8+9)/5 = 29/5 = 5.8

whereas with a combiner the average will first be calculated per map output

Map-1 – (1+4+7)/3 = 12/3 = 4

Map-2 – (8+9)/2 = 17/2 = 8.5

So the average calculated at the reduce side will be – (4+8.5)/2 = 12.5/2 = 6.25, which is not the correct overall average.
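A common workaround, shown here only as a sketch, is to make the combiner emit partial sums and counts instead of averages; this assumes the mapper emits its values as Text in a "sum,count" form (e.g. "7,1" for a single value 7) and that DoubleWritable is added to the imports used in the example below.

// Combiner - merges partial "sum,count" pairs, still producing "sum,count"
public static class AvgCombiner extends Reducer<Text, Text, Text, Text>{
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    long count = 0;
    for (Text val : values) {
      String[] parts = val.toString().split(",");
      sum += Long.parseLong(parts[0]);
      count += Long.parseLong(parts[1]);
    }
    context.write(key, new Text(sum + "," + count));
  }
}

// Reducer - combines the partial sums and counts into the exact average
public static class AvgReducer extends Reducer<Text, Text, Text, DoubleWritable>{
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    long count = 0;
    for (Text val : values) {
      String[] parts = val.toString().split(",");
      sum += Long.parseLong(parts[0]);
      count += Long.parseLong(parts[1]);
    }
    context.write(key, new DoubleWritable((double) sum / count));
  }
}

With this approach the two map outputs above would be combined into (12, 3) and (17, 2), and the reducer would compute (12+17)/(3+2) = 29/5 = 5.8, the correct average.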

Combiner with MapReduce example

Here is an example where a combiner is specified while calculating the maximum sales figure per item.

  
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxSales extends Configured implements Tool{
  // Mapper
  public static class MaxSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0]);
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
    
  // Reducer
  public static class MaxSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int maxSalesValue = Integer.MIN_VALUE;
      for(IntWritable val : values) {
        maxSalesValue = Math.max(maxSalesValue, val.get());
      }  
      result.set(maxSalesValue);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new MaxSales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "MaxSales");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxSalesMapper.class); 
    // Specifying combiner class
    job.setCombinerClass(MaxSalesReducer.class);
    job.setReducerClass(MaxSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

In the displayed counters for the MapReduce job you can see the reduction in number of records passed to reducer.

   
Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=57
  Input split bytes=103
  Combine input records=21
  Combine output records=4
  Reduce input groups=4
  Reduce shuffle bytes=57
  Reduce input records=4
  Reduce output records=4

For comparison here are the counters when the same MapReduce job is run without a Combiner class.

   
  Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=273
  Input split bytes=103
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=273
  Reduce input records=21
  Reduce output records=4
  Spilled Records=42
    

That's all for this topic Using Combiner in Hadoop MapReduce to Improve Performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. What Are Counters in Hadoop MapReduce
  3. MapReduce Flow in YARN
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. File Write in HDFS - Hadoop Framework Internal Steps
  3. How to Read And Write Parquet File in Hadoop
  4. Using Avro File With Hadoop MapReduce
  5. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  6. Installing Ubuntu Along With Windows
  7. CopyOnWriteArrayList in Java
  8. Creating a Maven Project in Eclipse

How to Check Hadoop MapReduce Logs

If you are wondering how to add logging to your Hadoop MapReduce job, or where to check MapReduce logs or even System.out statements, then this post shows the same. Note that here accessing logs is shown for MapReduce 2.

Location of logs in Hadoop MapReduce

An application ID is created for every MapReduce job. You can get that application ID from the console itself after starting your MapReduce job. It will be similar to as shown below.

18/07/11 14:39:23 INFO impl.YarnClientImpl: Submitted application application_1531299441901_0001

A folder with the same application ID will be created in the logs/userlogs directory of your Hadoop installation directory. For example I can see the following directory for the application ID mentioned above.

HADOOP_INSTALLATION_DIR/logs/userlogs/application_1531299441901_0001

Within this directory you will find separate folders created for the mappers and reducers, and there you will have the following files for logs and sysouts.

syslog- Contains the log messages.

sysout- Contains the System.out messages.
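If YARN log aggregation is enabled on your cluster, the same logs can also be fetched from the command line using the application ID; for the example ID above that would be-

yarn logs -applicationId application_1531299441901_0001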

MapReduce example with logs

Here is a simple word count MapReduce program with logs and sysouts added.

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
  public static final Log log = LogFactory.getLog(WordCount.class);
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      log.info("In map method");
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      System.out.println("Array length- " + stringArr.length);
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }       
    }
  }
    
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
          throws IOException, InterruptedException {
      log.info("In reduce method with key " + key);
      int sum = 0;
      for (IntWritable val : values) {
          sum += val.get();
      }
      System.out.println("Key - " + key + " sum - " + sum);
      result.set(sum);
      context.write(key, result);
    }
  }
    
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new WordCount(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(getClass());
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
 
Once you run this MapReduce job, using the application ID you can go to the location as already explained above and check the log and sysout messages.

That's all for this topic How to Check Hadoop MapReduce Logs. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. How to Compress Intermediate Map Output in Hadoop
  3. How to Write a Map Only Job in Hadoop MapReduce
  4. Predefined Mapper And Reducer Classes in Hadoop
  5. How to Configure And Use LZO Compression in Hadoop

You may also like-

  1. NameNode, DataNode And Secondary NameNode in HDFS
  2. HDFS Commands Reference List
  3. Parquet File Format in Hadoop
  4. Sequence File in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. Spliterator in Java
  7. PermGen Space Removal in Java 8
  8. Converting double to String - Java Program

Sunday, October 22, 2023

Chaining MapReduce Job in Hadoop

While processing data using MapReduce you may want to break the requirement into a series of tasks and do them as a chain of MapReduce jobs rather than doing everything within one MapReduce job and making it more complex. Hadoop provides two predefined classes, ChainMapper and ChainReducer, for the purpose of chaining MapReduce jobs in Hadoop.


ChainMapper class in Hadoop

Using the ChainMapper class you can use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion; the output of the first becomes the input of the second, and so on until the last Mapper, whose output will be written to the task's output.

For adding Mapper classes to the ChainMapper the addMapper() method is used.

ChainReducer class in Hadoop

Using the predefined ChainReducer class in Hadoop you can chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper and output of first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

For setting the Reducer class to the chain job setReducer() method is used.

For adding a Mapper class to the chain reducer addMapper() method is used.

How to chain MapReduce jobs

Using the ChainMapper and the ChainReducer classes it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*].

In the chain of MapReduce job you can have-

  • A chain of map tasks executed using ChainMapper
  • A reducer set using ChainReducer.
  • A chain of map tasks added using ChainReducer (This step is optional).

Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain.

Benefits of using a chained MapReduce job

  • When mappers are chained, data from the intermediate mappers is kept in memory rather than stored to disk, so the next mapper in the chain doesn't have to read data from disk. The immediate benefit of this pattern is a dramatic reduction in disk IO.
  • Gives you a chance to break the problem into simpler tasks and execute them as a chain.

Chained MapReduce job example

Let’s take a simple example to show chained MapReduce job in action. Here input file has item, sales and zone columns in the below format (tab separated) and you have to get the total sales per item for zone-1.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3
    

For the sake of the example let’s say the first mapper gets all the records and the second mapper filters them to get only the records for zone-1. In the reducer you get the total for each item and then you flip the records so that the key becomes the value and the value becomes the key. For that InverseMapper is used, which is a predefined mapper in Hadoop.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sales extends Configured implements Tool{
  // First Mapper
  public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{
    private Text item = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      //splitting record
      String[] salesArr = value.toString().split("\t");
      item.set(salesArr[0]);
      // Writing (sales,zone) as value
      context.write(item, new Text(salesArr[1] + "," + salesArr[2]));
    }
  }
    
  // Mapper 2
  public static class FilterMapper extends Mapper<Text, Text, Text, IntWritable>{
    public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {
    
      String[] recordArr = value.toString().split(",");
      // Filtering on zone
      if(recordArr[1].equals("zone-1")) {
          Integer sales = Integer.parseInt(recordArr[0]);
          context.write(key, new IntWritable(sales));
      }
    }
  }
    
  // Reduce function
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{      
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new Sales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Sales");
    job.setJarByClass(getClass());
        
    // MapReduce chaining
    Configuration mapConf1 = new Configuration(false);
    ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class,
        Text.class, Text.class,  mapConf1);
        
    Configuration mapConf2 = new Configuration(false);
    ChainMapper.addMapper(job, FilterMapper.class, Text.class, Text.class,
        Text.class, IntWritable.class, mapConf2);
        
    Configuration reduceConf = new Configuration(false);        
    ChainReducer.setReducer(job, TotalSalesReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
         
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Chaining MapReduce Job in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Write a Map Only Job in Hadoop MapReduce
  3. Data Locality in Hadoop
  4. How to Compress Intermediate Map Output in Hadoop
  5. Java Program to Write File in HDFS

You may also like-

  1. HDFS Commands Reference List
  2. How to Handle Missing And Under Replicated Blocks in HDFS
  3. What is SafeMode in Hadoop
  4. Parquet File Format in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. How to Run a Shell Script From Java Program
  7. Java Collections Interview Questions
  8. Best Practices For Exception Handling in Java

Saturday, October 21, 2023

How to Write a Map Only Job in Hadoop MapReduce

In a MapReduce job in Hadoop you generally write both a map function and a reduce function; the map function to generate (key, value) pairs and the reduce function to aggregate those (key, value) pairs. But you may opt to have only the map function in your MapReduce job and skip the reducer part. That is known as a Mapper only job in Hadoop MapReduce.

Mapper only job in Hadoop

You may have a scenario where you just want to generate (key, value) pairs; in that case you can write a job with only a map function. For example, if you want to convert a file to a binary file format like SequenceFile or to a columnar file format like Parquet.

Note that, generally in a MapReduce job, the output of the Mappers is written to local disk rather than to HDFS. In the case of a Mapper only job, the map output is written to HDFS, which is one of the differences between a regular MapReduce job and a Mapper only job in Hadoop.

Writing Mapper only job

In order to write a mapper only job you need to set the number of reducers to zero. You can do that by adding job.setNumReduceTasks(0); in your driver class.

As example

@Override
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 Job job = Job.getInstance(conf, "TestClass");
 job.setJarByClass(getClass());
 job.setMapperClass(TestMapper.class);
 // Setting reducer to zero
 job.setNumReduceTasks(0);
 .....
 .....

}

Another way to have a Mapper only job is to pass the configuration parameter on the command line. The parameter used is mapreduce.job.reduces; note that before Hadoop 2 the parameter was mapred.reduce.tasks, which is deprecated now.

As example-

hadoop jar /path/to/jar ClasstoRun -D mapreduce.job.reduces=0 /input/path /output/path

Mapper only job runs faster

In a regular MapReduce job the output of the map phase is partitioned and sorted on keys and then sent across the network to the nodes where the reducers are running. This whole shuffle phase can be avoided by having a Mapper only job in Hadoop, making it faster.

That's all for this topic How to Write a Map Only Job in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Word Count MapReduce Program in Hadoop
  2. How to Compress Intermediate Map Output in Hadoop
  3. Input Splits in Hadoop
  4. Uber Mode in Hadoop
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. HDFS Commands Reference List
  2. How to Handle Missing And Under Replicated Blocks in HDFS
  3. What is SafeMode in Hadoop
  4. HDFS High Availability
  5. Fair Scheduler in YARN
  6. Difference Between Abstract Class And Interface in Java
  7. Writing File in Java
  8. Java Lambda Expressions Interview Questions

Friday, October 20, 2023

Input Splits in Hadoop

When a MapReduce job is run to process input data, one of the things the Hadoop framework does is to divide the input data into smaller chunks; these chunks are referred to as input splits in Hadoop.

For each input split Hadoop creates one map task to process records in that input split. That is how parallelism is achieved in Hadoop framework. For example if a MapReduce job calculates that input data is divided into 8 input splits, then 8 mappers will be created to process those input splits.

How input splits are calculated

Input splits are created by the InputFormat class used in the MapReduce job. Actually the implementation for calculating splits for the input file is in the FileInputFormat class which is the base class for all the implementations of InputFormat.

InputFormat Class

public abstract class InputFormat<K, V> {
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

FileInputFormat method for computing splits

If you want to see how split size is calculated you can have a look at computeSplitSize method in org.apache.hadoop.mapreduce.lib.input.FileInputFormat class.

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

Here the values of minSize and maxSize are taken from the following configuration parameters-
  • mapreduce.input.fileinputformat.split.minsize- The minimum size chunk that map input should be split into. Default value is 0.
  • mapreduce.input.fileinputformat.split.maxsize- The maximum size chunk that map input should be split into. Default value is Long.MAX_VALUE.

So you can see that with the default values the input split size is equal to the HDFS block size.
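A quick worked example of the formula with the defaults, plus a sketch of raising the split size from the driver (the 256 MB figure is only an illustration):

// Defaults: minSize = 0, maxSize = Long.MAX_VALUE, blockSize = 128 MB
// computeSplitSize = max(0, min(Long.MAX_VALUE, 134217728)) = 134217728, i.e. 128 MB

// Raising the minimum split size to 256 MB makes each split bigger than a block
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
// computeSplitSize = max(268435456, min(Long.MAX_VALUE, 134217728)) = 268435456, i.e. 256 MB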

Input split is a logical split

We have already seen how input data is divided into input splits in Hadoop. Further each split is divided into records (key-value pair). Map tasks process each of these records.

These input splits and records are logical, they don’t store or contain the actual data. They just refer to the data which is stored as blocks in HDFS. You can have a look at InputSplit class to verify that. InputSplit represents the data to be processed by an individual Mapper.

public abstract class InputSplit {
  public abstract long getLength() throws IOException, InterruptedException;

  public abstract String[] getLocations() throws IOException, InterruptedException;

  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
  }
}

As you can see the InputSplit class has the length of input split in bytes and a set of storage locations where the actual data is stored. These storage locations help Hadoop framework to spawn map tasks as close to the data as possible in order to take advantage of data locality optimization.

Input split Vs HDFS blocks

As already stated, an input split is a logical representation of the data stored in HDFS blocks, whereas the data of a file is stored physically in HDFS blocks. Now, you may wonder why these two representations of the data are needed and why MapReduce can't directly process the data stored in the blocks.

HDFS breaks the input file into chunks of exactly 128 MB (as per the default configuration), with no consideration of the data residing in those blocks. What if there is a record that spans the block boundaries, i.e. a record that starts in one block and ends in another block? A map task that is processing one HDFS block would only be able to read the portion of the record within that block rather than the whole record. That's where the logical representation of the data as used in input splits helps. Input splits take care of the logical record boundaries; using the starting record in the block and the byte offset, the framework can get the complete record even if it spans block boundaries.


That's all for this topic Input Splits in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Speculative Execution in Hadoop
  2. How MapReduce Works in Hadoop
  3. What is SafeMode in Hadoop
  4. Data Compression in Hadoop
  5. YARN in Hadoop

You may also like-

  1. How to Compress Intermediate Map Output in Hadoop
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. HDFS Commands Reference List
  4. How to Configure And Use LZO Compression in Hadoop
  5. Capacity Scheduler in YARN
  6. How to Run a Shell Script From Java Program
  7. Enum Type in Java
  8. Switch-Case Statement in Java

Thursday, October 19, 2023

Data Locality in Hadoop

In this post we’ll see what is data locality in Hadoop and how it helps in minimizing the network traffic and increasing the overall throughput of the cluster.

Data locality optimization in Hadoop

To understand data locality in Hadoop you will need to understand how a file is stored in HDFS and how a MapReduce job calculates the number of input splits and launches map tasks to process the data referred to by those splits.

HDFS stores a file by dividing it into blocks of 128 MB (which is the default block size). These blocks are then stored in different nodes across the Hadoop cluster. There is also replication of blocks (by default replication factor is 3) so each block is stored on 3 different nodes for redundancy.

A MapReduce job splits its input into input splits where the split size is, by default, the size of an HDFS block (128 MB). Hadoop creates one map task for each split, i.e. if there are 8 input splits then 8 map tasks will be launched.

It is actually the client running the MapReduce job that calculates the splits for the job by calling getSplits().

That split information is used by the YARN ApplicationMaster to try to schedule map tasks on the same nodes where the split data resides, thus making the tasks data local. If map tasks were spawned at random locations then each map task would have to copy the data it needs to process from the DataNode where that split data resides, consuming a lot of cluster bandwidth. By trying to schedule map tasks on the same node where the split data resides, the Hadoop framework sends computation to the data rather than bringing data to the computation, saving cluster bandwidth. This is called the data locality optimization.

Note here that it is not always possible to launch the map task on the same node where the input data resides because of resource constraints. In that case the Hadoop framework will try to minimize the distance by trying to make the map task rack local; if that is also not possible then it runs the map task on a different rack.

Categories based on data proximity

Based on where data for the mapper resides there are three categories.

  • Data local– If map task runs on the same node where the split data resides it is referred as data local. This is the optimal scenario.
  • Rack local– If the map task working on the data is launched on a different node but in the same rack where the data resides, this is known as rack local.
  • Different rack- If rack local is also not possible then map task is launched on a node on a different rack. In this case data has to be transferred between racks from the node where the split data resides to the node where map task is running.

That's all for this topic Data Locality in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Uber Mode in Hadoop
  2. What is SafeMode in Hadoop
  3. Data Compression in Hadoop
  4. Speculative Execution in Hadoop
  5. YARN in Hadoop

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. Java Program to Read File in HDFS
  3. NameNode, DataNode And Secondary NameNode in HDFS
  4. MapReduce Flow in YARN
  5. How to Compress Intermediate Map Output in Hadoop
  6. Stream API in Java 8
  7. How to Run a Shell Script From Java Program
  8. Lazy Initializing Spring Beans

MapReduce Flow in YARN

How a MapReduce job runs in YARN is different from how it used to run in MRv1. This post shows how MapReduce v2 runs internally in YARN Hadoop.

MapReduce flow - Submitting MapReduce job

First step is of course submitting the job in order to kick start the process.

For submitting the job you can use one of the following methods of the org.apache.hadoop.mapreduce.Job class-

  • void submit()- Submit the job to the cluster and return immediately.
  • boolean waitForCompletion(boolean)- Submit the job to the cluster and wait for it to finish.

When a job is submitted using one of the above mentioned methods, the Job class creates a JobSubmitter instance, the submitJobInternal() method is called on that instance and the following steps are taken.

  1. Getting a new application ID from the resource manager for the MapReduce job.
  2. Checking the input and output specifications of the job.
  3. Computing the InputSplits for the job.
  4. Setup the requisite accounting information for the org.apache.hadoop.mapreduce.filecache.DistributedCache of the job, if necessary.
  5. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
  6. Submitting the job using the submitApplication method of YarnClient. The submitApplication method submits a new application to YARN. It is a blocking call; it will not return the ApplicationId until the submitted application is accepted by the ResourceManager.

Important YARN components for running MapReduce job

Main components when running a MapReduce job in YARN are–

  1. Client- An application submission client that submits an application.
  2. ResourceManager- Daemon that manages the cluster resources.
  3. ApplicationMaster- Communicates with the ResourceManager to negotiate and allocate resources for future containers to run the map and reduce tasks for the submitted job.
  4. NodeManager- Launches and monitors the resources used by the containers that run the mappers and reducers for the job. The NodeManager daemon runs on each node in the cluster.

Interaction among these components is shown here-

Client<-->ResourceManager - By using YarnClient objects.

ApplicationMaster<-->ResourceManager - By using AMRMClientAsync objects, handling events asynchronously by AMRMClientAsync.CallbackHandler

ApplicationMaster<-->NodeManager - Launch containers. Communicate with NodeManagers by using NMClientAsync objects, handling container events by NMClientAsync.CallbackHandler.

Running tasks for the submitted MapReduce job

Once the job is submitted to the ResourceManager, initially a single container is negotiated for executing the application specific ApplicationMaster (Which is MRAppMaster in case of MapReduce applications). The YARN ResourceManager will then launch the ApplicationMaster on the allocated container.

Once launched ApplicationMaster performs the following tasks-

  1. Communicate with the ResourceManager to negotiate and allocate resources for containers required to run mappers and reducers for the submitted MapReduce job.
  2. After the containers are allocated, communicate with YARN NodeManagers to launch application containers on the nodes where the containers are allocated.
  3. Track the progress of the tasks running on the containers.

How ApplicationMaster runs the Map and Reduce tasks

The ApplicationMaster retrieves the number of input splits calculated for the job at the time of submission. As many map tasks are created as there are input splits, and the number of reducers is determined by the mapreduce.job.reduces property, which sets the default number of reduce tasks per job.

After knowing the number of mappers and reducers required for the job, the ApplicationMaster has to decide whether it should run the tasks sequentially in the same JVM where the ApplicationMaster itself is running. If it does that, it is known as running the tasks in uber mode in Hadoop.

If the job is not run as an uber task then ApplicationMaster has to negotiate with ResourceManager to get resource containers to run those map and reduce tasks.

In the resource requests to the ResourceManager, memory requirements and CPUs for the tasks are also specified. The values used for determining memory and CPU requirements for the map and reduce tasks come from the mapred-site.xml configuration file (a per-job override sketch is shown after the list below).

  • mapreduce.map.memory.mb- The amount of memory to request from the scheduler for each map task. Default value is 1024 MB.
  • mapreduce.map.cpu.vcores– The number of virtual cores to request from the scheduler for each map task. Default value is 1.
  • mapreduce.reduce.memory.mb– The amount of memory to request from the scheduler for each reduce task. Default value is 1024 MB.
  • mapreduce.reduce.cpu.vcores– The number of virtual cores to request from the scheduler for each reduce task. Default value is 1.
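A minimal per-job override sketch (the 2048 MB and 2 vcore figures are only illustrative values):

Configuration conf = job.getConfiguration();
// Ask for bigger map task containers for this job only (defaults are 1024 MB and 1 vcore)
conf.setInt("mapreduce.map.memory.mb", 2048);
conf.setInt("mapreduce.map.cpu.vcores", 2);
// Reduce tasks keep the default values in this sketch
conf.setInt("mapreduce.reduce.memory.mb", 1024);
conf.setInt("mapreduce.reduce.cpu.vcores", 1);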

The ResourceManager’s scheduler allocates these containers on different nodes in the Hadoop cluster. Reduce tasks can be assigned containers on any node with no locality constraint. For map tasks the scheduler tries to allocate containers on the nodes where the split data resides, for data locality optimization.

Once the containers are allocated, the ApplicationMaster launches them by contacting the NodeManagers of the corresponding nodes. The mappers and reducers are then executed in separate JVMs on the launched containers.

MapReduce flow - Task progress and completion

The running map and reduce tasks report their progress every three seconds to the ApplicationMaster, which aggregates these updates into the overall job progress.

The client also receives the current status of the job from the ApplicationMaster.
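
From the client side this status is available through the progress methods of the Job object. A minimal polling loop, assuming the job has already been submitted, could look like this (the class name is just a placeholder):

import org.apache.hadoop.mapreduce.Job;

public class JobProgressMonitor {
  // Polls a submitted job until it finishes, printing map and reduce progress.
  public static void monitor(Job job) throws Exception {
    while (!job.isComplete()) {
      System.out.printf("map %.0f%%  reduce %.0f%%%n",
          job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(3000);   // roughly the same interval as the task progress updates
    }
    System.out.println("Job " + (job.isSuccessful() ? "succeeded" : "failed"));
  }
}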

The ApplicationMaster also emits heartbeats to the ResourceManager to keep it informed that it is alive and still running.

If a task fails to complete, the ApplicationMaster will reschedule that task on another node in the cluster.

When all the map and reduce tasks for the job are completed, the ApplicationMaster changes the job status to successful. After that the ApplicationMaster unregisters itself from the ResourceManager and shuts down, and the client is notified that the job has finished.

Figure: MapReduce flow in YARN

Reference- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html

That's all for this topic MapReduce Flow in YARN. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. YARN in Hadoop
  2. Fair Scheduler in YARN
  3. Replica Placement Policy in Hadoop Framework
  4. How MapReduce Works in Hadoop
  5. Speculative Execution in Hadoop

You may also like-

  1. Word Count MapReduce Program in Hadoop
  2. Java Program to Read File in HDFS
  3. What is SafeMode in Hadoop
  4. HDFS High Availability
  5. Data Compression in Hadoop
  6. Multi-Catch Statement in Java Exception Handling
  7. How HashSet Works Internally in Java
  8. Converting double to String - Java Program

Monday, October 16, 2023

Uber Mode in Hadoop

When a MapReduce job is submitted, the ResourceManager launches the ApplicationMaster process (for MapReduce the ApplicationMaster is MRAppMaster) on a container. The ApplicationMaster then retrieves the number of input splits for the job and, based on that, decides the number of mappers that have to be launched; the number of reducers to launch is taken from the configuration.

At this juncture the ApplicationMaster has to decide whether to negotiate resources with the ResourceManager’s scheduler to run the map and reduce tasks, or to run the job sequentially within the same JVM where the ApplicationMaster is running.

This decision is made only if uber mode is enabled in Hadoop. If uber mode is set to true and the ApplicationMaster decides to run the MapReduce job within the same JVM, then the job is said to be running as an uber task in YARN.

While running a MapReduce job in Hadoop 2 you might have noticed this message on the console-

Job job_XXXXX_xxxx running in uber mode : false
This message is displayed if you have not modified the default for uber mode in Hadoop, which is false.

Why to run job as uber task in YARN

For a job to run as an uber task in YARN, it has to be "sufficiently small".

If a job is small enough, the ApplicationMaster can decide that negotiating with the ResourceManager for containers, contacting the NodeManagers of remote nodes and running tasks on those containers requires more effort than simply running the job sequentially in the same JVM as the ApplicationMaster. In that case it runs the job as an uber task in YARN.

Uber mode configuration parameters in Hadoop

There are certain configuration parameters that help the ApplicationMaster decide when a job is small enough to be run as an uber task in YARN. By default a job qualifies if it has fewer than 10 mappers, at most one reducer, and an input that is no larger than one HDFS block.

Configuration parameters for uber mode are set in etc/hadoop/mapred-site.xml; they can also be set per job, as shown in the sketch after the following list.

  • mapreduce.job.ubertask.enable- Used to set uber mode to true or false. Setting it to true enables the small-job "ubertask" optimization. Default value is false.
  • mapreduce.job.ubertask.maxmaps- This configuration sets the maximum number of maps, beyond which job is considered too big for the ubertasking optimization. Users may override this value, but only downward. Default value is 9.
  • mapreduce.job.ubertask.maxreduces- This configuration sets the maximum number of reduces, beyond which job is considered too big for the ubertasking optimization. Currently support is for one reduce only. Larger values will be ignored. Users may override this value, but only downward. Default value is 1.
  • mapreduce.job.ubertask.maxbytes- Threshold for the number of input bytes, beyond which the job is considered too big for the ubertasking optimization. If no value is specified, dfs.block.size is used as the default; in the case of HDFS that means the HDFS block size. Users may override this value, but only downward.
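
The same properties can also be set per job through the Configuration object. The fragment below is a minimal sketch; the class name, job name and the tightened thresholds are just example values.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class UberModeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Enable the small-job ("ubertask") optimization for this job
    conf.setBoolean("mapreduce.job.ubertask.enable", true);
    // Thresholds can only be lowered from the defaults, not raised
    conf.setInt("mapreduce.job.ubertask.maxmaps", 4);
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);

    Job job = Job.getInstance(conf, "uber-mode-demo");   // placeholder job name
    // Mapper/reducer classes, input and output paths would be set here.
  }
}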

That's all for this topic Uber Mode in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. YARN in Hadoop
  2. Capacity Scheduler in YARN
  3. Introduction to Hadoop Framework
  4. What is HDFS
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. What is SafeMode in Hadoop
  2. Java Program to Write File in HDFS
  3. How to Configure And Use LZO Compression in Hadoop
  4. Data Locality in Hadoop
  5. Speculative Execution in Hadoop
  6. How HashMap Internally Works in Java
  7. Volatile Keyword in Java With Examples
  8. Angular @ViewChildren Decorator With Examples

Sunday, October 15, 2023

Speculative Execution in Hadoop

Speculative execution in Hadoop MapReduce is an option to run a duplicate map or reduce task for the same input data on an alternative node. This is done so that a single slow-running task doesn’t slow down the whole job.

Why is speculative execution in Hadoop needed

In Hadoop an input file is partitioned into several blocks and those blocks are stored on different nodes of the Hadoop cluster; the blocks are also replicated for redundancy.

When a MapReduce job runs, it calculates the number of input splits (by default the split size is equal to the HDFS block size) and runs as many map tasks as there are splits. These map tasks run in parallel on the nodes where the data referred to by the splits resides.

What if a few nodes in the cluster are not performing as fast as the other nodes because of hardware or network problems? Map tasks running on those nodes will be slower than the map tasks running on the other nodes. Since reduce tasks can start their execution only once the intermediate output of all the map tasks is available, a few slow map tasks can delay the execution of the reduce tasks.

Similarly, reduce tasks running on a slower node may take more time to finish, delaying the final output of the whole job.

To guard against such slow tasks Hadoop starts the same task (working on the same input) on another node. Note that every block is replicated three times by default, so Hadoop can find another node where the same input data resides and launch the duplicate task there, with the assumption that it will finish faster on that node. This optimization is called speculative execution of the task.

When is speculative task started

Speculative execution does not start immediately. The map and reduce tasks are first monitored for some time so that the framework can determine which tasks are not making as much progress as the other running tasks of the same type. Only after this monitoring period, and after identifying the slower tasks, does Hadoop start speculative execution.

Since the speculative task and the original task both work on the same set of data, the output of whichever task finishes first successfully is used and the other one is killed.

How to configure speculative execution in Hadoop

Speculative execution is enabled by default for both map and reduce tasks. The properties for speculative execution are set in the mapred-site.xml file and are listed below; the sketch after the list shows how speculative execution can be turned off for a single job.

  • mapreduce.map.speculative- If set to true then speculative execution of map task is enabled. Default is true.
  • mapreduce.reduce.speculative- If set to true then speculative execution of reduce task is enabled. Default is true.
  • mapreduce.job.speculative.speculative-cap-running-tasks- The max percent (0-1) of running tasks that can be speculatively re-executed at any time. Default value is 0.1.
  • mapreduce.job.speculative.speculative-cap-total-tasks- The max percent (0-1) of all tasks that can be speculatively re-executed at any time. Default value is 0.01.
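
If you want to turn speculative execution off for a single job rather than cluster wide, the Job API provides setters corresponding to the map and reduce properties listed above. A minimal sketch (class name and job name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculationOffExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "speculation-off-demo");   // placeholder job name

    // Turn speculative execution off for this job only;
    // the cluster-wide defaults in mapred-site.xml stay untouched.
    job.setMapSpeculativeExecution(false);      // corresponds to mapreduce.map.speculative
    job.setReduceSpeculativeExecution(false);   // corresponds to mapreduce.reduce.speculative

    // Mapper/reducer classes, input and output paths would be set here.
  }
}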

Considerations for turning off speculative execution

Since speculative execution means running duplicate tasks, it increases the load on the cluster. If you have a very busy cluster, or a cluster with limited resources, you may consider turning off speculative execution.

Another thing to consider is that a reduce task gets its input from more than one map task running on different nodes, so there is data transfer involved for reduce tasks. Running a duplicate reduce task means the same data transfer happens more than once, increasing the load on the network.

That's all for this topic Speculative Execution in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Uber Mode in Hadoop
  2. What is SafeMode in Hadoop
  3. Replica Placement Policy in Hadoop Framework
  4. YARN in Hadoop
  5. Data Compression in Hadoop

You may also like-

  1. What is Big Data
  2. HDFS Commands Reference List
  3. File Write in HDFS - Hadoop Framework Internal Steps
  4. Compressing File in bzip2 Format in Hadoop - Java Program
  5. Capacity Scheduler in YARN
  6. How to Create Ubuntu Bootable USB
  7. Heap Memory Allocation in Java
  8. Lazy Initializing Spring Beans