Thursday, July 19, 2018

Using Avro File With Hadoop MapReduce

In this post we’ll see how to use Avro files with Hadoop MapReduce.

Avro MapReduce jar

You will need to download the following jar and put it on the project’s classpath.

avro-mapred-1.8.2.jar

Avro MapReduce example

In this MapReduce program we compute the total sales per item, and the output of the MapReduce job is an Avro file. Input records are in the following tab-separated format (item, sales, zone).

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3
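
To make the expected result concrete, here is a minimal plain-Java sketch (no Hadoop or Avro involved) that applies the same split-and-sum logic to the four sample records. The class name SalesTotalsSketch and the method totalPerItem are hypothetical names used only for illustration; they are not part of the MapReduce program.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the MapReduce job:
// sums the sales figure (second field) per item (first field).
class SalesTotalsSketch {
    static Map<String, Integer> totalPerItem(List<String> lines) {
        // TreeMap keeps items sorted by name, similar to sorted reducer keys
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            totals.merge(fields[0], Integer.parseInt(fields[1]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "Item1\t345\tzone-1",
                "Item1\t234\tzone-2",
                "Item3\t654\tzone-2",
                "Item2\t231\tzone-3");
        // prints {Item1=579, Item2=231, Item3=654}
        System.out.println(totalPerItem(sample));
    }
}
```

The MapReduce job should produce the same totals, with each item and its sum written as one Avro record.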

The Avro schema (SALES_SCHEMA) used in the program is inlined within the MapReduce code. When creating an output record, this schema is referenced as shown below.

GenericRecord record = new GenericData.Record(SALES_SCHEMA);

MapReduce code

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{
    // Schema of the sales record: item name and its total sales
    private static final Schema SALES_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"SalesRecord\",\n" +
            "    \"doc\":    \"Sales Records\",\n" +
            "    \"fields\":\n" + 
            "    [\n" + 
            "            {\"name\": \"item\",    \"type\":    \"string\"},\n"+ 
            "            {\"name\":    \"totalsales\", \"type\":    \"int\"}\n"+
            "    ]\n"+
            "}\n");

    //Mapper
    public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
            AvroValue<GenericRecord>>{
        private Text item = new Text();
        private GenericRecord record = new GenericData.Record(SALES_SCHEMA);
        public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
            //splitting record
            String[] salesArr = value.toString().split("\t");        
            item.set(salesArr[0]);
            record.put("item", salesArr[0]);
            record.put("totalsales", Integer.parseInt(salesArr[1]));
            context.write(new AvroKey<Text>(item), new AvroValue<GenericRecord>(record));
         }
    }
    
    // Reducer
    public static class SalesReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, 
                      AvroKey<GenericRecord>, NullWritable>{       
        public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, 
              Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (AvroValue<GenericRecord> value : values) {
              GenericRecord    record = value.datum();
              sum += (Integer)record.get("totalsales");
          }
          GenericRecord record = new GenericData.Record(SALES_SCHEMA);
          record.put("item", key.datum());
          record.put("totalsales", sum);
          context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
        }
    }
    
    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new AvroMR(), args);
        System.exit(exitFlag);
    }
    
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so that generic options (such as -D) are honored
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "AvroMR");
        job.setJarByClass(getClass());
        job.setMapperClass(ItemMapper.class);    
        job.setReducerClass(SalesReducer.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
        AvroJob.setOutputKeySchema(job, SALES_SCHEMA);    
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}    
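
The mapper above assumes that every input line has at least two tab-separated fields and that the second one is an integer; a malformed line would throw an exception and fail the task. Below is a minimal sketch of a more defensive parse, written in plain Java so it can be tried outside Hadoop. SalesLineParser and parse are hypothetical names, not part of the original program, and skipping bad lines silently is only one possible policy.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: parses one sales line into (item, sales),
// returning an empty Optional for lines that cannot be used.
class SalesLineParser {
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        String[] fields = line.split("\t");
        // Need at least an item name and a sales figure
        if (fields.length < 2 || fields[0].isEmpty()) {
            return Optional.empty();
        }
        try {
            int sales = Integer.parseInt(fields[1].trim());
            Map.Entry<String, Integer> entry = new SimpleEntry<>(fields[0], sales);
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            // Sales field is not a valid integer
            return Optional.empty();
        }
    }
}
```

In a mapper, such a helper could be called at the top of map() and the method could return early when the result is empty, optionally incrementing a counter to track skipped lines.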

Run the MapReduce job using the following command.

hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR /test/input/sales.txt /test/out/sales 

This creates an Avro file as output. To see the content of the output file, you can use the following command.

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sales/part-r-00000.avro 

That's all for this topic, Using Avro File With Hadoop MapReduce. If you have any doubts or suggestions, please drop a comment. Thanks!



