
Distributed Cache in Hadoop MapReduce

Sometimes when you are running a MapReduce job, your map and/or reduce tasks may need some extra data such as a file, a jar or an archive in order to do their processing. In such scenarios you can use the distributed cache in Hadoop MapReduce.

What is distributed cache

Distributed cache in Hadoop provides a mechanism to copy files, jars or archives to the nodes where map and reduce tasks run. The specified file first has to be available in HDFS; when a task is about to run, the NodeManager copies the cached file to the local disk of the node where the task runs.


How to use Distributed cache in Hadoop

Earlier, methods of the DistributedCache class were used to add files to the distributed cache, but that whole class is deprecated now.

You need to use methods in the Job class instead to add files to the distributed cache. The methods that can be used are as follows; a short combined sketch is shown after the list.

  1. public void addCacheArchive(URI uri)- This method is used to add an archive to be localized. The archive is unarchived on the node where the task runs.
  2. public void addCacheFile(URI uri)- This method is used to add a file to be localized.
  3. public void setCacheArchives(URI[] archives)- Sets the given set of archives that need to be localized.
  4. public void setCacheFiles(URI[] files)- Sets the given set of files that need to be localized.
  5. public void addFileToClassPath(Path file)- This method adds a file path to the current set of classpath entries. It adds the file to the cache as well.
  6. public void addArchiveToClassPath(Path archive)- This method adds an archive path to the current set of classpath entries. It adds the archive to the cache as well.
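As a minimal sketch, a driver could combine a few of these calls like this; the paths and link names here are placeholders, not taken from the example later in this post.

// Localize a single file under the link name lookup
job.addCacheFile(new URI("/apps/data/lookup.txt#lookup"));
// Localize a set of archives; each is unarchived on the task nodes
job.setCacheArchives(new URI[] {new URI("/apps/data/dict.zip"), new URI("/apps/data/geo.tar.gz")});
// Add a jar to the classpath of the tasks; it is added to the cache as well
job.addFileToClassPath(new Path("/apps/libs/helper.jar"));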

If you are using GenericOptionsParser and ToolRunner in your MapReduce code then you can also pass the files to be added to the distributed cache through the command line.

Another advantage of using GenericOptionsParser to add a file is that you can add a file from the local file system too. With the methods in the Job class, the file has to be in a shared file system (such as HDFS), which is one of the differences between these two options for adding files to the distributed cache in Hadoop.
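For example, a ToolRunner based job can ship a local file to the cache with the -files generic option; the jar name, driver class and paths here are placeholders.

hadoop jar myjob.jar org.example.MyDriver -files /local/conf/lookup.properties#lookup /test/input /test/output

The -archives and -libjars generic options work the same way for archives and classpath jars respectively.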

If you are using the Java API (Job class methods) to add a file to the distributed cache in Hadoop then you have to ensure that the file is copied to HDFS first. Then you can use the relevant method (based on whether it is a file, a jar or an archive).
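For example, the text file used below could be copied into HDFS like this-

hdfs dfs -put abc.txt /test/input/abc.txt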

For example, if you are adding a text file to the distributed cache then you can use the following method call-

job.addCacheFile(new URI("/test/input/abc.txt#abc")); 

Here #abc creates a symbolic link to the file, and using this name (abc in this case) you can access the cached file from the task's working directory on the nodes.

If you are adding a jar to the class path then you can use the following method call

job.addFileToClassPath(new Path("/test/MyTestLib.jar")); 

If you are adding a .zip archive file to distributed cache then you can use the following method call.

job.addCacheArchive(new URI("/test/input/abc.zip")); 
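The archive is unarchived on the task nodes. If you add it with a link name, the unpacked contents become available under a directory with that name in the task's working directory; a sketch, where the link and file names are assumptions-

job.addCacheArchive(new URI("/test/input/abc.zip#abczip"));
// Inside a task, a file from the unpacked archive could then be read as-
// InputStream in = new FileInputStream("./abczip/somefile.txt");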

Distributed cache with MapReduce example

Suppose you have sales data in the following tab-separated format (item, sales amount, city code)-

Item1 345 DEL
Item1 205 KOL
Item3 654 BLR
Item3 453 KOL
Item2 350 MUM
Item1 122 DEL
    
What is needed is to find the total sales per item and city, but in the final output you want the full name of the city in the following form, not the three letter city code.
    
Item1 Delhi  467
    
In this scenario you can add a properties file to the distributed cache which maps each city code to the city name, in the following form.
DEL=Delhi
KOL=Kolkata
BLR=Bangalore
MUM=Mumbai
    

In the reducer you can get this properties file from the distributed cache and replace the city code with the full name by looking it up in the properties. Getting the file from the distributed cache and loading it into a Properties instance is done in the setup() method of the Reducer.

MapReduce code

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSalesWithDC extends Configured implements Tool{
  // Mapper
  public static class TotalSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0] + " " + stringArr[2]);             
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
  // Reducer
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private Properties cityProp = new Properties();
    private Text cityKey = new Text();
    private IntWritable result = new IntWritable();
    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
      // Read the file localized through the distributed cache via its symlink name (city)
      try (InputStream iStream = new FileInputStream("./city")) {
        // Load the city code to city name mappings
        cityProp.load(iStream);
      }
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;        
      String[] stringArr = key.toString().split(" ");
      // Getting the city name from prop file
      String city = cityProp.getProperty(stringArr[1].trim());
      cityKey.set(stringArr[0] + "\t"+ city);
      for (IntWritable val : values) {
        sum += val.get();
      }   
      result.set(sum);
      context.write(cityKey, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new TotalSalesWithDC(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception{
    // Use the configuration already populated by ToolRunner so that
    // generic options (-files, -libjars, -archives) are honored
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "TotalSales");
    job.setJarByClass(getClass());
    job.setMapperClass(TotalSalesMapper.class); 
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Adding file to distributed cache
    job.addCacheFile(new URI("/test/input/City.properties#city"));
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
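To try the job, the properties file has to be in HDFS at the path the driver expects before the job is launched. A possible run, where the jar name and data file names are assumptions, would look like-

hdfs dfs -put City.properties /test/input/City.properties
hdfs dfs -put sales.txt /test/input/sales.txt
hadoop jar totalsales.jar TotalSalesWithDC /test/input/sales.txt /test/output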

That's all for this topic Distributed Cache in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!



Related Topics

  1. Shuffle And Sort Phases in Hadoop MapReduce
  2. What Are Counters in Hadoop MapReduce
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Data Compression in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. Sequence File in Hadoop
  4. YARN in Hadoop
  5. HDFS High Availability
  6. Java Collections Interview Questions
  7. Fail-Fast Vs Fail-Safe Iterator in Java
  8. Data Access in Spring Framework