Wednesday, November 29, 2023

ToolRunner and GenericOptionsParser in Hadoop

GenericOptionsParser is a utility class in Hadoop that resides in the org.apache.hadoop.util package. It lets you set options from the command line. It parses the generic command line arguments and sets them on a Configuration object that the application can then use. Any remaining application-specific arguments are left for the program itself.
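
As a minimal sketch of what the parser does, the class below calls GenericOptionsParser directly. ParserDemo is a hypothetical class name, and the property sales.threshold is made up for illustration. The -D value ends up in the Configuration, and getRemainingArgs() returns only the arguments meant for the application. The sketch assumes hadoop-common is on the classpath.

```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {

  // Applies the generic options in args to conf and returns the
  // application arguments that GenericOptionsParser did not consume
  static String[] parse(Configuration conf, String[] args) throws IOException {
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    return parser.getRemainingArgs();
  }

  public static void main(String[] args) throws IOException {
    // false: skip loading core-default.xml and core-site.xml
    Configuration conf = new Configuration(false);
    // sales.threshold is a hypothetical property used only for illustration
    String[] appArgs = parse(conf,
        new String[] {"-D", "sales.threshold=100", "/test/input", "/test/out"});
    System.out.println("sales.threshold = " + conf.get("sales.threshold"));
    System.out.println("application args = " + Arrays.toString(appArgs));
  }
}
```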

How the GenericOptionsParser class is used

Rather than using the GenericOptionsParser class directly, you will generally implement the Tool interface in your MapReduce driver class. You then run the application with the ToolRunner.run() method, which uses GenericOptionsParser internally to parse the command line arguments.
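
A bare-bones Tool could look like the sketch below. UsageCheckTool is a hypothetical class. It only shows that, by the time run() is called, ToolRunner has stripped the generic options from args and applied them to getConf(). Returning 2 for a usage error is an arbitrary choice made for this sketch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UsageCheckTool extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // ToolRunner has already removed generic options such as -D or -files,
    // so only the application arguments <input> <output> are left in args
    if (args.length != 2) {
      System.err.println("Usage: UsageCheckTool [generic options] <input> <output>");
      ToolRunner.printGenericCommandUsage(System.err);
      return 2;
    }
    // Values passed with -D are already set on the Configuration
    Configuration conf = getConf();
    System.out.println("input=" + args[0] + ", output=" + args[1]
        + ", mapreduce.job.reduces=" + conf.get("mapreduce.job.reduces"));
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic options, calls setConf() and then run()
    int exitCode = ToolRunner.run(new Configuration(), new UsageCheckTool(), args);
    System.exit(exitCode);
  }
}
```

Passing the Configuration to ToolRunner.run() explicitly, instead of using the two-argument form, makes it visible which object the parsed options are written into.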

How the GenericOptionsParser class helps

If you set configuration arguments within your code, you are hard coding those arguments. Any change to an argument then requires a code change and a rebuild of the jar.

Passing arguments on the command line gives you the flexibility to add, remove or change arguments without any change to the code.

Generic Options

You can specify command line arguments using the following generic options.

  1. -archives <comma separated list of archives> - Specify comma separated archives to be unarchived on the compute machines. Applies only to jobs.
  2. -conf <configuration file> - Specify an application configuration file.
  3. -D <property>=<value> - Use the given value for the property.
  4. -files <comma separated list of files> - Specify comma separated files to be copied to the MapReduce cluster. Applies only to jobs.
  5. -fs <file:///> or <hdfs://namenode:port> - Specify the default filesystem URL to use. Overrides the ‘fs.defaultFS’ property from configurations.
  6. -jt <local> or <resourcemanager:port> - Specify a ResourceManager. Applies only to jobs.
  7. -libjars <comma separated list of jars> - Specify comma separated jar files to include in the classpath. Applies only to jobs.
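
To see how some of these options map onto configuration properties, the following sketch passes -fs and two -D options to GenericOptionsParser and prints the resulting values. FsAndDOptions is a hypothetical class name. The property names used with -D are standard MapReduce properties, but the values are only examples.

```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class FsAndDOptions {

  // Applies the generic options to conf and returns the leftover arguments
  static String[] apply(Configuration conf, String... args) throws IOException {
    return new GenericOptionsParser(conf, args).getRemainingArgs();
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration(false);
    String[] rest = apply(conf,
        "-fs", "file:///",                            // sets fs.defaultFS
        "-D", "mapreduce.job.reduces=2",              // -D may be repeated
        "-D", "mapreduce.map.output.compress=true",
        "/test/input/sales.txt", "/test/out/sale");   // application arguments
    System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    System.out.println("reduces = " + conf.getInt("mapreduce.job.reduces", 1));
    System.out.println("compress = " + conf.getBoolean("mapreduce.map.output.compress", false));
    System.out.println("rest = " + Arrays.toString(rest));
  }
}
```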

GenericOptionsParser with ToolRunner example

The post Using Avro File With Hadoop MapReduce has an example of using an Avro file with MapReduce. In that example the Avro schema is inlined within the code.

Here the same example is rewritten so that the schema file (saleschema.avsc) is passed as a command line argument.

saleschema.avsc

{
  "type": "record",    
  "name": "SalesRecord",
  "doc" : "Sales Records",
  "fields": 
 [
  {"name":"item", "type": "string"},
  {"name":"totalsales", "type": "int"}
 ]
}
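
Before running the job, the schema file can be checked on its own. The sketch below parses a schema file with Schema.Parser, the same call the job uses. It then builds a SalesRecord and checks it against the schema with GenericData.validate(). SalesSchemaCheck is a hypothetical class name, and the Avro jar is assumed to be on the classpath.

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SalesSchemaCheck {

  // Parses the schema file the same way the job's mapper and reducer do
  static Schema load(File schemaFile) throws IOException {
    return new Schema.Parser().parse(schemaFile);
  }

  // Builds one SalesRecord with the two fields declared in saleschema.avsc
  static GenericRecord newSale(Schema schema, String item, int totalSales) {
    GenericRecord record = new GenericData.Record(schema);
    record.put("item", item);
    record.put("totalsales", totalSales);
    return record;
  }

  public static void main(String[] args) throws IOException {
    Schema schema = load(new File(args.length > 0 ? args[0] : "saleschema.avsc"));
    GenericRecord sale = newSale(schema, "Item1", 1158);
    // validate() checks the field values against the types in the schema
    System.out.println(schema.getName() + " valid: "
        + GenericData.get().validate(schema, sale));
  }
}
```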

MapReduce code

package org.netjs;

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{

  //Mapper
  public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<CharSequence>,
      AvroValue<GenericRecord>>{
    private GenericRecord record;
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
      // -files places saleschema.avsc in the task's working directory,
      // so it can be opened by its bare file name
      Schema SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
      record = new GenericData.Record(SALES_SCHEMA);
    }
    @Override
    public void map(LongWritable key, Text value, Context context)
           throws IOException, InterruptedException {
      // Each input line is: item<TAB>sales
      String[] salesArr = value.toString().split("\t");
      record.put("item", salesArr[0]);
      record.put("totalsales", Integer.parseInt(salesArr[1]));
      // The map output key uses the Avro string schema set in run(), so the
      // key datum is a CharSequence rather than a Hadoop Text
      context.write(new AvroKey<CharSequence>(salesArr[0]), new AvroValue<GenericRecord>(record));
    }
  }
  
  // Reducer
  public static class SalesReducer extends Reducer<AvroKey<CharSequence>, AvroValue<GenericRecord>,
             AvroKey<GenericRecord>, NullWritable>{
    Schema SALES_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Same local copy of the schema file shipped with -files
      SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
    }
    @Override
    public void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<GenericRecord>> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord record = value.datum();
        sum += (Integer)record.get("totalsales");
      }
      GenericRecord record = new GenericData.Record(SALES_SCHEMA);
      record.put("item", key.datum());
      record.put("totalsales", sum);
      context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
    }
  }
  
  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroMR(), args);
    System.exit(exitFlag);
  }
    
  @Override
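  public int run(String[] args) throws Exception {
    // ToolRunner has already removed the generic options (-files here),
    // so args holds only the input and output paths
    if (args.length != 2) {
      System.err.println("Usage: AvroMR [generic options] <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "AvroMR");
    job.setJarByClass(getClass());
    job.setMapperClass(ItemMapper.class);
    job.setReducerClass(SalesReducer.class);
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    // The driver runs on the client machine, where -files does not place the
    // schema in the working directory, so it is read from its local path here
    Schema SALES_SCHEMA = new Schema.Parser().parse(
        new File("/home/netjs/saleschema.avsc"));
    AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
    AvroJob.setOutputKeySchema(job, SALES_SCHEMA);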
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
To run this MapReduce program, pass the schema file with the -files generic option. Generic options go before the application arguments, which here are the input and output paths.
hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR -files /home/netjs/saleschema.avsc /test/input/sales.txt /test/out/sale

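Here the location of the schema file in the local file system is passed with -files. At job submission the file is shipped to the cluster through the distributed cache. Each task then finds it in its working directory under the name saleschema.avsc, which is why the mapper and reducer can open it as new File("saleschema.avsc").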
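
The sketch below feeds the same arguments as the command above to GenericOptionsParser, to show what happens to them on the client side. FilesOptionDemo is a hypothetical class name. The -files entry is checked for existence and recorded in the job configuration under tmpfiles, an internal property name that may differ between Hadoop versions. The two paths are left over as the args[0] and args[1] that AvroMR.run() reads, and the actual upload only happens later, at job submission. The sketch assumes hadoop-common is on the classpath and that /home/netjs/saleschema.avsc exists locally; otherwise parsing fails with a FileNotFoundException.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class FilesOptionDemo {

  // Runs only the client-side parsing step; nothing is uploaded here
  static String[] parse(Configuration conf, String... args) throws IOException {
    return new GenericOptionsParser(conf, args).getRemainingArgs();
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Same arguments as the hadoop jar command, minus the jar and class name
    String[] rest = parse(conf, "-files", "/home/netjs/saleschema.avsc",
        "/test/input/sales.txt", "/test/out/sale");
    // The validated, fully qualified local path is recorded in the
    // configuration; "tmpfiles" is an internal property name
    System.out.println("tmpfiles = " + conf.get("tmpfiles"));
    // These become args[0] and args[1] in AvroMR.run()
    System.out.println("input = " + rest[0] + ", output = " + rest[1]);
  }
}
```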

You can see the content of the Avro output file using the avro-tools jar:

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sale/part-r-00000.avro

{"item":"Item1","totalsales":1158}
{"item":"Item2","totalsales":642}
{"item":"Item3","totalsales":1507}
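
If avro-tools is not at hand, the same records can be read with the Avro Java API. This sketch assumes that the part file has first been copied to the local file system, for example with hdfs dfs -get. ReadSalesOutput is a hypothetical class name.

```java
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadSalesOutput {

  // Reads every SalesRecord in a local Avro data file into item -> totalsales
  static Map<String, Integer> readTotals(File avroFile) throws IOException {
    Map<String, Integer> totals = new LinkedHashMap<>();
    // The writer schema is stored in the file header, so none is passed here
    try (DataFileReader<GenericRecord> reader =
             new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
      for (GenericRecord record : reader) {
        // Avro strings are typically read back as Utf8, hence toString()
        totals.put(record.get("item").toString(), (Integer) record.get("totalsales"));
      }
    }
    return totals;
  }

  public static void main(String[] args) throws IOException {
    File file = new File(args.length > 0 ? args[0] : "part-r-00000.avro");
    readTotals(file).forEach((item, total) -> System.out.println(item + " -> " + total));
  }
}
```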

That's all for this topic ToolRunner and GenericOptionsParser in Hadoop. If you have any doubts or suggestions, please drop a comment. Thanks!



Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. What is SafeMode in Hadoop
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Write a Map Only Job in Hadoop MapReduce
  5. How to Check Hadoop MapReduce Logs

You may also like-

  1. Input Splits in Hadoop
  2. Capacity Scheduler in YARN
  3. How to Read And Write SequenceFile in Hadoop
  4. Parquet File Format in Hadoop
  5. Java Stream API Interview Questions
  6. Generating Getters And Setters Using Reflection - Java Program
  7. BigDecimal in Java
  8. Zipping Files in Java