Friday, April 5, 2019

Sequence File in Hadoop

Apart from text files, the Hadoop framework also supports binary files. One of the binary file formats in Hadoop is the Sequence file, which is a flat file consisting of binary key/value pairs.


Advantages of SequenceFile in Hadoop

  1. Since a sequence file stores data in the form of serialized key/value pairs, it is good for storing images and other binary data. It is also good for storing complex data as (key, value) pairs where the complex data is stored as the value and an ID as the key.
  2. Since data is stored in binary form, it is more compact and takes less space than text files.
  3. Sequence file is the native binary file format supported by Hadoop, so it is extensively used in MapReduce as an input/output format. In fact, within the Hadoop framework the temporary outputs of maps are stored internally using SequenceFile.
  4. Sequence files in Hadoop support compression at both record and block levels. You can also have an uncompressed sequence file.
  5. Sequence files also support splitting. Since a sequence file is not compressed as a single file unit but at record or block level, splitting is supported even if the compression format used is not splittable, like gzip or snappy.
  6. One of the uses of the sequence file is as a container for storing a large number of small files. Since HDFS and MapReduce work well with large files rather than a large number of small files, wrapping small files in a sequence file helps in processing them effectively (see the sketch after this list).
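
As a rough illustration of the small files use case, the sketch below packs the files of a local directory into a single sequence file, using the file name as the key and the raw bytes as the value. The directory and output paths here are made up for the example and error handling is kept minimal.

import java.io.File;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SmallFilesToSequenceFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical paths used only for illustration
    File inputDir = new File("/home/netjs/smallfiles");
    Path outFile = new Path("/user/out/smallfiles.seq");
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(outFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(BytesWritable.class))) {
      for (File f : inputDir.listFiles()) {
        byte[] content = Files.readAllBytes(f.toPath());
        // file name as key, file content as value
        writer.append(new Text(f.getName()), new BytesWritable(content));
      }
    }
  }
}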

Compression types used in Sequence files

Sequence file in Hadoop offers compression at the following levels.

  • No Compression- Uncompressed key/value records.
  • Record compressed key/value records- Only 'values' are compressed when Record compression is used.
  • Block compressed key/value records- Both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable through the following property in core-site.xml.
    io.seqfile.compress.blocksize– The minimum block size for compression in block compressed SequenceFiles. Default is 1000000 bytes (1 million bytes).

Hadoop SequenceFile Formats

There are 3 different formats for SequenceFiles depending on the CompressionType specified. The header remains the same across all three formats.

SequenceFile Header

version- 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)

keyClassName- key class

valueClassName- value class

compression- A boolean which specifies if compression is turned on for keys/values in this file.

blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.

compression codec- CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).

metadata- SequenceFile.Metadata for this file.

sync- A sync marker to denote end of the header.

Uncompressed SequenceFile Format

  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Value
  • A sync-marker every few 100 bytes or so.

Record-Compressed SequenceFile Format

  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Compressed Value
  • A sync-marker every few 100 bytes or so.

Block-Compressed SequenceFile Format

  • Header
  • Record Block
    • Uncompressed number of records in the block
    • Compressed key-lengths block-size
    • Compressed key-lengths block
    • Compressed keys block-size
    • Compressed keys block
    • Compressed value-lengths block-size
    • Compressed value-lengths block
    • Compressed values block-size
    • Compressed values block
  • A sync-marker every block.

sync-marker in SequenceFile

As you may have noticed in these formats, a sync-marker is inserted after every few hundred bytes when no compression or Record compression is used, and after every block when Block compression is used. Using these sync-markers a reader can seek to any arbitrary point in the sequence file and synchronize with a record boundary. This helps in splitting a large sequence file for parallel processing by MapReduce.
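
As a small sketch of how these sync-markers are used, SequenceFile.Reader exposes a sync(position) method that moves the reader forward to the next sync point after the given byte offset, from where records can be read safely. The file path below is only a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SeekToSync {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path("/user/out/test.seq")))) {
      // Jump to the first sync-marker after byte offset 1000000
      reader.sync(1000000);
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      // Reading resumes at a record boundary
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    }
  }
}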

SequenceFile class (Java API) in Hadoop framework

The main class for working with sequence files in Hadoop is org.apache.hadoop.io.SequenceFile.

SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and SequenceFile.Sorter classes for writing, reading and sorting respectively. There are three SequenceFile Writers based on the SequenceFile.CompressionType used to compress key/value pairs:

  • Writer : Uncompressed records.
  • RecordCompressWriter : Record-compressed files, only compress values.
  • BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed.

Rather than using these Writer classes directly, the recommended way is to use one of the static createWriter methods provided by SequenceFile to choose the preferred format.

The SequenceFile.Reader acts as the bridge and can read any of the above SequenceFile formats.
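
As a hedged sketch of choosing the format through createWriter, the snippet below requests block compression with the gzip codec; passing CompressionType.RECORD or CompressionType.NONE instead would pick the other two formats. The output path is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class BlockCompressedWriter {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/user/out/block_compressed.seq")),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        // BLOCK here; RECORD or NONE selects the other two formats
        SequenceFile.Writer.compression(CompressionType.BLOCK, codec))) {
      for (int i = 0; i < 100; i++) {
        writer.append(new IntWritable(i), new Text("record-" + i));
      }
    }
  }
}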

Refer How to Read And Write SequenceFile in Hadoop to see example code for reading and writing sequence files using Java API and MapReduce.

SequenceFile with MapReduce

When you have to use SequenceFile as input or output format with MapReduce you can use the following classes.

SequenceFileInputFormat- An InputFormat for SequenceFiles. Use it when you want to use data from sequence files as the input to a MapReduce job.

SequenceFileAsTextInputFormat- Similar to SequenceFileInputFormat except that it converts the input keys and values to their String forms by calling toString() method.

SequenceFileAsBinaryInputFormat- This class is similar to SequenceFileInputFormat, except that it reads keys, values from SequenceFiles in binary (raw) format.

SequenceFileOutputFormat- An OutputFormat that writes SequenceFiles.

SequenceFileAsBinaryOutputFormat- An OutputFormat that writes keys, values to SequenceFiles in binary(raw) format.
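
As a rough example, a job driver that reads one sequence file and writes another could set the formats along the following lines. The identity Mapper and Reducer base classes simply pass records through, and the key/value classes are assumptions that have to match what your sequence file actually contains.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFilePassThrough {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "SequenceFilePassThrough");
    job.setJarByClass(SequenceFilePassThrough.class);
    // Identity mapper and reducer simply copy the records through
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);
    // Read from and write to sequence files
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    // Assumed key/value types - must match the types stored in the input file
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}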

Reference: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

That's all for this topic Sequence File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Data Compression in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. YARN in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Input Splits in Hadoop
  2. Data Locality in Hadoop
  3. Replica Placement Policy in Hadoop Framework
  4. What is SafeMode in Hadoop
  5. HDFS Commands Reference List
  6. Java Concurrency Interview Questions
  7. String And Thread-Safety in Java
  8. Different Bean Scopes in Spring

Thursday, April 4, 2019

How to Configure And Use LZO Compression in Hadoop

In this post we’ll see how to configure and use LZO compression in Hadoop.

Since LZO is GPL licensed it doesn't come bundled with Hadoop installation. You will have to install it separately.

By default LZO compression is not splittable, but LZO compressed files can be indexed to make them splittable. That needs downloading hadoop-lzo and creating the hadoop-lzo jar, so these are the first two steps you need to do in order to use LZO compression in Hadoop.


Installing LZO

Use the following command to install LZO packages in Ubuntu

$ sudo apt-get install liblzo2-2 liblzo2-dev

Downloading hadoop-lzo and creating hadoop-lzo jar

Clone the hadoop-lzo repository.

$ git clone https://github.com/twitter/hadoop-lzo.git 

Please refer this URL too – https://github.com/twitter/hadoop-lzo

To build the cloned code you will need Maven. If you don’t already have Maven you can download and install it using the following command.

$ sudo apt install maven 

Build the cloned code using Maven. Go to the directory where you have cloned the hadoop-lzo repository and run the following command.

$ mvn clean install 

If everything has gone fine till now, a "target" folder should be created with the hadoop-lzo-0.4.21-SNAPSHOT.jar file inside it.

Rather than downloading and building the jar you can also download the jars as an rpm package (preferred if you are not using Ubuntu) from here- https://code.google.com/archive/p/hadoop-gpl-packing/downloads

Configuring LZO compression in Hadoop

Now you need to configure LZO and hadoop-lzo jar in Hadoop environment.

Update the configuration file $HADOOP_INSTALLATION_DIR/etc/hadoop/core-site.xml to register LZO codecs.

    
<property> 
   <name>io.compression.codecs</name>
   <value>org.apache.hadoop.io.compress.GzipCodec,
     org.apache.hadoop.io.compress.DefaultCodec,
     org.apache.hadoop.io.compress.BZip2Codec,
     com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec
   </value>
</property>
<property> 
   <name>io.compression.codec.lzo.class</name>
   <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

Add hadoop-lzo jar and native library for LZO compression codec to Hadoop class path. For that add the following to $HADOOP_INSTALLATION_DIR/etc/hadoop/hadoop-env.sh-

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar
 
export JAVA_LIBRARY_PATH=PATH_TO_HADOOP_LZO/target/native/Linux-amd64-64:$HADOOP_INSTALLATION_DIR/lib/native

Copy hadoop-lzo jar to /share/hadoop/mapreduce/lib in your $HADOOP_INSTALLATION_DIR.

 
sudo cp PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib

Using LZO compression in Hadoop

Now you can use LZO compression in Hadoop. First let us see a Java program to compress a file using LZO compression. In the Java program, a file is read from the local file system and stored in LZO compressed format in HDFS.

Java program to compress file in LZO format in Hadoop

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class LZOCompress {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  InputStream in = null;
  OutputStream out = null;
  try {
   FileSystem fs = FileSystem.get(conf);
   // Input file - local file system
   in = new BufferedInputStream(new FileInputStream("netjs/Hadoop/Data/log.txt"));
    // Output file path in HDFS
   Path outFile = new Path("/user/out/test.lzo");
   // Verifying if the output file already exists
   if (fs.exists(outFile)) {
    throw new IOException("Output file already exists");
   }
   out = fs.create(outFile);
   
   // LZOP compression
   CompressionCodecFactory factory = new CompressionCodecFactory(conf);
   CompressionCodec codec = factory.getCodecByClassName
     ("com.hadoop.compression.lzo.LzopCodec");
   CompressionOutputStream compressionOutputStream = codec.createOutputStream(out);
   
   try {
    IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
    compressionOutputStream.finish();
    
   } finally {
    IOUtils.closeStream(in);
    IOUtils.closeStream(compressionOutputStream);
   }
   
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

To run this Java program in the Hadoop environment, export the class path where the .class file for the Java program resides.

export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.LZOCompress

18/04/27 18:13:30 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/04/27 18:13:30 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library 
[hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
18/04/27 18:13:30 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
18/04/27 18:13:30 INFO compress.CodecPool: Got brand-new compressor [.lzo]

Using the hdfs fsck command you can get information about the created compressed file in HDFS.

hdfs fsck /user/out/test.lzo

 Total size: 417954457 B
 Total dirs: 0
 Total files: 1
 Total symlinks:  0
 Total blocks (validated): 4 (avg. block size 104488614 B)
 Minimally replicated blocks: 4 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 0 (0.0 %)
 Mis-replicated blocks:  0 (0.0 %)
 Default replication factor: 1

As you can see, the compressed file is stored as 4 HDFS blocks.

In order to verify whether the MapReduce job will create input splits or not, give this compressed file test.lzo as input to a wordcount MapReduce program. By default the LZO compression format is not splittable, so only one split would be created for the MapReduce job even though there are 4 HDFS blocks.

If an LZO compressed file is used as input then the input format has to be LzoTextInputFormat in the wordcount MapReduce program, so the following change is required in the job configuration of the MapReduce job.

job.setInputFormatClass(LzoTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Running the MapReduce job

hadoop jar /home/netjs/wordcount.jar org.netjs.WordCount /user/out/test.lzo /user/mapout

18/04/27 18:23:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/27 18:23:19 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. 
Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/27 18:23:19 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. 
See Job or Job#setJar(String).
18/04/27 18:23:19 INFO input.FileInputFormat: Total input files to process : 1
18/04/27 18:23:20 INFO mapreduce.JobSubmitter: number of splits:1

You can see from the console message that only a single split is created as the file is not indexed.

Running LZO indexer to split

In order to make the LZO file splittable you will have to run the indexer as a preprocessing step. You can run the LZO indexer as a Java program or as a MapReduce job.

As Java program

$ hadoop jar PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.LzoIndexer /user/out/test.lzo

18/04/27 18:31:48 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/04/27 18:31:48 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
18/04/27 18:31:49 INFO lzo.LzoIndexer: [INDEX] LZO Indexing file /user/out/test.lzo, size 0.39 GB...
18/04/27 18:31:49 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
18/04/27 18:31:50 INFO lzo.LzoIndexer: Completed LZO Indexing in 0.73 seconds (549.03 MB/s).  Index size is 32.48 KB.

You can verify that the /user/out/test.lzo.index file is created.

Running indexer as a MapReduce job

You can also run the indexer as a MapReduce job to take advantage of parallel processing.

$ hadoop jar PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/out/test.lzo

By running the MapReduce job now, you can verify that 4 input splits are getting created.

hadoop jar /home/netjs/wordcount.jar org.netjs.WordCount /user/out/test.lzo /user/mapout

18/04/27 18:38:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/27 18:38:13 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/27 18:38:13 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
18/04/27 18:38:13 INFO input.FileInputFormat: Total input files to process : 1
18/04/27 18:38:13 INFO mapreduce.JobSubmitter: number of splits:4

That's all for this topic How to Configure And Use LZO Compression in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Data Compression in Hadoop
  2. Compressing File in bzip2 Format in Hadoop - Java Program
  3. Java Program to Read File in HDFS
  4. MapReduce Flow in YARN
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. Fair Scheduler in YARN
  3. Speculative Execution in Hadoop
  4. HDFS High Availability
  5. How to Create Password Protected Zip File in Java
  6. How to Untar a File - Java Program
  7. StringBuffer in Java
  8. How HashMap Internally Works in Java

Data Compression in Hadoop

When we think about Hadoop, we think about very large files which are stored in HDFS and lots of data transfer among nodes in the Hadoop cluster while storing HDFS blocks or while running MapReduce tasks. If you could somehow reduce the file size, it would help in reducing storage requirements as well as the data transfer across the network. That’s where data compression in Hadoop helps.


Data compression at various stages in Hadoop

You can compress data in Hadoop MapReduce at various stages.

  1. Compressing input files- You can compress the input file, which will reduce storage space in HDFS. If you compress the input files then they will be decompressed automatically when the file is processed by a MapReduce job. The appropriate codec is determined using the file name extension. As an example, if the file name extension is .snappy the Hadoop framework will automatically use SnappyCodec to decompress the file.
  2. Compressing the map output- You can compress the intermediate map output. Since map output is written to disk and data from several map outputs is used by a reducer, data from map outputs is transferred to the node where the reduce task is running. By compressing the intermediate map output you can thus reduce both storage space and data transfer across the network.
  3. Compressing output files- You can also compress the output of a MapReduce job (see the configuration sketch after this list).
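
As a rough sketch, map output compression and job output compression can be switched on in the job driver along the following lines; the codec choices here are just examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Compress the intermediate map output with Snappy
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf, "CompressionConfigExample");
    // Compress the final job output with gzip
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    // ... rest of the job configuration (mapper, reducer, input/output paths) goes here
  }
}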

Hadoop compression formats

There are many different compression formats available in Hadoop framework. You will have to use one that suits your requirement.

Parameters that you need to look for are-

  1. Time it takes to compress.
  2. Space saving.
  3. Whether the compression format is splittable or not.

Let’s go through the list of available compression formats and see which format provides what characteristics.

Deflate– It is the compression algorithm whose implementation is zlib. The Deflate compression algorithm is also used by the gzip compression tool. Filename extension is .deflate.

gzip- gzip compression is based on the Deflate compression algorithm. Gzip compression is not as fast as LZO or snappy but compresses better, so the space saving is more.
Gzip is not splittable.
Filename extension is .gz.

bzip2- Using bzip2 for compression will provide a higher compression ratio but the compressing and decompressing speed is slow. Bzip2 is splittable; Bzip2Codec implements the SplittableCompressionCodec interface which provides the capability to compress/decompress a stream starting at any arbitrary position.
Filename extension is .bz2.

Snappy– The Snappy compressor from Google provides fast compression and decompression but the compression ratio is lower.
Snappy is not splittable.
Filename extension is .snappy.

LZO– LZO, just like snappy, is optimized for speed, so it compresses and decompresses faster but the compression ratio is lower.
LZO is not splittable by default but you can index the lzo files as a pre-processing step to make them splittable.
Filename extension is .lzo.

LZ4– Has fast compression and decompression speed but the compression ratio is lower. LZ4 is not splittable.
Filename extension is .lz4.

Zstandard– Zstandard is a real-time compression algorithm providing high compression ratios. It offers a very wide range of compression/speed trade-offs.
Zstandard is not splittable.
Filename extension is .zst.

Codecs in Hadoop

Codec, short for compressor-decompressor, is the implementation of a compression-decompression algorithm. In the Hadoop framework there are different codec classes for different compression formats; you will use the codec class for the compression format you are using. The codec classes in Hadoop are as follows-

Deflate – org.apache.hadoop.io.compress.DefaultCodec or org.apache.hadoop.io.compress.DeflateCodec (DeflateCodec is an alias for DefaultCodec). This codec uses zlib compression.

Gzip – org.apache.hadoop.io.compress.GzipCodec

Bzip2 – org.apache.hadoop.io.compress.BZip2Codec

Snappy – org.apache.hadoop.io.compress.SnappyCodec

LZO – com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec
LZO libraries are GPL licensed and don't come with the Hadoop release. The Hadoop codec for LZO has to be downloaded separately.

LZ4– org.apache.hadoop.io.compress.Lz4Codec

Zstandard – org.apache.hadoop.io.compress.ZStandardCodec
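
To tie the codec classes to the earlier point about file name extensions, here is a hedged sketch that lets CompressionCodecFactory pick the codec for a file based on its extension and then reads the decompressed content. The input path is just a placeholder.

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class DecompressByExtension {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Placeholder path - the .gz extension maps to GzipCodec
    Path inFile = new Path("/user/in/data.gz");

    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inFile);
    if (codec == null) {
      System.err.println("No codec found for " + inFile);
      return;
    }
    // Wrap the raw HDFS stream with the codec's decompression stream
    try (InputStream in = codec.createInputStream(fs.open(inFile))) {
      IOUtils.copyBytes(in, System.out, 4096, false);
    }
  }
}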

Compression and Input splits

As you may know, a MapReduce job calculates the number of input splits for the job and as many map tasks are launched as the count of splits. These map tasks process the data referred to by the input splits in parallel.

If you compress the input file using a compression format that is not splittable, then it won't be possible to read data at an arbitrary point in the stream, so the map tasks won't be able to read split data. In this scenario MapReduce won’t create input splits and the whole file will be processed by one mapper, which means no advantage of parallel processing and data transfer overhead too.

Let's try to clarify it with an example. If you have a 1 GB file it will be partitioned and stored as 8 data blocks in HDFS (block size being 128 MB). A MapReduce job using this file will also create 8 input splits and launch 8 mappers to process these splits in parallel.

Now, if you compress this 1 GB file using gzip (which is not splittable) then HDFS still stores the file as 8 separate blocks. As it is not possible to read data at an arbitrary point in the compressed gzip stream, the MapReduce job won’t calculate input splits and will launch only one map task to process all the 8 HDFS blocks. So you lose the advantage of parallel processing and there is no data locality optimization either. Even if the map task is launched on the node where data for one of the blocks is stored, data for all the other blocks has to be transferred to the node where the map task is launched.

Note here that whether the compression format used is splittable or not is a factor for text files only. If you are using a container file format like sequence file or Avro then splitting is supported even if the compressor used is not splittable, like Snappy or Gzip.

Compression increases CPU processing

There is a performance cost associated with compressing and decompressing the data. Though you save on storage and I/O activity is less, compression and decompression require extra CPU cycles.

Though in most of the cases compressing data increases the overall job performance, ensure that you weigh the pros and cons and compare the performance gains with compressed data.

That's all for this topic Data Compression in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Compressing File in snappy Format in Hadoop - Java Program
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. YARN in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. What is SafeMode in Hadoop
  3. NameNode, DataNode And Secondary NameNode in HDFS
  4. HDFS High Availability
  5. Speculative Execution in Hadoop
  6. MapReduce Flow in YARN
  7. How to Create Password Protected Zip File in Java
  8. Compressing And Decompressing File in GZIP Format - Java Program

Wednesday, April 3, 2019

Predefined Mapper And Reducer Classes in Hadoop

The Hadoop framework comes prepackaged with many Mapper and Reducer classes. This post explains some of these predefined Mappers and Reducers in Hadoop and shows examples using the predefined Mapper and Reducer classes.

Predefined Mapper classes in Hadoop

  1. ChainMapper- The ChainMapper class allows you to use multiple Mapper classes within a single Map task. Using this predefined class you can chain mapper classes where the output of one map task becomes the input of the second map task. That helps in breaking a complex task with lots of data processing into a chain of smaller tasks.
  2. FieldSelectionMapper- This class implements a mapper class that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user specified separator (the default value is "\t"). The user can specify a list of fields that form the map output keys, and a list of fields that form the map output values. The field separator is under attribute "mapreduce.fieldsel.data.field.separator". The map output field list spec is under attribute "mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be like "keyFieldsSpec:valueFieldsSpec". Both keyFieldsSpec and valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all the fields starting from field 3. Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
    By using this predefined class you don't need to write your own mapper with the split logic; you can configure FieldSelectionMapper with the required data to split the record. Jump to FieldSelectionMapper example.
  3. InverseMapper- This predefined Mapper swaps keys and values.
  4. TokenCounterMapper- Tokenize the input values and emit each word with a count of 1. This predefined class can be used where you want to do the sum of values like in a word count MapReduce program. Jump to TokenCounterMapper example.
  5. MultithreadedMapper- This Mapper is a Multithreaded implementation for org.apache.hadoop.mapreduce.Mapper. This predefined mapper is useful if your job is more I/O bound than CPU bound.
  6. ValueAggregatorMapper- This class implements the generic mapper of Aggregate.
  7. WrappedMapper- This predefined mapper wraps a given one to allow custom Mapper.Context implementations.
  8. RegexMapper- A Mapper that extracts text matching a regular expression.

Predefined Reducer classes in Hadoop

  1. ChainReducer- The ChainReducer class allows you to chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper and the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper will be written to the task's output.
  2. IntSumReducer- This predefined Reducer is used to sum the int values grouped with a key. You can use this predefined reducer where you want to get the sum of values grouped by keys. Jump to IntSumReducer example.
  3. LongSumReducer- This predefined Reducer is used to sum the long values grouped with a key.
  4. FieldSelectionReducer- This class implements a reducer class that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user specified separator (the default value is "\t"). The user can specify a list of fields that form the reduce output keys, and a list of fields that form the reduce output values. The fields are the union of those from the key and those from the value. The field separator is under attribute "mapreduce.fieldsel.data.field.separator". The reduce output field list spec is under attribute "mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be like "keyFieldsSpec:valueFieldsSpec" where key/valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... As an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
  5. ValueAggregatorReducer- This class implements the generic reducer of Aggregate.
  6. WrappedReducer- A Reducer which wraps a given one to allow for custom Reducer.Context implementations.

Predefined Mapper and Reducer - ChainMapper and ChainReducer example
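
As a minimal sketch of chaining, using only the predefined classes described above, the driver below chains TokenCounterMapper as the map, IntSumReducer as the reduce and InverseMapper after the reduce, so the final output is (count, word) pairs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class ChainExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "ChainExample");
    job.setJarByClass(ChainExample.class);
    // Map chain: TokenCounterMapper emits (word, 1) for every token
    ChainMapper.addMapper(job, TokenCounterMapper.class,
        LongWritable.class, Text.class, Text.class, IntWritable.class, null);
    // Reducer: IntSumReducer sums the counts for each word
    ChainReducer.setReducer(job, IntSumReducer.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class, null);
    // Mapper chained after the reducer: InverseMapper swaps key and value
    ChainReducer.addMapper(job, InverseMapper.class,
        Text.class, IntWritable.class, IntWritable.class, Text.class, null);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}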

Predefined Mapper and Reducer - FieldSelectionMapper example

If you have to get a few fields of the input file you can use FieldSelectionMapper for the same. Let’s say you have data in the following format for item, zone and total sales.

Item1 zone-1 234
Item1 zone-2 456
Item3 zone-2 123 

And you need to find the total sales for each item, which means you’ll have to extract field 0 and field 2 in your Mapper.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SalesCalc extends Configured implements Tool {    
    
  // Reduce function
  public static class TotalSalesReducer extends Reducer<Text, Text, Text, IntWritable>{
    public void reduce(Text key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (Text val : values) {
        sum += Integer.parseInt(val.toString());
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SalesCalc(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // setting the separator
    conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
    // Configure the fields that are to be extracted
    conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:2");
    Job job = Job.getInstance(conf, "Sales");
    job.setJarByClass(getClass());
    // setting predefined FieldSelectionMapper
    job.setMapperClass(FieldSelectionMapper.class);    
 
    job.setReducerClass(TotalSalesReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
} 

Predefined Mapper and Reducer - TokenCounterMapper and IntSumReducer example

You can write a word count MapReduce program using the predefined TokenCounterMapper and IntSumReducer. In that case you don’t need to write any logic; just configure these classes and run your MapReduce job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WordCount extends Configured implements Tool{
  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new WordCount(), args);
    System.exit(exitFlag);
  }
 
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(getClass());
    // Setting predefined mapper and reducer
    job.setMapperClass(TokenCounterMapper.class);    
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Predefined Mapper And Reducer Classes in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. MapReduce Flow in YARN
  3. Speculative Execution in Hadoop
  4. How to Compress MapReduce Job Output in Hadoop
  5. Replica Placement Policy in Hadoop Framework

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Read And Write Parquet File in Hadoop
  4. Fair Scheduler in YARN
  5. HDFS High Availability
  6. Find Largest And Smallest Number in The Given Array - Java Program
  7. How HashSet Works Internally in Java
  8. What is Dependency Injection in Spring

Tuesday, April 2, 2019

Counters in Hadoop MapReduce Job

If you run a MapReduce job you would have seen a lot of counters displayed on the console after the MapReduce job is finished (you can also check the counters using the UI while the job is running). These counters in Hadoop MapReduce give a lot of statistical information about the executed job. Apart from giving you information about the tasks, these counters also help you in diagnosing problems in a MapReduce job and improving MapReduce performance.

For example, you get information about spilled records and memory usage, which gives you an indication of the performance of your MapReduce job.


Types of counters in Hadoop

There are 2 types of Counters in Hadoop MapReduce.

  1. Built-In Counters
  2. User-Defined Counters or Custom counters

Built-In Counters in MapReduce

Hadoop Framework has some built-in counters which give information pertaining to-

  1. File system like bytes read, bytes written.
  2. MapReduce job like the number of launched map and reduce tasks.
  3. MapReduce task like map input records, combiner output records.

These built-in counters are grouped based on the type of information they provide and represented by Enum classes in Hadoop framework. Following is the list of the Counter groups and the corresponding Enum class names.

  1. File System Counters – org.apache.hadoop.mapreduce.FileSystemCounter
  2. Job Counters– org.apache.hadoop.mapreduce.JobCounter
  3. Map-Reduce Framework Counters– org.apache.hadoop.mapreduce.TaskCounter
  4. File Input Format Counters– org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
  5. File Output Format Counters– org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter

File System Counters in MapReduce

File system counters will be repeated for each type of file system, prefixed with the file system for each entry. For example: FILE: Number of bytes read, HDFS: Number of bytes read.

  • Number of bytes read- Displays the number of bytes read by the file system for both Map and Reduce tasks.
  • Number of bytes written- Displays the number of bytes written by the file system for both Map and Reduce tasks.
  • Number of read operations- Displays the number of read operations by both Map and Reduce tasks.
  • Number of large read operations- Displays the number of large read operations (example: traversing the directory tree) for both Map and Reduce tasks.
  • Number of write operations- Displays the number of write operations by both Map and Reduce tasks.

Job Counters in MapReduce

These counters give information about the whole job not at the task level.

  • Launched map tasks- Displays the number of launched map tasks.
  • Launched reduce tasks- Displays the number of launched reduce tasks.
  • Launched uber tasks- Displays the number of tasks launched as uber tasks.
  • Data-local map tasks- Displays the number of mappers run on the same node where the input data they have to process resides.
  • Rack-local map tasks- Displays the number of mappers run on a node on the same rack where the input data they have to process resides.
  • Map in uber tasks- Displays the number of maps run as uber tasks.
  • Reduce in uber tasks- Displays the number of reducers run as uber tasks.
  • Total time spent by all map tasks- Total time in milliseconds running all the launched map tasks.
  • Total time spent by all reduce tasks- Total time in milliseconds running all the launched reduce tasks.
  • Failed map tasks- Displays the number of map tasks that failed.
  • Failed reduce tasks- Displays the number of reduce tasks that failed.
  • Failed uber tasks- Displays the number of uber tasks that failed.
  • Killed map tasks- Displays the number of killed map tasks.
  • Killed reduce tasks- Displays the number of killed reduce tasks.

Map-Reduce Framework Counters

These counters collect information about the running task.

  • Map input records– Displays the number of records processed by all the maps in the MR job.
  • Map output records– Displays the number of output records produced by all the maps in the MR job.
  • Map skipped records– Displays the number of records skipped by all the maps.
  • Map output bytes– Displays the number of bytes produced by all the maps in the MR job.
  • Map output materialized bytes– Displays the Map output bytes written to the disk.
  • Reduce input groups– Displays the number of key groups processed by all the Reducers.
  • Reduce shuffle bytes– Displays the number of bytes of Map output copied to Reducers in shuffle process.
  • Reduce input records– Displays the number of input records processed by all the Reducers.
  • Reduce output records– Displays the number of output records produced by all the Reducers.
  • Reduce skipped records– Displays the number of records skipped by Reducer.
  • Input split bytes– Displays the data about input split objects in bytes.
  • Combine input records– Displays the number of input records processed by combiner.
  • Combine output records– Displays the number of output records produced by combiner.
  • Spilled Records– Displays the number of records spilled to the disk by all the map and reduce tasks.
  • Shuffled Maps– Displays the number of map output files transferred during shuffle process to nodes where reducers are running.
  • Failed Shuffles– Displays the number of map output files failed during shuffle.
  • Merged Map outputs– Displays the number of map outputs merged after map output is transferred.
  • GC time elapsed– Displays the garbage collection time in milliseconds.
  • CPU time spent– Displays the CPU processing time spent in milliseconds.
  • Physical memory snapshot– Displays the total physical memory used in bytes.
  • Virtual memory snapshot– Displays the total virtual memory used in bytes.
  • Total committed heap usage– Displays the total amount of heap memory available in bytes.

File Input Format Counters in MapReduce

  • Bytes Read– Displays the bytes read by Map tasks using the specified Input format.

File Output Format Counters in MapReduce

  • Bytes Written– Displays the bytes written by Map and reduce tasks using the specified Output format.

User defined counters in MapReduce

You can also create user defined counters in Hadoop using a Java enum. The name of the enum becomes the counter group's name, whereas each field of the enum is a counter name.

You can increment these counters in the mapper or reducer based on some logic that will help you with debugging. User defined counters are also aggregated across all the mappers or reducers by the Hadoop framework and displayed as a single unit.

User defined counter example

Suppose you have data in the following format and in some records the sales data is missing.

Item1 345 zone-1
Item1  zone-2
Item3 654 zone-2
Item2 231 zone-3

Now you want to determine the number of records where sales data is missing, to get a picture of how much skew is introduced in your analysis because of the missing fields.

MapReduce code

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SalesCalc extends Configured implements Tool {    
  enum Sales {
    SALES_DATA_MISSING
  }
  // Mapper
  public static class SalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    IntWritable sales = new IntWritable();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] salesArr = value.toString().split("\t");
      item.set(salesArr[0]);
                
      if(salesArr[1] != null && !salesArr[1].trim().equals("")) {
        sales.set(Integer.parseInt(salesArr[1]));
      }else {
        // incrementing counter
        context.getCounter(Sales.SALES_DATA_MISSING).increment(1);
        sales.set(0);
      }            
        context.write(item, sales);
    }
  }
    
  // Reducer
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{    
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SalesCalc(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "SalesCalc");
    job.setJarByClass(getClass());
    job.setMapperClass(SalesMapper.class);    
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

In the counters displayed for the MapReduce job you can see the counter defined for getting the number of fields where sales numbers are missing.

org.netjs.SalesCalc$Sales
        SALES_DATA_MISSING=4
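
If you want to read the counter value back in the driver rather than from the console output, something along these lines should work once the job has finished; this fragment would replace the return statement in the run() method of the SalesCalc class shown above.

import org.apache.hadoop.mapreduce.Counter;

// Inside run(), after the job configuration shown above
boolean success = job.waitForCompletion(true);
if (success) {
  // Read the user defined counter back in the driver
  Counter missing = job.getCounters().findCounter(Sales.SALES_DATA_MISSING);
  System.out.println(missing.getDisplayName() + " = " + missing.getValue());
}
return success ? 0 : 1;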

That's all for this topic Counters in Hadoop MapReduce Job. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. ToolRunner and GenericOptionsParser in Hadoop
  2. Chaining MapReduce Job in Hadoop
  3. Predefined Mapper And Reducer Classes in Hadoop
  4. Input Splits in Hadoop
  5. Data Locality in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. Sequence File in Hadoop
  4. Parquet File Format in Hadoop
  5. Apache Avro Format in Hadoop
  6. How to Create Ubuntu Bootable USB
  7. Java Exception Handling Interview Questions
  8. Ternary Operator in Java