Sunday, December 31, 2023

Using Avro File With Hadoop MapReduce

In this post we’ll see how to use Avro file with Hadoop MapReduce.

Avro MapReduce jar

You will need to download the following jar and add it to your project's classpath.

avro-mapred-1.8.2.jar

Avro MapReduce example

In this MapReduce program we'll compute the total sales per item, and the output of the MapReduce job is an Avro file. The input records are in the following tab separated format.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3

The Avro schema (SALES_SCHEMA) used in the program is inlined within the MapReduce code. While creating the output record this Avro schema is referred to as given below-

GenericRecord record = new GenericData.Record(SALES_SCHEMA);
MapReduce code
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{
    /// Schema
    private static final Schema SALES_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"SalesRecord\",\n" +
            "    \"doc\":    \"Sales Records\",\n" +
            "    \"fields\":\n" + 
            "    [\n" + 
            "            {\"name\": \"item\",    \"type\":    \"string\"},\n"+ 
            "            {\"name\":    \"totalsales\", \"type\":    \"int\"}\n"+
            "    ]\n"+
            "}\n");

    //Mapper
    public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
            AvroValue<GenericRecord>>{
        private Text item = new Text();
        private GenericRecord record = new GenericData.Record(SALES_SCHEMA);
        public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
            //splitting record
            String[] salesArr = value.toString().split("\t");        
            item.set(salesArr[0]);
            record.put("item", salesArr[0]);
            record.put("totalsales", Integer.parseInt(salesArr[1]));
            context.write(new AvroKey<Text>(item), new AvroValue<GenericRecord>(record));
         }
    }
    
    // Reducer
    public static class SalesReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, 
                      AvroKey<GenericRecord>, NullWritable>{       
        public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, 
              Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (AvroValue<GenericRecord> value : values) {
              GenericRecord record = value.datum();
              sum += (Integer)record.get("totalsales");
          }
          GenericRecord record = new GenericData.Record(SALES_SCHEMA);
          record.put("item", key.datum());
          record.put("totalsales", sum);
          context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
        }
    }
    
    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new AvroMR(), args);
        System.exit(exitFlag);
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "AvroMR");
        job.setJarByClass(getClass());
        job.setMapperClass(ItemMapper.class);    
        job.setReducerClass(SalesReducer.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
        AvroJob.setOutputKeySchema(job, SALES_SCHEMA);    
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}    

Run the MapReduce job using the following command.

hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR /test/input/sales.txt /test/out/sales 

That creates an Avro file as output. To see the content of the output file you can use the following command.

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sales/part-r-00000.avro 
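
With the sample input shown above the totals work out to 579 for Item1 (345 + 234), 231 for Item2 and 654 for Item3, so the dumped output should look something like this.

{"item": "Item1", "totalsales": 579}
{"item": "Item2", "totalsales": 231}
{"item": "Item3", "totalsales": 654}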

That's all for this topic Using Avro File With Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Avro File in Hadoop
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. How to Compress Intermediate Map Output in Hadoop
  5. Shuffle And Sort Phases in Hadoop MapReduce

You may also like-

  1. How to Write a Map Only Job in Hadoop MapReduce
  2. Shuffle And Sort Phases in Hadoop MapReduce
  3. HDFS Commands Reference List
  4. HDFS High Availability
  5. YARN in Hadoop
  6. PermGen Space Removal in Java 8
  7. BlockingQueue in Java Concurrency
  8. How to Create Deadlock in Java Multi-Threading - Java Program

Saturday, December 30, 2023

How to Read And Write Avro File in Hadoop

In this post we’ll see a Java program to read and write Avro files in Hadoop environment.

Required jars

For reading and writing an Avro file using the Java API in Hadoop you will need to download the following jars and add them to your project's classpath.

  • avro-1.8.2.jar
  • avro-tools-1.8.2.jar
The Avro Java implementation also depends on the Jackson JSON library, so you'll also need-
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar

Writing Avro file – Java program

To write an Avro file in Hadoop using the Java API the steps are as follows.

  1. You need an Avro schema.
  2. In your program you will have to parse that schema.
  3. Then you need to create records referring to that parsed schema.
  4. Write those records to file.

Avro Schema

The Avro schema used for the program is called Person.avsc and it resides in the resources folder within the project structure.

{
 "type": "record",
 "name": "personRecords",
 "doc": "Personnel Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Address",
   "type": "string"
  }
 ]
}

Java Code

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AvroFileWrite {

    public static void main(String[] args) {
        Schema schema = parseSchema();
        writetoAvro(schema);
    }
    
    
    // parsing the schema
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // Path to schema file
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
               "resources/Person.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;        
    }
    
    private static void writetoAvro(Schema schema) {
        GenericRecord person1 = new GenericData.Record(schema);
        person1.put("id", 1);
        person1.put("Name", "Jack");
        person1.put("Address", "1, Richmond Drive");
        
        GenericRecord person2 = new GenericData.Record(schema);
        person2.put("id", 2);
        person2.put("Name", "Jill");
        person2.put("Address", "2, Richmond Drive");
                    
        DatumWriter<GenericRecord> datumWriter = new 
                     GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = null;
        try {
            //out file path in HDFS
            Configuration conf = new Configuration();
            // change the IP
            FileSystem fs = FileSystem.get(URI.create(
                       "hdfs://124.32.45.0:9000/test/out/person.avro"), conf);
            OutputStream out = fs.create(new Path(
                      "hdfs://124.32.45.0:9000/test/out/person.avro"));
                    
            dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            // for compression
            //dataFileWriter.setCodec(CodecFactory.snappyCodec());
            dataFileWriter.create(schema, out);
            
            dataFileWriter.append(person1);
            dataFileWriter.append(person2);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
            if(dataFileWriter != null) {
                try {
                    dataFileWriter.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
        }            
    }
}
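
The commented setCodec() line in the program above marks where file level compression of the Avro data blocks can be plugged in. A minimal sketch of enabling it, assuming the Snappy native libraries are available on the cluster, is to add the CodecFactory import and call setCodec() before create():

import org.apache.avro.file.CodecFactory;

dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
// compress the data blocks of the Avro file with Snappy (deflate is another option)
dataFileWriter.setCodec(CodecFactory.snappyCodec());
dataFileWriter.create(schema, out);
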
To run this Java program in the Hadoop environment, export the classpath where the .class file for the Java program resides.
$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 
Then you can run the Java program using the following command.
$ hadoop org.netjs.AvroFileWrite

Reading Avro file – Java program

The following Java program reads back, in the Hadoop environment, the Avro file written by the above program.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class AvroFileRead {

    public static void main(String[] args) {
        Schema schema = parseSchema();
        readFromAvroFile(schema);
    }
    
    // parsing the schema
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // Path to schema file
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
                      "resources/Person.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;        
    }
        
    private static void readFromAvroFile(Schema schema) {
        
        Configuration conf = new Configuration();
        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            // change the IP
            FsInput in = new FsInput(new Path(
                    "hdfs://124.32.45.0:9000/test/out/person.avro"), conf);
            DatumReader<GenericRecord> datumReader = new 
                     GenericDatumReader<GenericRecord>(schema);
            dataFileReader = new DataFileReader<GenericRecord>(in, datumReader);
            GenericRecord person = null;
            while (dataFileReader.hasNext()) {
                person = dataFileReader.next(person);
                System.out.println(person);
            }
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
            if(dataFileReader != null) {
                try {
                    dataFileReader.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
        }       
    }
}
You can run the Java program using the following command.
$ hadoop org.netjs.AvroFileRead

{"id": 1, "Name": "Jack", "Address": "1, Richmond Drive"}
{"id": 2, "Name": "Jill", "Address": "2, Richmond Drive"}

That's all for this topic How to Read And Write Avro File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Read And Write SequenceFile in Hadoop
  3. How to Configure And Use LZO Compression in Hadoop
  4. Java Program to Write File in HDFS
  5. How to Compress MapReduce Job Output in Hadoop

You may also like-

  1. How to Check Hadoop MapReduce Logs
  2. How MapReduce Works in Hadoop
  3. Predefined Mapper And Reducer Classes in Hadoop
  4. NameNode, DataNode And Secondary NameNode in HDFS
  5. What is SafeMode in Hadoop
  6. Java Lambda Expressions Interview Questions
  7. Transaction in Java-JDBC
  8. Different Bean Scopes in Spring

Friday, December 29, 2023

Converting Text File to Parquet File Using Hadoop MapReduce

This post shows how to convert existing data to Parquet file format using MapReduce in Hadoop. In the example given here a text file is converted to a Parquet file.

You will need to put the following jars in the classpath in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating the Parquet schema directly, the Avro framework is used to create the schema as it is more convenient. Then you can use the Avro API classes to write and read the files. The mapping between the Avro and Parquet schemas, and between an Avro record and a Parquet record, is taken care of by these classes themselves.

MapReduce code to convert file to Parquet format file

In the code the Avro schema is defined inline. The program uses the Avro generic API to create a generic record. Also, it is a map-only job as just conversion is required and records are not aggregated.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class ParquetConvert extends Configured implements Tool{
    
    /// Schema
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"TextFile\",\n" +
            "    \"doc\":    \"Text File\",\n" +
            "    \"fields\":\n" + 
            "    [\n" +  
            "            {\"name\":    \"line\", \"type\":    \"string\"}\n"+
            "    ]\n"+
            "}\n");
    
    // Map function
    public static class ParquetConvertMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
        
        private GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
             record.put("line", value.toString());
             context.write(null, record); 
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetConvert");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetConvertMapper.class);    
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(GenericRecord.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // setting schema
        AvroParquetOutputFormat.setSchema(job, MAPPING_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetConvert(), args);
        System.exit(exitFlag);
    }
}   
Run the MapReduce job using the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert /test/input /test/output
You can see that the Parquet file is written at the output location.
hdfs dfs -ls /test/output

Found 4 items
-rw-r--r--   1 netjs supergroup          0 2018-07-06 09:54 /test/output/_SUCCESS
-rw-r--r--   1 netjs supergroup        276 2018-07-06 09:54 /test/output/_common_metadata
-rw-r--r--   1 netjs supergroup        429 2018-07-06 09:54 /test/output/_metadata
-rw-r--r--   1 netjs supergroup        646 2018-07-06 09:54 /test/output/part-m-00000.parquet

Reading Parquet file using MapReduce

The following MapReduce program takes a Parquet file as input and outputs a text file. In the Parquet file the records are in the following format, so you need to write appropriate logic to extract the relevant part.

{"line": "Hello wordcount MapReduce Hadoop program."}
{"line": "This is my first MapReduce program."}
{"line": "This file will be converted to Parquet using MR."}
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetRead extends Configured implements Tool{
    // Map function
    public static class ParquetMapper extends Mapper<NullWritable, Group, NullWritable, Text> {
         public void map(NullWritable key, Group value, Context context) 
                 throws IOException, InterruptedException {
             NullWritable outKey = NullWritable.get();
             String line = value.toString();
             
             String[] fields = line.split(": ");
             context.write(outKey, new Text(fields[1]));
                     
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetRead");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper.class);    
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetRead(), args);
        System.exit(exitFlag);
    }

}
If you want to read back the data written by the Parquet conversion MapReduce program, you can use the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetRead /test/output/part-m-00000.parquet /test/out

That's all for this topic Converting Text File to Parquet File Using Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Parquet File in Hadoop
  2. Using Avro File With Hadoop MapReduce
  3. How to Configure And Use LZO Compression in Hadoop
  4. MapReduce Flow in YARN
  5. Input Splits in Hadoop

You may also like-

  1. Data Locality in Hadoop
  2. Speculative Execution in Hadoop
  3. YARN in Hadoop
  4. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  5. Serialization in Java
  6. Nested Class And Inner Class in Java
  7. Reflection in Java
  8. Print Odd-Even Numbers Using Threads And wait-notify - Java Program

Thursday, December 28, 2023

Counters in Hadoop MapReduce Job

If you run a MapReduce job you would have seen a lot of counters displayed on the console after the MapReduce job is finished (you can also check the counters using the UI while the job is running). These counters in Hadoop MapReduce give a lot of statistical information about the executed job. Apart from giving you information about the tasks, these counters also help you in diagnosing problems in a MapReduce job and in improving the MapReduce performance.

For example you get information about the spilled records and memory usage which gives you an indicator about the performance of your MapReduce job.


Types of counters in Hadoop

There are 2 types of Counters in Hadoop MapReduce.

  1. Built-In Counters
  2. User-Defined Counters or Custom counters

Built-In Counters in MapReduce

Hadoop Framework has some built-in counters which give information pertaining to-

  1. File system like bytes read, bytes written.
  2. MapReduce job like launched map and reduce tasks
  3. MapReduce task like map input records, combiner output records.

These built-in counters are grouped based on the type of information they provide and represented by Enum classes in Hadoop framework. Following is the list of the Counter groups and the corresponding Enum class names.

  1. File System Counters – org.apache.hadoop.mapreduce.FileSystemCounter
  2. Job Counters– org.apache.hadoop.mapreduce.JobCounter
  3. Map-Reduce Framework Counters– org.apache.hadoop.mapreduce.TaskCounter
  4. File Input Format Counters– org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
  5. File Output Format Counters– org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
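
Once a job finishes, these built-in counters can also be read programmatically through the Job API. The following lines are only a sketch to be placed after job.waitForCompletion(true) in a driver; the enum constants used are the real counter names, while the printing logic is illustrative.

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;

Counters counters = job.getCounters();
// a task level counter from the Map-Reduce Framework group
Counter mapInputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS);
System.out.println(mapInputRecords.getDisplayName() + "=" + mapInputRecords.getValue());
// a job level counter from the Job Counters group
Counter launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS);
System.out.println(launchedMaps.getDisplayName() + "=" + launchedMaps.getValue());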

File System Counters in MapReduce

File system counters are repeated for each type of file system, with each entry prefixed by the file system, for example FILE: Number of bytes read, HDFS: Number of bytes read.

  • Number of bytes read- Displays the number of bytes read by the file system for both Map and Reduce tasks.
  • Number of bytes written- Displays the number of bytes written by the file system for both Map and Reduce tasks.
  • Number of read operations- Displays the number of read operations by both Map and Reduce tasks.
  • Number of large read operations- Displays the number of large read operations (example: traversing the directory tree) for both Map and Reduce tasks.
  • Number of write operations- Displays the number of write operations by both Map and Reduce tasks.

Job Counters in MapReduce

These counters give information about the whole job not at the task level.

  • Launched map tasks- Displays the number of launched map tasks.
  • Launched reduce tasks- Displays the number of launched reduce tasks.
  • Launched uber tasks- Displays the number of tasks launched as uber tasks.
  • Data-local map tasks- Displays the number of mappers run on the same node where the input data they have to process resides.
  • Rack-local map tasks- Displays the number of mappers run on a node on the same rack where the input data they have to process resides.
  • Map in uber tasks- Displays the number of maps run as uber tasks.
  • Reduce in uber tasks- Displays the number of reducers run as uber tasks.
  • Total time spent by all map tasks- Total time in milliseconds running all the launched map tasks.
  • Total time spent by all reduce tasks- Total time in milliseconds running all the launched reduce tasks.
  • Failed map tasks- Displays the number of map tasks that failed.
  • Failed reduce tasks- Displays the number of reduce tasks that failed.
  • Failed uber tasks- Displays the number of uber tasks that failed.
  • Killed map tasks- Displays the number of killed map tasks.
  • Killed reduce tasks- Displays the number of killed reduce tasks.

Map-Reduce Framework Counters

These counters collect information about the running task.

  • Map input records– Displays the number of records processed by all the maps in the MR job.
  • Map output records– Displays the number of output records produced by all the maps in the MR job.
  • Map skipped records– Displays the number of records skipped by all the maps.
  • Map output bytes– Displays the number of bytes produced by all the maps in the MR job.
  • Map output materialized bytes– Displays the Map output bytes written to the disk.
  • Reduce input groups– Displays the number of key groups processed by all the Reducers.
  • Reduce shuffle bytes– Displays the number of bytes of Map output copied to Reducers in shuffle process.
  • Reduce input records– Displays the number of input records processed by all the Reducers.
  • Reduce output records– Displays the number of output records produced by all the Reducers.
  • Reduce skipped records– Displays the number of records skipped by Reducer.
  • Input split bytes– Displays the data about input split objects in bytes.
  • Combine input records– Displays the number of input records processed by combiner.
  • Combine output records– Displays the number of output records produced by combiner.
  • Spilled Records– Displays the number of records spilled to the disk by all the map and reduce tasks.
  • Shuffled Maps– Displays the number of map output files transferred during shuffle process to nodes where reducers are running.
  • Failed Shuffles– Displays the number of map output files failed during shuffle.
  • Merged Map outputs– Displays the number of map outputs merged after map output is transferred.
  • GC time elapsed– Displays the garbage collection time in milliseconds.
  • CPU time spent– Displays the CPU processing time spent in milliseconds.
  • Physical memory snapshot– Displays the total physical memory used in bytes.
  • Virtual memory snapshot– Displays the total virtual memory used in bytes.
  • Total committed heap usage– Displays the total amount of heap memory available in bytes.

File Input Format Counters in MapReduce

  • Bytes Read– Displays the bytes read by Map tasks using the specified Input format.

File Output Format Counters in MapReduce

  • Bytes Written– Displays the bytes written by Map and reduce tasks using the specified Output format.

User defined counters in MapReduce

You can also create user defined counters in Hadoop using a Java enum. The name of the enum becomes the counter group's name whereas each field in the enum is a counter name.

You can increment these counters in the mapper or reducer based on some logic that will help you with debugging. User defined counters are also aggregated across all the mappers or reducers by the Hadoop framework and displayed as a single unit.

User defined counter example

Suppose you have data in the following format, and in some records the sales data is missing.

Item1 345 zone-1
Item1  zone-2
Item3 654 zone-2
Item2 231 zone-3

Now you want to determine the number of records where sales data is missing, to get a picture of how much skew the missing fields introduce into your analysis.

MapReduce code

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SalesCalc extends Configured implements Tool {    
  enum Sales {
    SALES_DATA_MISSING
  }
  // Mapper
  public static class SalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    IntWritable sales = new IntWritable();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] salesArr = value.toString().split("\t");
      item.set(salesArr[0]);
                
      if(salesArr.length > 1 && salesArr[1] != null && !salesArr[1].trim().equals("")) {
        sales.set(Integer.parseInt(salesArr[1]));
      }else {
        // incrementing counter
        context.getCounter(Sales.SALES_DATA_MISSING).increment(1);
        sales.set(0);
      }            
        context.write(item, sales);
    }
  }
    
  // Reducer
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{    
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SalesCalc(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "SalesCalc");
    job.setJarByClass(getClass());
    job.setMapperClass(SalesMapper.class);    
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

Among the counters displayed for the MapReduce job you can see the user defined counter showing the number of records where the sales figure is missing.

org.netjs.SalesCalc$Sales
        SALES_DATA_MISSING=4

That's all for this topic Counters in Hadoop MapReduce Job. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. ToolRunner and GenericOptionsParser in Hadoop
  2. Chaining MapReduce Job in Hadoop
  3. Predefined Mapper And Reducer Classes in Hadoop
  4. Input Splits in Hadoop
  5. Data Locality in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. Sequence File in Hadoop
  4. Parquet File Format in Hadoop
  5. Apache Avro Format in Hadoop
  6. How to Create Ubuntu Bootable USB
  7. Java Exception Handling Interview Questions
  8. Ternary Operator in Java

Wednesday, December 27, 2023

Predefined Mapper And Reducer Classes in Hadoop

Hadoop framework comes prepackaged with many Mapper and Reducer classes. This post explains some of these predefined Mappers and Reducers in Hadoop and shows examples using the predefined Mapper and Reducer classes.

Predefined Mapper classes in Hadoop

  1. ChainMapper- The ChainMapper class allows you to use multiple Mapper classes within a single Map task. Using this predefined class you can chain mapper classes where the output of one map task becomes the input of the next. That helps in breaking a complex task with lots of data processing into a chain of smaller tasks.
  2. FieldSelectionMapper- This class implements a mapper that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user specified separator (the default value is "\t"). The user can specify a list of fields that form the map output keys, and a list of fields that form the map output values. The field separator is under attribute "mapreduce.fieldsel.data.field.separator". The map output field list spec is under attribute "mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be like "keyFieldsSpec:valueFieldsSpec". Both keyFieldsSpec and valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all the fields starting from field 3. Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys, and fields 6,5,1,2,3,7 and above for values.
    By using this predefined class you don't need to write your own mapper with the split logic, you can configure FieldSelectionMapper with the required data to split the record. Jump to FieldSelectionMapper example.
  3. InverseMapper- This predefined Mapper swaps keys and values.
  4. TokenCounterMapper- Tokenize the input values and emit each word with a count of 1. This predefined class can be used where you want to do the sum of values like in a word count MapReduce program. Jump to TokenCounterMapper example.
  5. MultithreadedMapper- This Mapper is a Multithreaded implementation for org.apache.hadoop.mapreduce.Mapper. This predefined mapper is useful if your job is more I/O bound than CPU bound.
  6. ValueAggregatorMapper- This class implements the generic mapper of Aggregate.
  7. WrappedMapper- This predefined mapper wraps a given one to allow custom Mapper.Context implementations.
  8. RegexMapper- A Mapper that extracts text matching a regular expression.

Predefined Reducer classes in Hadoop

  1. ChainReducer- The ChainReducer class allows you to chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper, the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper will be written to the task's output.
  2. IntSumReducer- This predefined Reducer is used to sum the int values grouped with a key. You can use this predefined reducer where you want to get the sum of values grouped by keys. Jump to IntSumReducer example.
  3. LongSumReducer- This predefined Reducer is used to sum the long values grouped with a key.
  4. FieldSelectionReducer- This class implements a reducer that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user specified separator (the default value is "\t"). The user can specify a list of fields that form the reduce output keys, and a list of fields that form the reduce output values. The fields are the union of those from the key and those from the value. The field separator is under attribute "mapreduce.fieldsel.data.field.separator". The reduce output field list spec is under attribute "mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be like "keyFieldsSpec:valueFieldsSpec", where keyFieldsSpec and valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... For example, "4,3,0,1:6,5,1-3,7-" specifies to use fields 4,3,0 and 1 for keys, and fields 6,5,1,2,3,7 and above for values.
  5. ValueAggregatorReducer- This class implements the generic reducer of Aggregate.
  6. WrappedReducer- A Reducer which wraps a given one to allow for custom Reducer.Context implementations.

Predefined Mapper and Reducer - ChainMapper and ChainReducer example
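
The chained setup is covered in detail in the linked example; just to give a rough idea, wiring a chain together in the driver might look like the sketch below. The class name ChainExample and the combination of TokenCounterMapper, IntSumReducer and InverseMapper are only illustrative choices, not the code of that post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class ChainExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "ChainExample");
    job.setJarByClass(ChainExample.class);
    // first mapper in the map chain - tokenizes lines into (word, 1) pairs
    ChainMapper.addMapper(job, TokenCounterMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class, new Configuration(false));
    // reducer of the chain - sums the counts per word
    ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));
    // mapper chained after the reducer - swaps key and value, giving (count, word)
    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, new Configuration(false));
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}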

Predefined Mapper and Reducer - FieldSelectionMapper example

If you only need a few fields of the input file you can use FieldSelectionMapper for that. Let's say you have data in the following format for item, zone and total sales.

Item1 zone-1 234
Item1 zone-2 456
Item3 zone-2 123 

And you need to find the total sales for each item, which means you'll have to extract field 0 and field 2 in your Mapper.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SalesCalc extends Configured implements Tool {    
    
  // Reduce function
  public static class TotalSalesReducer extends Reducer<Text, Text, Text, IntWritable>{
    public void reduce(Text key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (Text val : values) {
        sum += Integer.parseInt(val.toString());
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SalesCalc(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // setting the separator
    conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
    // Configure the fields that are to be extracted
    conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:2");
    Job job = Job.getInstance(conf, "Sales");
    job.setJarByClass(getClass());
    // setting predefined FieldSelectionMapper
    job.setMapperClass(FieldSelectionMapper.class);    
 
    job.setReducerClass(TotalSalesReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
} 

Predefined Mapper and Reducer - TokenCounterMapper and IntSumReducer example

You can write a word count MapReduce program using the predefined TokenCounterMapper and IntSumReducer. In that case you don't need to write any logic; just configure these classes and run your MapReduce job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WordCount extends Configured implements Tool{
  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new WordCount(), args);
    System.exit(exitFlag);
  }
 
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(getClass());
    // Setting predefined mapper and reducer
    job.setMapperClass(TokenCounterMapper.class);    
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Predefined Mapper And Reducer Classes in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. MapReduce Flow in YARN
  3. Speculative Execution in Hadoop
  4. How to Compress MapReduce Job Output in Hadoop
  5. Replica Placement Policy in Hadoop Framework

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Read And Write Parquet File in Hadoop
  4. Fair Scheduler in YARN
  5. HDFS High Availability
  6. Find Largest And Smallest Number in The Given Array - Java Program
  7. How HashSet Works Internally in Java
  8. What is Dependency Injection in Spring

Monday, December 25, 2023

Data Compression in Hadoop

When we think about Hadoop, we think about very large files which are stored in HDFS and lots of data transfer among nodes in the Hadoop cluster while storing HDFS blocks or while running MapReduce tasks. If you could somehow reduce the file size that would help you in reducing storage requirements as well as in reducing the data transfer across the network. That's where data compression in Hadoop helps.


Data compression at various stages in Hadoop

You can compress data in Hadoop MapReduce at various stages.

  1. Compressing input files- You can compress the input file, which will reduce storage space in HDFS. If you compress the input files then the files will be decompressed automatically when they are processed by a MapReduce job. The appropriate codec is determined using the file name extension. For example, if the file name extension is .snappy the Hadoop framework will automatically use SnappyCodec to decompress the file.
  2. Compressing the map output- You can compress the intermediate map output. Since map output is written to disk and data from several map outputs is used by a reducer, data from map outputs is transferred to the node where the reduce task is running. Thus by compressing intermediate map output you can reduce both storage space and data transfer across the network.
  3. Compressing output files- You can also compress the output of a MapReduce job (see the configuration sketch after this list).
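
A minimal driver sketch for stages 2 and 3 is given below; the choice of SnappyCodec for the map output and GzipCodec for the job output is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Configuration conf = new Configuration();
// compress the intermediate map output with Snappy
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf, "CompressionExample");
// compress the final job output with gzip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// compressed input files need no setting; the codec is picked from the file name extension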

Hadoop compression formats

There are many different compression formats available in Hadoop framework. You will have to use one that suits your requirement.

Parameters that you need to look for are-

  1. Time it takes to compress.
  2. Space saving.
  3. Compression format is splittable or not.
Let’s go through the list of available compression formats and see which format provides what characteristics.

Deflate– It is the compression algorithm whose implementation is zlib. The Deflate compression algorithm is also used by the gzip compression tool. Filename extension is .deflate.

gzip- gzip compression is based on Deflate compression algorithm. Gzip compression is not as fast as LZO or snappy but compresses better so space saving is more.
Gzip is not splittable.
Filename extension is .gz.

bzip2- Using bzip2 for compression will provide a higher compression ratio but the compressing and decompressing speed is slow. Bzip2 is splittable; BZip2Codec implements the SplittableCompressionCodec interface which provides the capability to compress/de-compress a stream starting at any arbitrary position.
Filename extension is .bz2.

Snappy– The Snappy compressor from Google provides fast compression and decompression but compression ratio is less.
Snappy is not splittable.
Filename extension is .snappy.

LZO– LZO, just like snappy is optimized for speed so compresses and decompresses faster but compression ratio is less.
LZO is not splittable by default but you can index the lzo files as a pre-processing step to make them splittable.
Filename extension is .lzo.

LZ4– Has fast compression and decompression speed but compression ratio is less. LZ4 is not splittable.
Filename extension is .lz4.

Zstandard– Zstandard is a real-time compression algorithm providing high compression ratios. It offers a very wide range of compression/speed trade-offs.
Zstandard is not splittable.
Filename extension is .zstd.

Codecs in Hadoop

Codec, short for compressor-decompressor, is the implementation of a compression-decompression algorithm. In the Hadoop framework there are different codec classes for different compression formats; you will use the codec class for the compression format you are using. The codec classes in Hadoop are as follows-

Deflate – org.apache.hadoop.io.compress.DefaultCodec or org.apache.hadoop.io.compress.DeflateCodec (DeflateCodec is an alias for DefaultCodec). This codec uses zlib compression.

Gzip – org.apache.hadoop.io.compress.GzipCodec

Bzip2 – org.apache.hadoop.io.compress.BZip2Codec

Snappy – org.apache.hadoop.io.compress.SnappyCodec

LZO – com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec
LZO libraries are GPL licensed and don't come with the Hadoop release. The Hadoop codec for LZO has to be downloaded separately.

LZ4– org.apache.hadoop.io.compress.Lz4Codec

Zstandard – org.apache.hadoop.io.compress.ZStandardCodec
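
As a rough sketch of how these codec classes are used directly, the program below compresses a file by wrapping the output stream with a codec; the gzip choice and the paths are illustrative.

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // instantiate the codec for the chosen compression format
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    FileSystem fs = FileSystem.get(conf);
    InputStream in = fs.open(new Path("/test/input/data.txt"));
    OutputStream out = codec.createOutputStream(
        fs.create(new Path("/test/output/data.txt" + codec.getDefaultExtension())));
    // copies the bytes through the compressing stream and closes both streams
    IOUtils.copyBytes(in, out, conf);
  }
}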

Compression and Input splits

As you probably know, a MapReduce job calculates the number of input splits for the job and as many map tasks are launched as the count of splits. These map tasks process the data referred to by the input splits in parallel.

If you compress the input file using a compression format that is not splittable, then it won't be possible to read data at an arbitrary point in the stream, so the map tasks won't be able to read split data. In this scenario MapReduce won't create input splits and the whole file will be processed by one mapper, which means no advantage of parallel processing and a data transfer overhead too.

Let's try to clarify it with an example. If you have a 1 GB file it will be partitioned and stored as 8 data blocks in HDFS (Block size is 128 MB). MapReduce job using this file will also create 8 input splits and launch 8 mappers to process these splits in parallel.

Now, if you compress this 1 GB file using gzip (which is not splittable) then HDFS still stores the file as 8 separate blocks. As it is not possible to read data at an arbitrary point in the compressed gzip stream, the MapReduce job won't calculate input splits and will launch only one map task to process all the 8 HDFS blocks. So you lose the advantage of parallel processing and there is no data locality optimization either. Even if the map task is launched on the node where the data for one of the blocks is stored, the data for all the other blocks has to be transferred to the node where the map task is launched.

Note here that whether the compression format used is splittable or not is a factor for text files only. If you are using a container file format like sequence file or Avro then splitting is supported even if the compressor used is not splittable, like Snappy or gzip.

Compression increases CPU processing

There is a performance cost associated with compressing and decompressing the data. Though you save on storage and I/O activity, compression and decompression require extra CPU cycles.

Though in most cases compressing data improves the overall job performance, ensure that you weigh the pros and cons and measure the performance with and without compression.

That's all for this topic Data Compression in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Compressing File in snappy Format in Hadoop - Java Program
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. YARN in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. What is SafeMode in Hadoop
  3. NameNode, DataNode And Secondary NameNode in HDFS
  4. HDFS High Availability
  5. Speculative Execution in Hadoop
  6. MapReduce Flow in YARN
  7. How to Create Password Protected Zip File in Java
  8. Compressing And Decompressing File in GZIP Format - Java Program

Sunday, December 24, 2023

Tips for improving MapReduce performance

In this post some tips for improving MapReduce job performance are given. The three areas where you can look for improving MapReduce performance are-

  1. The Map task
  2. The Reduce task.
  3. Configuration of the shuffle and sort phase.
We'll go through these areas to see what can be done to improve the overall MapReduce job performance.

Performance improvement steps for Map and Reduce tasks

1- Input data file for Map Task- If the input file is very large, or there are a large number of small files, both may dent the performance of the Map tasks.

If the file is very large (in TBs) and the HDFS block size is the default, i.e. 128 MB, that will result in a large number of input splits. Note that for each split a Map task is spawned, so having a large number of input splits means a large number of Map tasks, resulting in overhead in terms of starting Map processes. In such a scenario, increasing the HDFS block size to 256 MB or even 512 MB would mean an increased input split size too.

That way each Map task would get more data to process resulting in creation of less map tasks and reduced overhead.

In the same way, if there are a large number of small files, a lot of time is spent on starting and stopping map tasks whereas the actual processing time is very small. As a rule of thumb a Map task should run for at least 1-3 minutes; if it does not, you should think of combining the input.

Your options are using a container file format like Sequence file, or using CombineFileInputFormat (a small configuration sketch follows).
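
A rough sketch of the CombineFileInputFormat option, using its concrete CombineTextInputFormat subclass for plain text input (the 256 MB cap is only an illustrative value):

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// pack many small input files into fewer, larger splits
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256 * 1024 * 1024L);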

2- Is input data splittable- If input data is compressed and the compression format used is not splittable, then only a single input split would be created for the whole compressed file, which means only one map task processing the whole file.
Ensure that the compression format used is splittable; you can use bzip2 compression, or LZO compression with indexing, which are splittable compression formats.

3- Using binary file formats- If you can, use a binary file format like Avro, SequenceFile or Parquet, which is more compact and gives better serialization and deserialization time compared to storing data as text.

4- Using native compression libraries- Using native libraries for compression and decompression outperforms the Java implementation counterparts.

5- How Many Reduces- Another step to improve MapReduce performance is choosing how many reducers to use for the job. As per the Hadoop documentation-
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

6- Using the correct partitioner- Partitioning of map output data for a specific reduce task is done using a Partitioner. The default HashPartitioner partitions the data by calculating a hash of the map output key.

If that default partitioner doesn't work well for your data set, there will be more (key, value) pairs in some partitions whereas other partitions will have less data, resulting in reduce side data skew.

In such a scenario you may think of implementing a custom partitioner, along the lines of the sketch below.
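
A minimal sketch of a custom partitioner; the idea of routing keys by a prefix is illustrative, the real logic depends on your data.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PrefixPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // partition on the part of the key before the first '-' instead of the whole key
    String prefix = key.toString().split("-")[0];
    return (prefix.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

// in the driver
// job.setPartitionerClass(PrefixPartitioner.class);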

Map and Reduce code related improvements

1- One of the most frequent mistakes is to create a new object every time, as shown in the following code.

public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t");
 // creating new object
 Text outValue = new Text(lineArray[0]);
 context.write(key, outValue);
}

Here a new outValue object is created for each (key,value) pair, causing overhead twice: once at object creation time and again when the many short-lived objects are garbage collected. It is better to reuse the object and have code like the following.

// create object outside map function
Text outValue = new Text();
public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException {
 String[] lineArray = StringUtils.split(value.toString(), "\t");
 outValue.set(lineArray[0]);
 context.write(key, outValue);
}

2- Don't store values in a data structure like a List or a Map in your Map or Reduce task. Storing objects like this is expensive and doing that for many (key,value) pairs will result in huge memory consumption.

Performance improvement steps to speed up shuffle and sort phase

Shuffling and sorting involves computations in terms of sorting keys and partitioning, I/O in terms of writing map outputs to local disk and network transfers in terms of data transfer from map nodes to reduce nodes. Thus optimizing shuffle and sort phase results in great improvements in the overall performance of the MapReduce job.

First step towards optimizing shuffle and sort phase is to reduce size of the Map output. Some of the things you can do for that are as follows.

  1. Filtering data- Filter data as much as you can at the Map side itself rather than carrying everything to the reducer and then writing logic to filter records. Also set only the required fields rather than having a big object.
  2. Using combiner- By using a combiner you can also aggregate data at the map end, resulting in fewer records as map output (see the sketch after this list).
  3. Compressing Map output- Compressing map output reduces the map output size which means less data to transfer to the reduce nodes.
  4. Can it be a Mapper only job- If your job doesn’t need any aggregation then you can run a Mapper only job. That way you can do away with reducer and the whole shuffling and sorting.
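
A minimal driver sketch for the combiner option; reusing a reducer such as the predefined IntSumReducer as the combiner, as done here, is only valid when the reduce function is commutative and associative (a sum, for example).

import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// aggregate the (key, count) pairs on the map side before they are shuffled
job.setCombinerClass(IntSumReducer.class);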

Setting configuration parameter for optimizing shuffle phase

There are many configuration parameters too which you can set for optimizing shuffle phase for your MapReduce job.

  • mapreduce.task.io.sort.mb- Using this parameter you can set the total amount of buffer memory to use while sorting files. Default is 100 megabytes. By increasing this value you can keep more data in memory reducing the data written to local disk.
  • mapreduce.reduce.shuffle.input.buffer.percent- Using this parameter you can set the percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle. Default is 70%. By increasing this percentage you can use more memory for storing data at the reduce side (a short sketch of setting these parameters follows).
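
A small sketch of setting these parameters in the run() method of a Tool based driver; the values are only illustrative and should be tuned against your own jobs and container sizes.

Configuration conf = getConf();
// larger in-memory sort buffer on the map side
conf.setInt("mapreduce.task.io.sort.mb", 256);
// let the reduce side keep more of the fetched map output in memory
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.80f);
Job job = Job.getInstance(conf, "TunedJob");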

That's all for this topic Tips for improving MapReduce performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Distributed Cache in Hadoop MapReduce
  4. OutputCommitter in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Fair Scheduler in YARN
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. How to Handle Missing And Under Replicated Blocks in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps
  6. Core Java Basics Interview Questions
  7. Reflection in Java
  8. Invoking Getters And Setters Using Reflection - Java Program

Saturday, December 23, 2023

Distributed Cache in Hadoop MapReduce

Sometimes when you are running a MapReduce job your Map task and (or) reduce task may require some extra data in terms of a file, a jar or a zipped file in order to do their processing. In such scenarios you can use Distributed cache in Hadoop MapReduce.

What is distributed cache

Distributed cache in Hadoop provides a mechanism to copy files, jars or archives to the nodes where map and reduce tasks are running. The file to be cached is first placed in HDFS; when a task is about to run, the NodeManager copies the cached file to the local disk of the node where the task runs.

How to use Distributed cache in Hadoop

Earlier, methods of the DistributedCache class were used to add files to the distributed cache, but that whole class is deprecated now.

You now need to use the methods in the Job class to add files to the distributed cache; the methods that can be used are as follows.

  1. public void addCacheArchive(URI uri)- This method is used to add an archive to be localized. Archived file will be unarchived on the node where task is running.
  2. public void addCacheFile(URI uri)- This method is used to add a file to be localized.
  3. public void setCacheArchives(URI[] archives)- Set the given set of archives that need to be localized.
  4. public void setCacheFiles(URI[] files)- Set the given set of files that need to be localized.
  5. public void addFileToClassPath(Path file)- This method adds a file path to the current set of classpath entries. It adds the file to cache as well.
  6. public void addArchiveToClassPath(Path archive)- This method adds an archive path to the current set of classpath entries. It adds the archive to cache as well.

If you are using GenericOptionsParser and ToolRunner in your MapReduce code then you can also pass the files to be added to the distributed cache through the command line.

Another advantage of using GenericOptionsParser to add a file is that you can add a file from the local file system too. With the Job class methods, the file has to be on a shared file system (such as HDFS), which is one of the differences between these two options for adding files to the distributed cache in Hadoop. An example of passing a file this way is shown below.
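
As a sketch of that command-line option (the jar name and paths below are just placeholders for illustration), a local copy of the City.properties file used later in this post could be passed as follows; the #city fragment creates the same symbolic link that the Java example below relies on.

$ hadoop jar totalsales.jar org.netjs.TotalSalesWithDC -files /local/path/City.properties#city /test/input /test/output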

If you are using the Java API (Job class methods) to add a file to the distributed cache in Hadoop then you have to ensure that the file has first been copied to HDFS. Then you can use the relevant method (based on whether it is a file, jar or archive).

For example - If you are adding a text file to distributed cache then you can use the following method call

job.addCacheFile(new URI("/test/input/abc.txt#abc")); 

Here #abc creates a symbolic link to the file and using this name (abc in this case) you can access the cached file in the task nodes.

If you are adding a jar to the class path then you can use the following method call

job.addFileToClassPath(new Path("/test/MyTestLib.jar")); 

If you are adding a .zip archive file to distributed cache then you can use the following method call.

job.addCacheArchive(new URI("/test/input/abc.zip")); 

Distributed cache with MapReduce example

Suppose you have data in the following format -

Item1 345 DEL
Item1 205 KOL
Item3 654 BLR
Item3 453 KOL
Item2 350 MUM
Item1 122 DEL
    
What is needed is to find the total sales per city, but in the final output file you want the full name of the city rather than the three letter city code, in the following form.
    
Item1 Delhi  467
    
In this scenario you can add a properties file to the distributed cache which has the following form.

DEL=Delhi
KOL=Kolkata
    

In the reducer you can get this properties file from the distributed cache and replace the city code with the full name by looking it up in the properties file. Getting the file from the distributed cache and loading it into a Properties instance is done in the setup() method of the reducer.

MapReduce code

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSalesWithDC extends Configured implements Tool{
  // Mapper
  public static class TotalSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0] + " " + stringArr[2]);             
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
  // Reducer
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private Properties cityProp = new Properties();
    private Text cityKey = new Text();
    private IntWritable result = new IntWritable();
    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
      // Reading the cached file through the symbolic link 'city' created when it was added to the distributed cache 
      InputStream iStream = new FileInputStream("./city");  
      //Loading properties
      cityProp.load(iStream);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;        
      String[] stringArr = key.toString().split(" ");
      // Getting the city name from prop file
      String city = cityProp.getProperty(stringArr[1].trim());
      cityKey.set(stringArr[0] + "\t"+ city);
      for (IntWritable val : values) {
        sum += val.get();
      }   
      result.set(sum);
      context.write(cityKey, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new TotalSalesWithDC(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "TotalSales");
    job.setJarByClass(getClass());
    job.setMapperClass(TotalSalesMapper.class); 
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Adding file to distributed cache
    job.addCacheFile(new URI("/test/input/City.properties#city"));
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Distributed Cache in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Shuffle And Sort Phases in Hadoop MapReduce
  2. What Are Counters in Hadoop MapReduce
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Data Compression in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. Sequence File in Hadoop
  4. YARN in Hadoop
  5. HDFS High Availability
  6. Java Collections Interview Questions
  7. Fail-Fast Vs Fail-Safe Iterator in Java
  8. Data Access in Spring Framework

Friday, December 22, 2023

How to Read And Write Parquet File in Hadoop

This post shows how to use Hadoop Java API to read and write Parquet file.

You will need to put following jars in class path in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating a Parquet schema and using ParquetWriter and ParquetReader directly to write and read files, it is more convenient to use a framework like Avro to define the schema. Then you can use AvroParquetWriter and AvroParquetReader to write and read Parquet files. The mapping between the Avro and Parquet schema, and between an Avro record and a Parquet record, is taken care of by these classes themselves.

Writing Parquet file – Java program

The first thing you’ll need is the schema; since Avro is being used, you will have to define an Avro schema.

EmpSchema.avsc

{
 "type": "record",
 "name": "empRecords",
 "doc": "Employee Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Dept",
   "type": "string"
  }
 ]
}

Java program

The tasks needed in the program are as follows-

  1. First thing is to parse the schema.
  2. Then create generic records using the Avro generic API.
  3. Once you have the records, write them to the Parquet file using AvroParquetWriter.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetFileWrite {

  public static void main(String[] args) {
    // First thing - parse the schema as it will be used
    Schema schema = parseSchema();
    List<GenericData.Record> recordList = getRecords(schema);
    writeToParquet(recordList, schema);
  }
    
  private static Schema parseSchema() {
    Schema.Parser parser = new Schema.Parser();
    Schema schema = null;
    try {
        // pass path to schema
        schema = parser.parse(ClassLoader.getSystemResourceAsStream(
            "resources/EmpSchema.avsc"));
        
    } catch (IOException e) {
        e.printStackTrace();            
    }
    return schema;
      
  }
    
  private static List<GenericData.Record> getRecords(Schema schema){
    List<GenericData.Record> recordList = new ArrayList<GenericData.Record>();
    GenericData.Record record = new GenericData.Record(schema);
    // Adding 2 records
    record.put("id", 1);
    record.put("Name", "emp1");
    record.put("Dept", "D1");
    recordList.add(record);
    
    record = new GenericData.Record(schema);
    record.put("id", 2);
    record.put("Name", "emp2");
    record.put("Dept", "D2");
    recordList.add(record);
    
    return recordList;
  }
    
    
  private static void writeToParquet(List<GenericData.Record> recordList, Schema schema) {
    // Path to Parquet file in HDFS
    Path path = new Path("/test/EmpRecord.parquet");
    ParquetWriter<GenericData.Record> writer = null;
    // Creating ParquetWriter using builder
    try {
      writer = AvroParquetWriter.
          <GenericData.Record>builder(path)
          .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
          .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
          .withSchema(schema)
          .withConf(new Configuration())
          .withCompressionCodec(CompressionCodecName.SNAPPY)
          .withValidation(false)
          .withDictionaryEncoding(false)
          .build();
      
      for (GenericData.Record record : recordList) {
        writer.write(record);
      }
        
    }catch(IOException e) {
      e.printStackTrace();
    }finally {
      if(writer != null) {
        try {
          writer.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
    

To run this Java program in Hadoop environment export the class path where your .class file for the Java program resides.

$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.ParquetFileWrite 
18/07/05 19:56:41 INFO compress.CodecPool: Got brand-new compressor [.snappy]
18/07/05 19:56:41 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 3072

Reading Parquet file – Java program

To read the parquet file created above you can use the following program.

import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetFileRead {

  public static void main(String[] args) {
    readParquetFile();
  }
        
  private static void readParquetFile() {
    ParquetReader<GenericData.Record> reader = null;
    Path path = new Path("/test/EmpRecord.parquet");
    try {
      reader = AvroParquetReader
              .<GenericData.Record>builder(path)
              .withConf(new Configuration())
              .build();
      GenericData.Record record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }catch(IOException e) {
      e.printStackTrace();
    }finally {
      if(reader != null) {
        try {
          reader.close();
        } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }
  }
}    

Using parquet-tools jar

You can also download the parquet-tools jar and use it to see the content of a Parquet file, the file metadata, the Parquet schema etc. As an example, to see the content of a Parquet file-

$ hadoop jar /parquet-tools-1.10.0.jar cat /test/EmpRecord.parquet 
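
To look at the Parquet schema or the file metadata rather than the content, parquet-tools also provides schema and meta sub-commands; a sketch assuming the same jar location and file path as above-

$ hadoop jar /parquet-tools-1.10.0.jar schema /test/EmpRecord.parquet 
$ hadoop jar /parquet-tools-1.10.0.jar meta /test/EmpRecord.parquet 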

That's all for this topic How to Read And Write Parquet File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Read And Write SequenceFile in Hadoop
  3. How to Read And Write Avro File in Hadoop
  4. Java Program to Read File in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps

You may also like-

  1. How to Compress MapReduce Job Output in Hadoop
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. Uber Mode in Hadoop
  5. Interface Default Methods in Java 8
  6. Java Stream API Tutorial
  7. How to Read File From The Last Line in Java
  8. Spring NamedParameterJdbcTemplate Insert, Update And Delete Example