Thursday, November 30, 2023

OutputCommitter in Hadoop

In a distributed environment like the Hadoop framework it is important to track the success or failure of all the tasks running on different nodes. The whole job should be marked as successfully finished or aborted based on whether all of its tasks succeeded or any task failed. To ensure that, the Hadoop framework uses a commit protocol, and the class used for the purpose is known as OutputCommitter in Hadoop.

OutputCommitter class in Hadoop is an abstract class and its concrete implementation is the FileOutputCommitter class.

As per Hadoop docs-

FileOutputCommitter- An OutputCommitter that commits files specified in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.

Tasks performed by OutputCommitter

The Hadoop MapReduce framework relies on the OutputCommitter of the job to do the following tasks-

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job.
  2. Job cleanup after the job completion. For example, remove the temporary output directory after the job completion.
  3. Setup the task temporary output.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
  5. Commit of the task output.
  6. Discard the task commit.

Methods in OutputCommitter class in Hadoop (For MR 2 API)

  • abortJob(JobContext jobContext, org.apache.hadoop.mapreduce.JobStatus.State state)- For aborting an unsuccessful job's output. Note that this is invoked for jobs with final runstate as JobStatus.FAILED or JobStatus.KILLED. This is called from the application master process for the entire job. This may be called multiple times.
  • abortTask(TaskAttemptContext taskContext)- Discard the task output. This is called from a task's process to clean up a single task's output that has not yet been committed. This may be called multiple times for the same task, but for different task attempts.
  • commitJob(JobContext jobContext)- For committing job's output after successful job completion. That is when job clean up also happens. Note that this is invoked for jobs with final runstate as SUCCESSFUL. This is called from the application master process for the entire job. This is guaranteed to only be called once.
  • commitTask(TaskAttemptContext taskContext)- To promote the task's temporary output to the final output location. If needsTaskCommit(TaskAttemptContext) returns true and this task is the task that the AM determines finished first, this method is called to commit an individual task's output. This is to mark that task's output as complete.
  • needsTaskCommit(TaskAttemptContext taskContext)- Check whether task needs a commit. This is called from each individual task's process that will output to HDFS, and it is called just for that task.
  • setupJob(JobContext jobContext)- For the framework to setup the job output during initialization. This is called from the application master process for the entire job. This will be called multiple times, once per job attempt.
  • setupTask(TaskAttemptContext taskContext)- Sets up output for the task. This is called from each individual task's process that will output to HDFS, and it is called just for that task. This may be called multiple times for the same task, but for different task attempts.
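
To make these hooks concrete, below is a minimal sketch of a custom committer implementing the abstract methods of OutputCommitter. The class name and the no-op bodies are illustrative only; in practice you would usually extend FileOutputCommitter rather than write one from scratch, and such a committer would typically be returned from the getOutputCommitter method of a custom OutputFormat.

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative no-op committer showing which method runs at which point
public class NoOpOutputCommitter extends OutputCommitter {

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // Called from the application master, once per job attempt,
    // e.g. create the temporary output directory for the job here
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    // Called once from the application master when the job succeeds,
    // e.g. promote temporary output and remove the temporary directory
  }

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // Called from each task process to set up the task temporary output
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    // Returning false lets tasks with nothing to write skip the commit step
    return true;
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // Promote this task attempt's temporary output to the final output location
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // Discard this task attempt's output; may run for several attempts of the same task
  }
}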

Reference- https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/OutputCommitter.html

That's all for this topic OutputCommitter in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Distributed Cache in Hadoop MapReduce
  2. Using Combiner to Improve MapReduce Performance in Hadoop
  3. How MapReduce Works in Hadoop
  4. Chaining MapReduce Job in Hadoop
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. NameNode, DataNode And Secondary NameNode in HDFS
  2. HDFS Federation in Hadoop Framework
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. Using Avro File With Hadoop MapReduce
  5. Fair Scheduler in YARN
  6. ArrayBlockingQueue in Java Concurrency
  7. String And Thread-Safety in Java
  8. Converting int to String - Java Program

Wednesday, November 29, 2023

ToolRunner and GenericOptionsParser in Hadoop

GenericOptionsParser is a utility class in Hadoop that resides in the org.apache.hadoop.util package. GenericOptionsParser helps in setting configuration options through the command line. It parses the command line arguments and sets them on a Configuration object that can then be used in the application.

How GenericOptionsParser class is used

Rather than using the GenericOptionsParser class directly, you will generally implement the Tool interface in your MapReduce class and use the ToolRunner.run method to run your application; ToolRunner uses GenericOptionsParser internally to parse the command line arguments.
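
For comparison, here is a minimal sketch of using GenericOptionsParser directly. The GenericOptionsParser constructor and the getRemainingArgs method are the actual Hadoop API; the surrounding class and the property name my.custom.property are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class GenericOptionsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Parses generic options like -D, -files, -libjars and applies them to conf
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    // Whatever is left over (e.g. input/output paths) is application specific
    String[] remainingArgs = parser.getRemainingArgs();
    // my.custom.property is a made-up property, passed as -D my.custom.property=value
    System.out.println("my.custom.property = " + conf.get("my.custom.property"));
    System.out.println("Remaining arguments: " + String.join(" ", remainingArgs));
  }
}

With ToolRunner this parsing is done for you before run(String[] args) is invoked, so args already contains only the remaining application specific arguments.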

How GenericOptionsParser class helps

If you set configuration arguments within your code then you are hard coding those arguments. Any change in any argument will require a code change and rebuilding of the jar.

Passing arguments on the command line gives the flexibility to add, remove or change arguments without requiring any change in the code.

Generic Options

You can specify command line arguments using the following generic options.

  1. -archives <comma separated list of archives>- Specify comma separated archives to be unarchived on the compute machines. Applies only to job.
  2. -conf <configuration file>- Specify an application configuration file.
  3. -D <property>=<value>- Use value for given property.
  4. -files <comma separated list of files>- Specify comma separated files to be copied to the map reduce cluster. Applies only to job.
  5. -fs <file:///> or <hdfs://namenode:port>- Specify default filesystem URL to use. Overrides ‘fs.defaultFS’ property from configurations.
  6. -jt <local> or <resourcemanager:port>- Specify a ResourceManager. Applies only to job.
  7. -libjars <comma separated list of jars>- Specify comma separated jar files to include in the classpath. Applies only to job.

GenericOptionParser with ToolRunner example

In the post Using Avro File With Hadoop MapReduce there is an example of using an Avro file with MapReduce. In that example the Avro schema is inlined within the code.

Here the same example is written by passing that schema file (saleschema.avsc) as a command line argument.

saleschema.avsc

{
  "type": "record",    
  "name": "SalesRecord",
  "doc" : "Sales Records",
  "fields": 
 [
  {"name":"item", "type": "string"},
  {"name":"totalsales", "type": "int"}
 ]
}

MapReduce code

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{

  //Mapper
  public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
      AvroValue<GenericRecord>>{
    private Text item = new Text();
    private GenericRecord record;
     @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
      // Getting the file passed as arg in command line
      Schema SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
      record = new GenericData.Record(SALES_SCHEMA);
    }
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      //splitting record
      String[] salesArr = value.toString().split("\t");        
      item.set(salesArr[0]);
      record.put("item", salesArr[0]);
      record.put("totalsales", Integer.parseInt(salesArr[1]));
      context.write(new AvroKey<Text>(item), new AvroValue<GenericRecord>(record));
    }
  }
  
  // Reducer
  public static class SalesReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, 
             AvroKey<GenericRecord>, NullWritable>{    
    Schema SALES_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Getting the file passed as arg in command line
      SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
    }
    public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord    record = value.datum();
        sum += (Integer)record.get("totalsales");
      }
      GenericRecord record = new GenericData.Record(SALES_SCHEMA);
      record.put("item", key.datum());
      record.put("totalsales", sum);
      context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
    }
  }
  
  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroMR(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "AvroMR");
    job.setJarByClass(getClass());
    job.setMapperClass(ItemMapper.class);    
    job.setReducerClass(SalesReducer.class);
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    // Schema file needed here also
    Schema SALES_SCHEMA = new Schema.Parser().parse(
        new File("/home/netjs/saleschema.avsc"));
    AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
    AvroJob.setOutputKeySchema(job,    SALES_SCHEMA);    
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

Run this Hadoop MapReduce program with the schema file passed as a command line argument.

hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR -files /home/netjs/saleschema.avsc /test/input/sales.txt /test/out/sale

Here location of the schema file in the local file system is passed as a command line argument.

You can see the content of Avro output file using the avro-tools jar

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sale/part-r-00000.avro

{"item":"Item1","totalsales":1158}
{"item":"Item2","totalsales":642}
{"item":"Item3","totalsales":1507}

That's all for this topic ToolRunner and GenericOptionsParser in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. What is SafeMode in Hadoop
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Write a Map Only Job in Hadoop MapReduce
  5. How to Check Hadoop MapReduce Logs

You may also like-

  1. Input Splits in Hadoop
  2. Capacity Scheduler in YARN
  3. How to Read And Write SequenceFile in Hadoop
  4. Parquet File Format in Hadoop
  5. Java Stream API Interview Questions
  6. Generating Getters And Setters Using Reflection - Java Program
  7. BigDecimal in Java
  8. Zipping Files in Java

Tuesday, November 28, 2023

How to Handle Missing And Under Replicated Blocks in HDFS

In this post we’ll see how to get information about missing or corrupt blocks in HDFS and how to fix it. We'll also see how to fix under replicated blocks in HDFS.

Get information about corrupt or missing HDFS blocks

For getting information about corrupt or missing blocks in HDFS you can use the following HDFS command, which prints out the list of missing blocks and the files they belong to.

hdfs fsck / -list-corruptfileblocks

Fixing corrupt or missing HDFS blocks

Using that information you can decide how important the files with missing blocks are, since the easiest fix is to delete such a file and copy it to HDFS again. If you are ok with deleting the files that have corrupt blocks you can use the following command.

hdfs fsck / -delete

This command deletes corrupted files.

If you still want to have a shot at fixing the corrupted blocks then, using the file names you got from running the hdfs fsck / -list-corruptfileblocks command, you can use the following command.

hdfs fsck <path to file> -locations -blocks -files

This command prints out the locations for every block. Using that information you can go to the data nodes where the block is stored and verify whether there is any network or hardware related error or any file system problem; fixing that may make the block healthy again.

Fixing under replicated blocks problem in Hadoop

If you have under replicated blocks in HDFS then you can use the hdfs fsck / command to get that information.

Then you can use the following script, where the hdfs dfs -setrep <replication number> command is used to set the required replication factor for those files.

 
$ hdfs fsck / | grep 'Under replicated' | awk -F':' '{print $1}' >> /tmp/files

$ for problemfile in `cat /tmp/files`; do echo "Setting replication for $problemfile"; hdfs dfs -setrep 3 $problemfile; done 

When you run the hdfs fsck / command, the output for under replicated blocks is in the following form-

File name: Under replicated <block>.
   Target Replicas is 3 but found 1 live replica(s), 0 decommissioned replica(s), 0 decommissioning replica(s).

From this output the awk command picks the file name from every line where the words "Under replicated" appear and writes those names to a temp file. Then the replication factor is set to 3 (in this case) for those files.
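
If you prefer to do this from Java rather than a shell loop, the FileSystem API provides a setReplication method; below is a minimal sketch (the file path and the replication factor 3 are just examples).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FixReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Request replication factor 3 for an under replicated file (example path)
    boolean requested = fs.setReplication(new Path("/user/data/file.txt"), (short) 3);
    System.out.println("Replication change requested: " + requested);
    fs.close();
  }
}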

That's all for this topic How to Handle Missing And Under Replicated Blocks in HDFS. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Replica Placement Policy in Hadoop Framework
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. HDFS Commands Reference List
  4. HDFS High Availability
  5. File Read in HDFS - Hadoop Framework Internal Steps

You may also like-

  1. Speculative Execution in Hadoop
  2. YARN in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. How to Configure And Use LZO Compression in Hadoop
  5. How HashMap Internally Works in Java
  6. Stream API in Java 8
  7. How to Run a Shell Script From Java Program
  8. Lazy Initializing Spring Beans

Monday, November 27, 2023

How to Configure And Use LZO Compression in Hadoop

In this post we’ll see how to configure and use LZO compression in Hadoop.

Since LZO is GPL licensed it doesn't come bundled with Hadoop installation. You will have to install it separately.

By default LZO compression is not splittable, but LZO compressed files can be indexed to make them splittable. That needs downloading hadoop-lzo and creating the hadoop-lzo jar, so those are the first two steps you need to do in order to use LZO compression in Hadoop.


Installing LZO

Use the following command to install LZO packages in Ubuntu

$ sudo apt-get install liblzo2-2 liblzo2-dev

Downloading hadoop-lzo and creating hadoop-lzo jar

Clone the hadoop-lzo repository.

$ git clone https://github.com/twitter/hadoop-lzo.git 

Please refer to this URL too – https://github.com/twitter/hadoop-lzo

To build the cloned code you will need Maven. If you don’t already have Maven you can download and install it using the following command.

$ sudo apt install maven 

Using Maven build the cloned code. Go to the directory where you have cloned the hadoop-lzo repository and run the following command.

$ mvn clean install 

If everything has gone fine till now then a "target" folder should be created with the hadoop-lzo-0.4.21-SNAPSHOT.jar file inside it.

Rather than downloading and building the jar you can also download the jars as an rpm package (preferred if you are not using Ubuntu) from here- https://code.google.com/archive/p/hadoop-gpl-packing/downloads

Configuring LZO compression in Hadoop

Now you need to configure LZO and hadoop-lzo jar in Hadoop environment.

Update the configuration file $HADOOP_INSTALLATION_DIR/etc/hadoop/core-site.xml to register LZO codecs.

    
<property> 
   <name>io.compression.codecs</name>
   <value>org.apache.hadoop.io.compress.GzipCodec,
     org.apache.hadoop.io.compress.DefaultCodec,
     org.apache.hadoop.io.compress.BZip2Codec,
     com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec
   </value>
</property>
<property> 
   <name>io.compression.codec.lzo.class</name>
   <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

Add hadoop-lzo jar and native library for LZO compression codec to Hadoop class path. For that add the following to $HADOOP_INSTALLATION_DIR/etc/hadoop/hadoop-env.sh-

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar
 
export JAVA_LIBRARY_PATH=PATH_TO_HADOOP_LZO/target/native/Linux-amd64-64:$HADOOP_INSTALLATION_DIR/lib/native

Copy hadoop-lzo jar to /share/hadoop/mapreduce/lib in your $HADOOP_INSTALLATION_DIR.

 
sudo cp PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib

Using LZO compression in Hadoop

Now you can use LZO compression in Hadoop. First let us see a Java program to compress a file using LZO compression. In this Java program a file is read from the local file system and stored in LZO compressed format in HDFS.

Java program to compress file in LZO format in Hadoop

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class LZOCompress {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  InputStream in = null;
  OutputStream out = null;
  try {
   FileSystem fs = FileSystem.get(conf);
   // Input file - local file system
   in = new BufferedInputStream(new FileInputStream("netjs/Hadoop/Data/log.txt"));
    // Output file path in HDFS
   Path outFile = new Path("/user/out/test.lzo");
   // Verifying if the output file already exists
   if (fs.exists(outFile)) {
    throw new IOException("Output file already exists");
   }
   out = fs.create(outFile);
   
   // LZOP compression
   CompressionCodecFactory factory = new CompressionCodecFactory(conf);
   CompressionCodec codec = factory.getCodecByClassName
     ("com.hadoop.compression.lzo.LzopCodec");
   CompressionOutputStream compressionOutputStream = codec.createOutputStream(out);
   
   try {
    IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
    compressionOutputStream.finish();
    
   } finally {
    IOUtils.closeStream(in);
    IOUtils.closeStream(compressionOutputStream);
   }
   
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

To run this Java program in Hadoop environment export the class path where your .class file for the Java program resides.

export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.LZOCompress

18/04/27 18:13:30 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/04/27 18:13:30 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library 
[hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
18/04/27 18:13:30 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
18/04/27 18:13:30 INFO compress.CodecPool: Got brand-new compressor [.lzo]
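
To read the compressed file back you can let CompressionCodecFactory pick the codec from the file extension; below is a minimal sketch along the same lines as the program above (the local target path is just an example).

import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class LZODecompress {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // LZO compressed file created by LZOCompress
    Path inFile = new Path("/user/out/test.lzo");
    // Codec resolved from the .lzo extension using the codecs registered in core-site.xml
    // (getCodec returns null if the extension is not recognized)
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(inFile);
    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inFile));
      // Decompressed copy on the local file system (example path)
      out = new FileOutputStream("/tmp/log_uncompressed.txt");
      IOUtils.copyBytes(in, out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}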

Using hdfs fsck command you can get information about the created compressed file in HDFS.

hdfs fsck /user/out/test.lzo

 Total size: 417954457 B
 Total dirs: 0
 Total files: 1
 Total symlinks:  0
 Total blocks (validated): 4 (avg. block size 104488614 B)
 Minimally replicated blocks: 4 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 0 (0.0 %)
 Mis-replicated blocks:  0 (0.0 %)
 Default replication factor: 1

As you can see, the compressed file is stored as 4 HDFS blocks.

In order to verify whether the MapReduce job will create multiple input splits or not, this compressed file test.lzo is given as input to a wordcount MapReduce program. Since by default the LZO compression format is not splittable, only one split would be created for the MapReduce job even though there are 4 HDFS blocks.

If an LZO compressed file is used as input then the input format has to be LzoTextInputFormat in the wordcount MapReduce program, so the following change is required in the job configuration of the MapReduce job.

job.setInputFormatClass(LzoTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Running the MapReduce job

hadoop jar /home/netjs/wordcount.jar org.netjs.WordCount /user/out/test.lzo /user/mapout

18/04/27 18:23:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/27 18:23:19 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. 
Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/27 18:23:19 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. 
See Job or Job#setJar(String).
18/04/27 18:23:19 INFO input.FileInputFormat: Total input files to process : 1
18/04/27 18:23:20 INFO mapreduce.JobSubmitter: number of splits:1

You can see from the console message that only a single split is created as the file is not indexed.

Running LZO indexer to split

In order to make the LZO file splittable you will have to run the indexer as a preprocessing step. You can run the LZO indexer as a Java program or as a MapReduce job.

As Java program

$ hadoop jar PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.LzoIndexer /user/out/test.lzo

18/04/27 18:31:48 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/04/27 18:31:48 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
18/04/27 18:31:49 INFO lzo.LzoIndexer: [INDEX] LZO Indexing file /user/out/test.lzo, size 0.39 GB...
18/04/27 18:31:49 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
18/04/27 18:31:50 INFO lzo.LzoIndexer: Completed LZO Indexing in 0.73 seconds (549.03 MB/s).  Index size is 32.48 KB.

You can verify that the /user/out/test.lzo.index file is created.

Running indexer as a MapReduce job

You can also run the indexer as a MapReduce job to take advantage of parallel processing.

$ hadoop jar PATH_TO_HADOOP_LZO/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/out/test.lzo

By running the wordcount MapReduce job again, you can verify that 4 input splits are now created.

hadoop jar /home/netjs/wordcount.jar org.netjs.WordCount /user/out/test.lzo /user/mapout

18/04/27 18:38:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/27 18:38:13 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/27 18:38:13 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
18/04/27 18:38:13 INFO input.FileInputFormat: Total input files to process : 1
18/04/27 18:38:13 INFO mapreduce.JobSubmitter: number of splits:4

That's all for this topic How to Configure And Use LZO Compression in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Data Compression in Hadoop
  2. Compressing File in bzip2 Format in Hadoop - Java Program
  3. Java Program to Read File in HDFS
  4. MapReduce Flow in YARN
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. Fair Scheduler in YARN
  3. Speculative Execution in Hadoop
  4. HDFS High Availability
  5. How to Create Password Protected Zip File in Java
  6. How to Untar a File - Java Program
  7. StringBuffer in Java
  8. How HashMap Internally Works in Java

Sunday, November 26, 2023

Sequence File in Hadoop

Apart from text files the Hadoop framework also supports binary files. One of the binary file formats in Hadoop is the sequence file, which is a flat file consisting of binary key/value pairs.


Advantages of SequenceFile in Hadoop

  1. Since a sequence file stores data in the form of serialized key/value pairs, it is good for storing images and other binary data. It is also good for storing complex data as (key, value) pairs where the complex data is stored as the value and an ID as the key.
  2. Since data is stored in binary form it is more compact and takes less space than text files.
  3. Sequence file is the native binary file format supported by Hadoop, so it is extensively used in MapReduce as an input/output format. In fact, within the Hadoop framework the temporary outputs of maps are stored internally using SequenceFile.
  4. Sequence files in Hadoop support compression at both record and block levels. You can also have an uncompressed sequence file.
  5. Sequence files also support splitting. Since a sequence file is compressed not as a single file unit but at record or block level, splitting is supported even if the compression format used is not itself splittable, like gzip or snappy.
  6. One use of the sequence file is as a container for storing a large number of small files. Since HDFS and MapReduce work well with large files rather than a large number of small files, wrapping small files in a sequence file helps in processing them effectively.

Compression types used in Sequence files

Sequence file in Hadoop offers compression at following levels.

  • No Compression- Uncompressed key/value records.
  • Record compressed key/value records- Only 'values' are compressed when Record compression is used.
  • Block compressed key/value records- Both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable through the following property in core-site.xml.
    io.seqfile.compress.blocksize– The minimum block size for compression in block compressed SequenceFiles. Default is 1000000 bytes (1 million bytes).

Hadoop SequenceFile Formats

There are 3 different formats for SequenceFiles depending on the CompressionType specified. Header remains same across all the three different formats.

SequenceFile Header

version- 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)

keyClassName- key class

valueClassName- value class

compression- A boolean which specifies if compression is turned on for keys/values in this file.

blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.

compression codec- CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).

metadata- SequenceFile.Metadata for this file.

sync- A sync marker to denote end of the header.

Uncompressed SequenceFile Format

  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Value
  • A sync-marker every few 100 bytes or so.

Record-Compressed SequenceFile Format

  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Compressed Value
  • A sync-marker every few 100 bytes or so.

Block-Compressed SequenceFile Format

  • Header
  • Record Block
    • Uncompressed number of records in the block
    • Compressed key-lengths block-size
    • Compressed key-lengths block
    • Compressed keys block-size
    • Compressed keys block
    • Compressed value-lengths block-size
    • Compressed value-lengths block
    • Compressed values block-size
    • Compressed values block
  • A sync-marker every block.

sync-marker in SequenceFile

As you may have noticed in these formats, a sync-marker is inserted after every few 100 bytes when no compression or record compression is used. In case of block compression a sync-marker is inserted after every block. Using these sync-markers a reader can seek to any random point in a sequence file, which helps in splitting a large sequence file for parallel processing by MapReduce.

SequenceFile class (Java API) in Hadoop framework

Main class for working with sequence files in Hadoop is org.apache.hadoop.io.SequenceFile.

SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and SequenceFile.Sorter classes for writing, reading and sorting respectively. There are three SequenceFile Writers based on the SequenceFile.CompressionType used to compress key/value pairs:

  • Writer : Uncompressed records.
  • RecordCompressWriter : Record-compressed files, only compress values.
  • BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed.

Rather than using these Writer classes directly, the recommended way is to use the static createWriter methods provided by SequenceFile to choose the preferred format.

The SequenceFile.Reader acts as the bridge and can read any of the above SequenceFile formats.
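
As a quick illustration, here is a minimal sketch that creates a block compressed sequence file using SequenceFile.createWriter (the output path and the IntWritable/Text key and value types are just examples).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/user/out/demo.seq");
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(path),
          SequenceFile.Writer.keyClass(IntWritable.class),
          SequenceFile.Writer.valueClass(Text.class),
          // Block compression- keys and values are collected in blocks and compressed
          SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
      for (int i = 1; i <= 5; i++) {
        writer.append(new IntWritable(i), new Text("record-" + i));
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

When reading, SequenceFile.Reader detects the compression type from the file header, so the same reader code works for all three formats.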

Refer How to Read And Write SequenceFile in Hadoop to see example code for reading and writing sequence files using Java API and MapReduce.

SequenceFile with MapReduce

When you have to use SequenceFile as input or output format with MapReduce you can use the following classes.

SequenceFileInputFormat- An InputFormat for SequenceFiles. When you want to use data from sequence files as the input to MapReduce.

SequenceFileAsTextInputFormat- Similar to SequenceFileInputFormat except that it converts the input keys and values to their String forms by calling toString() method.

SequenceFileAsBinaryInputFormat- This class is similar to SequenceFileInputFormat, except that it reads keys, values from SequenceFiles in binary (raw) format.

SequenceFileOutputFormat- An OutputFormat that writes SequenceFiles.

SequenceFileAsBinaryOutputFormat- An OutputFormat that writes keys, values to SequenceFiles in binary(raw) format.

Reference- https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

That's all for this topic Sequence File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Data Compression in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. YARN in Hadoop
  5. Uber Mode in Hadoop

You may also like-

  1. Input Splits in Hadoop
  2. Data Locality in Hadoop
  3. Replica Placement Policy in Hadoop Framework
  4. What is SafeMode in Hadoop
  5. HDFS Commands Reference List
  6. Java Concurrency Interview Questions
  7. String And Thread-Safety in Java
  8. Different Bean Scopes in Spring

Saturday, November 25, 2023

How to Write a Map Only Job in Hadoop MapReduce

In a MapReduce job in Hadoop you generally write both a map function and a reduce function; the map function generates (key, value) pairs and the reduce function aggregates those (key, value) pairs. But you may opt to have only the map function in your MapReduce job and skip the reducer part. That is known as a Mapper only job in Hadoop MapReduce.

Mapper only job in Hadoop

You may have a scenario where you just want to generate (key, value) pairs without any aggregation; in that case you can write a job with only a map function. For example, when you want to convert a file to a binary file format like SequenceFile or to a columnar file format like Parquet.

Note that, generally in a MapReduce job the output of the mappers is written to local disk rather than to HDFS. In the case of a Mapper only job the map output is written to HDFS, which is one of the differences between a regular MapReduce job and a Mapper only job in Hadoop.

Writing Mapper only job

In order to write a mapper only job you need to set the number of reducers to zero. You can do that by adding job.setNumReduceTasks(0); in your driver class.

As example

@Override
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 Job job = Job.getInstance(conf, "TestClass");
 job.setJarByClass(getClass());
 job.setMapperClass(TestMapper.class);
 // Setting reducer to zero
 job.setNumReduceTasks(0);
 .....
 .....

}

Another way to have a Mapper only job is to pass the configuration parameter on the command line. The parameter used is mapreduce.job.reduces; note that before Hadoop 2 the parameter was mapred.reduce.tasks, which is deprecated now.

As example-

hadoop jar /path/to/jar ClasstoRun -D mapreduce.job.reduces=0 /input/path /output/path
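
Putting the pieces together, a complete Mapper only driver could look like the sketch below. The class names and the pass-through mapper with Text/NullWritable output are illustrative assumptions; use whatever your mapper actually emits.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapOnlyJob extends Configured implements Tool {

  // Illustrative mapper that simply writes each input line to the output
  public static class PassThroughMapper
      extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(value, NullWritable.get());
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "MapOnlyJob");
    job.setJarByClass(getClass());
    job.setMapperClass(PassThroughMapper.class);
    // No reducers- map output is written directly to HDFS
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MapOnlyJob(), args));
  }
}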

Mapper only job runs faster

In a regular MapReduce job the output of the map tasks is partitioned and sorted on keys and then sent across the network to the nodes where the reducers are running. This whole shuffle phase is avoided in a Mapper only job, which makes it faster.

That's all for this topic How to Write a Map Only Job in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Word Count MapReduce Program in Hadoop
  2. How to Compress Intermediate Map Output in Hadoop
  3. Input Splits in Hadoop
  4. Uber Mode in Hadoop
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. HDFS Commands Reference List
  2. How to Handle Missing And Under Replicated Blocks in HDFS
  3. What is SafeMode in Hadoop
  4. HDFS High Availability
  5. Fair Scheduler in YARN
  6. Difference Between Abstract Class And Interface in Java
  7. Writing File in Java
  8. Java Lambda Expressions Interview Questions

Friday, November 24, 2023

Input Splits in Hadoop

When a MapReduce job is run to process input data, one of the things the Hadoop framework does is to divide the input data into smaller chunks; these chunks are referred to as input splits in Hadoop.

For each input split Hadoop creates one map task to process the records in that input split. That is how parallelism is achieved in the Hadoop framework. For example, if a MapReduce job calculates that the input data is divided into 8 input splits, then 8 mappers will be created to process those input splits.

How input splits are calculated

Input splits are created by the InputFormat class used in the MapReduce job. Actually the implementation for calculating splits for the input file is in the FileInputFormat class, which is the base class for all the file-based implementations of InputFormat.

InputFormat Class

public abstract class InputFormat<K, V> {
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

FileInputFormat method for computing splits

If you want to see how split size is calculated you can have a look at computeSplitSize method in org.apache.hadoop.mapreduce.lib.input.FileInputFormat class.

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

Here the values of minSize and maxSize come from the following configuration parameters-
  • mapreduce.input.fileinputformat.split.minsize- The minimum size chunk that map input should be split into. Default value is 0.
  • mapreduce.input.fileinputformat.split.maxsize- The maximum size chunk that map input should be split into. Default value is Long.MAX_VALUE.

So you can see that with the default values the input split size is equal to the HDFS block size.
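
As a quick worked example with the default values (minSize = 0, maxSize = Long.MAX_VALUE) and a 128 MB HDFS block size-

public class SplitSizeDemo {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024; // 134217728 bytes, default HDFS block size
    long minSize = 0L;                   // mapreduce.input.fileinputformat.split.minsize default
    long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize default
    // Same formula as computeSplitSize shown above
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    // Prints 134217728, i.e. the split size equals the HDFS block size
    System.out.println(splitSize);
  }
}

Setting mapreduce.input.fileinputformat.split.maxsize lower than the block size gives you more, smaller splits; setting split.minsize higher than the block size gives you fewer, larger splits.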

Input split is a logical split

We have already seen how input data is divided into input splits in Hadoop. Further each split is divided into records (key-value pair). Map tasks process each of these records.

These input splits and records are logical, they don’t store or contain the actual data. They just refer to the data which is stored as blocks in HDFS. You can have a look at InputSplit class to verify that. InputSplit represents the data to be processed by an individual Mapper.

public abstract class InputSplit {
  public abstract long getLength() throws IOException, InterruptedException;

  public abstract String[] getLocations() throws IOException, InterruptedException;

  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
  }
}

As you can see the InputSplit class has the length of input split in bytes and a set of storage locations where the actual data is stored. These storage locations help Hadoop framework to spawn map tasks as close to the data as possible in order to take advantage of data locality optimization.

Input split Vs HDFS blocks

As already stated, an input split is a logical representation of the data, whereas the data of the file is stored physically in HDFS blocks. Now, you may wonder why these two representations of the data are needed and why MapReduce can't directly process the data stored in blocks.

HDFS breaks the input file into chunks of exactly 128 MB (as per default configuration), with no consideration of the data residing in those blocks. What if there is a record that spans block boundaries, i.e. a record that starts in one block and ends in another block? A map task processing that HDFS block would only be able to read the portion of the record within the block it is processing rather than the whole record. That's where the logical representation of data as used in input splits helps. Input splits take care of the logical record boundaries; using the starting record in the block and the byte offset, a map task can read the complete record even if it spans block boundaries.


That's all for this topic Input Splits in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Speculative Execution in Hadoop
  2. How MapReduce Works in Hadoop
  3. What is SafeMode in Hadoop
  4. Data Compression in Hadoop
  5. YARN in Hadoop

You may also like-

  1. How to Compress Intermediate Map Output in Hadoop
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. HDFS Commands Reference List
  4. How to Configure And Use LZO Compression in Hadoop
  5. Capacity Scheduler in YARN
  6. How to Run a Shell Script From Java Program
  7. Enum Type in Java
  8. Switch-Case Statement in Java

Wednesday, November 22, 2023

Parquet File Format in Hadoop

Apache Parquet is a columnar storage file format available to any project in the Hadoop ecosystem (Hive, HBase, MapReduce, Pig, Spark).

What is a columnar storage format

In order to understand the Parquet file format in Hadoop better, first let’s see what a columnar format is. In a column oriented format the values of each column in the records are stored together.

For example, if there is a record which comprises ID, emp Name and Department then all the values for the ID column will be stored together, the values for the Name column together and so on. If we take the same record schema as mentioned above, having three fields ID (int), Name (varchar) and Department (varchar)-

ID Name Department
1 emp1 d1
2 emp2 d2
3 emp3 d3

For this table in a row wise storage format the data will be stored as follows-

1 emp1 d1 2 emp2 d2 3 emp3 d3

Where as the same data will be stored as follows in a Column oriented storage format-

1 2 3 emp1 emp2 emp3 d1 d2 d3

How columnar storage format helps

As you can see from the storage formats, if you need to query only a few columns from a table then the columnar storage format is more efficient, as it will read only the required columns; since they are stored adjacently this minimizes IO.

For example, let’s say you want only the Name column. In a row storage format each record in the dataset has to be loaded, parsed into fields and then the data for Name extracted. With a column oriented format the query can go directly to the Name column, as all the values for that column are stored together, and get those values without going through the whole record.

So a column oriented format increases query performance, as less seek time is needed to get to the required columns and less IO is needed since only the columns whose data is required are read.

Seen in a big data context, where data is generally loaded into Hadoop after denormalizing it so there tend to be more columns, using a columnar file format like Parquet brings a lot of improvement in performance.

Another benefit that you get is in the form of less storage. Compression works better if data is of same type. With column oriented format columns of the same type are stored together resulting in better compression.

Parquet format

Coming back to the Parquet file format, since it is a column oriented format it brings the same benefits of improved performance and better compression.

One of the unique features of Parquet is that it can store data with nested structures in columnar fashion too. Other columnar file formats flatten the nested structures and store only the top level in columnar format. This means that in the Parquet file format even nested fields can be read individually, without the need to read all the fields in the nested structure.
Note that the Parquet format uses the record shredding and assembly algorithm described in the Dremel paper for storing nested structures in columnar fashion.

Primitive data types in Parquet format

Primitive data types supported by the Parquet file format are as follows

  • BOOLEAN: 1 bit boolean
  • INT32: 32 bit signed ints
  • INT64: 64 bit signed ints
  • INT96: 96 bit signed ints
  • FLOAT: IEEE 32-bit floating point values
  • DOUBLE: IEEE 64-bit floating point values
  • BYTE_ARRAY: arbitrarily long byte arrays.

Logical types in Parquet format

Parquet format also defines logical types that can be used to store data, by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses parquet’s efficient encoding. For example, strings are stored as byte arrays (binary) with a UTF8 annotation, DATE must annotate an int32. These annotations define how to further decode and interpret the data.

For example- Defining a String in Parquet

message p {
    required binary s (UTF8);
}

Defining a date field in Parquet-

message p {
  required int32 d (DATE);
}

You can get the full list of Parquet logical types here - https://github.com/apache/parquet-format/blob/master/LogicalTypes.md

Parquet file format

To understand the Parquet file format in Hadoop you should be aware of the following three terms-

  • Row group: A logical horizontal partitioning of the data into rows. A row group consists of a column chunk for each column in the dataset.
  • Column chunk: A chunk of the data for a particular column. These column chunks live in a particular row group and are guaranteed to be contiguous in the file.
  • Page: Column chunks are divided up into pages written back to back. The pages share a common header and readers can skip over pages they are not interested in.

Parquet file format structure has a header, row group and footer. So the Parquet file format can be illustrated as follows.

Header | Row group | Row group | ……. | Footer

Here Header just contains a magic number "PAR1" (4-byte) that identifies the file as Parquet format file.

Footer contains the following-

  • File metadata- The file metadata contains the locations of all the column metadata start locations. Readers are expected to first read the file metadata to find all the column chunks they are interested in. The columns chunks should then be read sequentially. It also includes the format version, the schema, any extra key-value pairs.
  • length of file metadata (4-byte)
  • magic number "PAR1" (4-byte)

That's all for this topic Parquet File Format in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Parquet File in Hadoop
  2. Sequence File in Hadoop
  3. Apache Avro Format in Hadoop
  4. How to Configure And Use LZO Compression in Hadoop
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. What is SafeMode in Hadoop
  2. HDFS High Availability
  3. Speculative Execution in Hadoop
  4. HDFS Commands Reference List
  5. Data Compression in Hadoop
  6. Lambda Expressions in Java 8
  7. How to Create Password Protected Zip File in Java
  8. Compressing And Decompressing File in GZIP Format - Java Program

Apache Avro Format in Hadoop

The Apache Avro file format, created by Doug Cutting, is a data serialization system for Hadoop. Avro provides simple integration with dynamic languages. Avro implementations for C, C++, C#, Java, PHP, Python, and Ruby are available.

Avro file

Avro file has two things-

  • Data definition (Schema)
  • Data

Both the data definition and the data are stored together in one file. Within the Avro data file there is a header, and in that header there is a metadata section where the schema is stored. All objects stored in the file must be written according to that schema.


Avro Schema

Avro relies on schemas for reading and writing data. Avro schemas are defined with JSON that helps in data interoperability. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

While defining schema you can write it in a separate file having .avsc extension.

Avro Data

Avro data is serialized and stored in binary format which makes for a compact and efficient storage. Avro data itself is not tagged with type information because the schema used to write data is always available when the data is read. The schema is required to parse data. This permits each datum to be written with no per-value overheads, making serialization both fast and small.

Avro file format

Avro specifies an object container file format. A file has a schema, and all objects stored in the file must be written according to that schema, using binary encoding.

Objects are stored in blocks that may be compressed. Synchronization markers are used between blocks to permit efficient splitting of files for MapReduce processing.

A file consists of:

  • A file header, followed by
  • one or more file data blocks

The following illustration shows the Avro file format-

Header Data block Data block …….

Avro file header consists of:

  1. Four bytes, ASCII 'O', 'b', 'j', followed by 1.
  2. File metadata, including the schema.
  3. The 16-byte, randomly-generated sync marker for this file.

A file header is thus described by the following schema:

{"type": "record", "name": "org.apache.avro.file.Header",
 "fields" : [
   {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
   {"name": "meta", "type": {"type": "map", "values": "bytes"}},
   {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}},
  ]
}
A file data block consists of:
  1. A long indicating the count of objects in this block.
  2. A long indicating the size in bytes of the serialized objects in the current block, after any codec is applied
  3. The serialized objects. If a codec is specified, this is compressed by that codec.
  4. The file's 16-byte sync marker.

How schema is defined in Avro

Avro schema is defined using JSON and consists of-

  1. A JSON string, naming a defined type.
  2. A JSON object, of the form: {"type": "typeName" ...attributes...}
    where typeName is either a primitive or derived type name, as defined below. Attributes not defined in this document are permitted as metadata, but must not affect the format of serialized data.
  3. A JSON array, representing a union of embedded types.

Primitive Types in Avro

Primitive types used in Avro are as follows-

  • null: no value
  • boolean: a binary value
  • int: 32-bit signed integer
  • long: 64-bit signed integer
  • float: single precision (32-bit) IEEE 754 floating-point number
  • double: double precision (64-bit) IEEE 754 floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: unicode character sequence

As an example, if you are defining a field of type string-

 {"name": "personName",  "type": "string"}

Complex Types in Avro

Avro supports six kinds of complex types: record, enum, array, map, union and fixed.

record- Records are defined using the type name "record" and support following attributes:

  • name- A JSON string providing the name of the record, this is a required attribute.
  • doc- A JSON string providing documentation to the user of this schema, this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this record, this is an optional attribute.
  • fields- A JSON array, listing fields, this is a required attribute. Each field in Record is a JSON object with the following attributes:
    • name- A JSON string providing the name of the field, this is a required attribute.
    • doc- A JSON string describing this field for users, this is an optional attribute.
    • type- A JSON object defining a schema, or a JSON string naming a record definition, this is a required attribute.
    • default- A default value for this field, used when reading instances that lack this field, this is an optional attribute.
    • order- Specifies how this field impacts sort ordering of this record, this is an optional attribute. Valid values are "ascending" (the default), "descending", or "ignore".
    • aliases- A JSON array of strings, providing alternate names for this field, this is an optional attribute.

As an example, a schema for Person having Id, Name and Address fields-
{
 "type": "record",
 "name": "PersonRecord",
 "doc": "Person Record",
 "fields": [
  {"name":"Id",  "type":"long"},
  {"name":"Name",  "type":"string"},
  {"name":"Address",   "type":"string"}
 ]
}

enum- Enums use the type name "enum" and support the following attributes:

  • name- A JSON string providing the name of the enum, this is a required attribute. namespace, a JSON string that qualifies the name;
  • aliases- A JSON array of strings, providing alternate names for this enum, this is an optional attribute.
  • doc- a JSON string providing documentation to the user of this schema, this is an optional attribute.
  • symbols- A JSON array, listing symbols, as JSON strings, this is a required attribute. All symbols in an enum must be unique; duplicates are prohibited.
For example, four seasons can be defined as:
{ "type": "enum",
  "name": "Seasons",
  "symbols" : ["WINTER", "SPRING", "SUMMER", "AUTUMN"]
}

array- Arrays use the type name "array" and support a single attribute:

  • items- The schema of the array's items.
For example, an array of strings is declared with:
{"type": "array", "items": "string"}

map- Maps use the type name "map" and support one attribute:

  • values- The schema of the map's values.
Map keys are assumed to be strings. For example, a map from string to int is declared with:
{"type": "map", "values": "int"}

union- Unions are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or a string. Avro data conforming to this union should match one of the schemas represented by the union.

fixed- Fixed uses the type name "fixed" and supports following attributes:

  • name- A string naming this fixed, this is a required attribute. namespace, a string that qualifies the name;
  • aliases- A JSON array of strings, providing alternate names for this enum, this is an optional attribute.
  • size- An integer, specifying the number of bytes per value, this is a required attribute.
For example, 16-byte quantity may be declared with:
{"type": "fixed", "size": 16, "name": "md5"}

Data encoding in Avro

Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster.

Reference: https://avro.apache.org/docs/1.8.2/index.html

That's all for this topic Apache Avro Format in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Parquet File Format in Hadoop
  2. Sequence File in Hadoop
  3. How to Configure And Use LZO Compression in Hadoop
  4. File Write in HDFS - Hadoop Framework Internal Steps
  5. Java Program to Read File in HDFS

You may also like-

  1. How to Check Hadoop MapReduce Logs
  2. How to Compress Intermediate Map Output in Hadoop
  3. Shuffle And Sort Phases in Hadoop MapReduce
  4. What is SafeMode in Hadoop
  5. How to Create Ubuntu Bootable USB
  6. Java Multi-Threading Interview Questions
  7. Difference Between ArrayList And CopyOnWriteArrayList in Java
  8. Invoking Getters And Setters Using Reflection - Java Program

Monday, November 20, 2023

Word Count MapReduce Program in Hadoop

The first MapReduce program most people write after installing Hadoop is invariably the word count MapReduce program.

That’s what this post shows- detailed steps for writing a word count MapReduce program in Java; the IDE used is Eclipse.

Creating and copying input file to HDFS

If you already have a file in HDFS which you want to use as input then you can skip this step.

First thing is to create a file which will be used as input and copy it to HDFS.

Let’s say you have a file wordcount.txt with the following content.

Hello wordcount MapReduce Hadoop program.
This is my first MapReduce program.

You want to copy this file to the /user/process directory within HDFS. If that path doesn’t exist then you need to create those directories first.

hdfs dfs -mkdir -p /user/process

Then copy the file wordcount.txt to this directory.

hdfs dfs -put /netjs/MapReduce/wordcount.txt /user/process 

Word count MapReduce example Java program

Now you can write your wordcount MapReduce code. The WordCount example reads text files and counts the frequency of the words. Each mapper takes a line of the input file as input and breaks it into words. It then emits a key/value pair for each word (in the form (word, 1)), and each reducer sums the counts for each word and emits a single key/value pair with the word and its sum.

In the word count MapReduce code there is a Mapper class (MyMapper) with map function and a Reducer class (MyReducer) with a reduce function.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }           
    }
  }
    
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{        
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args)  throws Exception{
    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "WC");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
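
As an optional improvement, not shown in the program above, word count is a case where a combiner can help; since summing counts is both commutative and associative you could reuse MyReducer as the combiner so that map output is pre-aggregated before being shuffled to the reducers. If you want to try it, add the following line in the main method along with the other job settings.

// Optional: reuse the reducer as a combiner to pre-aggregate map output
job.setCombinerClass(MyReducer.class);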

Required jars for Hadoop MapReduce code

You will also need to add at least the following Hadoop jars so that your code can compile. You will find these jars inside the /share/hadoop directory of your Hadoop installation. Within the /share/hadoop path, look in the hdfs, mapreduce and common directories for the required jars.

 
hadoop-common-2.9.0.jar
hadoop-hdfs-2.9.0.jar
hadoop-hdfs-client-2.9.0.jar
hadoop-mapreduce-client-core-2.9.0.jar
hadoop-mapreduce-client-common-2.9.0.jar
hadoop-mapreduce-client-jobclient-2.9.0.jar
hadoop-mapreduce-client-hs-2.9.0.jar
hadoop-mapreduce-client-app-2.9.0.jar
commons-io-2.4.jar

Creating jar of your wordcount MapReduce code

Once you are able to compile your code you need to create a jar file. In the Eclipse IDE right-click on your Java project and select Export – Java – JAR file.

Running the MapReduce code

You can use the following command to run the program, assuming you are in your Hadoop installation directory.

bin/hadoop jar /netjs/MapReduce/wordcount.jar org.netjs.WordCount  /user/process /user/out

Explanation for the arguments passed is as follows-

/netjs/MapReduce/wordcount.jar is the path to your jar file.

org.netjs.WordCount is the fully qualified name of your Java class.

/user/process – path to the input directory.

/user/out – path to the output directory. Note that the output directory should not already exist; the job creates it and fails if it does.

Once your word count MapReduce program is successfully executed you can verify the output.

 
hdfs dfs -ls /user/out

Found 2 items
-rw-r--r--   1 netjs supergroup          0 2018-02-27 13:37 /user/out/_SUCCESS
-rw-r--r--   1 netjs supergroup         77 2018-02-27 13:37 /user/out/part-r-00000

As you can see, the Hadoop framework creates output files using the part-r-xxxxx naming format. Since only one reducer is used here, there is only one output file, part-r-00000. You can see the content of the file using the following command.

 
hdfs dfs -cat /user/out/part-r-00000

Hadoop      1
Hello       1
MapReduce   2
This        1
first       1
is          1
my          1
program.    2
wordcount   1
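
A side note, not part of the original steps- the number of part-r-xxxxx files matches the number of reduce tasks. If, for example, you asked for two reducers in the driver code, the output would be split across part-r-00000 and part-r-00001.

// Optional: request two reduce tasks in the main method of the driver
job.setNumReduceTasks(2);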

That's all for this topic Word Count MapReduce Program in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. MapReduce Flow in YARN
  3. Speculative Execution in Hadoop
  4. What is HDFS
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. Data Locality in Hadoop
  3. Java Program to Read File in HDFS
  4. Data Compression in Hadoop
  5. YARN in Hadoop
  6. Uber Mode in Hadoop
  7. How to Create Immutable Class in Java
  8. Lambda Expressions in Java 8

Saturday, November 18, 2023

Capacity Scheduler in YARN

In the post YARN in Hadoop we have already seen that it is the scheduler component of the ResourceManager which is responsible for allocating resources to the running jobs. The scheduler component is pluggable in Hadoop and the two commonly used options are the capacity scheduler and the fair scheduler. This post talks about the capacity scheduler in YARN, its benefits and how the capacity scheduler can be configured in a Hadoop cluster.


YARN Capacity scheduler

The capacity scheduler in YARN allows multi-tenancy of the Hadoop cluster, where multiple users can share a large cluster.

Every organization having its own private cluster leads to poor resource utilization. An organization may provision enough resources in the cluster to meet its peak demand, but that peak demand may not occur that frequently, resulting in poor resource utilization the rest of the time.

Thus sharing a cluster among organizations is a more cost-effective idea. However, organizations are concerned about sharing a cluster because they worry that they may not get enough resources at the time of peak utilization. The CapacityScheduler in YARN mitigates that concern by giving each organization capacity guarantees.

Capacity scheduler in YARN functionality

The capacity scheduler in Hadoop works on the concept of queues. Each organization gets its own dedicated queue with a percentage of the total cluster capacity for its own use. For example, if there are two organizations sharing the cluster, one organization may be given 60% of the cluster capacity whereas the other is given 40%.

On top of that, to provide further control and predictability on sharing of resources, the CapacityScheduler supports hierarchical queues. An organization can further divide its allocated cluster capacity into separate sub-queues for separate sets of users within the organization.

The capacity scheduler is also flexible and allows allocation of free resources to a queue beyond its configured capacity. This provides elasticity for the organizations in a cost-effective manner. When the queue that those resources actually belong to sees increased demand, they are returned to it as they get released by the other queues.

Capacity scheduler in YARN configuration

To configure the ResourceManager to use the CapacityScheduler, set the following property in the conf/yarn-site.xml:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

For setting up queues in the CapacityScheduler you need to make changes in the etc/hadoop/capacity-scheduler.xml configuration file.

The CapacityScheduler has a predefined queue called root. All queues in the system are children of the root queue.

Setting up further queues- Configure property yarn.scheduler.capacity.root.queues with a list of comma-separated child queues.

Setting up sub-queues within a queue- Configure property yarn.scheduler.capacity.<queue-path>.queues
Here queue-path is the full path of the queue’s hierarchy, starting at root, with . (dot) as the delimiter. For example, the full path of the technology queue used later in this post is root.XYZ.technology.

Capacity of the queue- Configure property yarn.scheduler.capacity.<queue-path>.capacity
Queue capacity is provided in percentage (%). The sum of capacities for all queues, at each level, must be equal to 100. Applications in the queue may consume more resources than the queue’s capacity if there are free resources, providing elasticity.

Capacity scheduler queue configuration example

Suppose there are two child queues under root, XYZ and ABC. XYZ further divides its queue into technology and marketing sub-queues. XYZ is given 60% of the cluster capacity and ABC is given 40%.

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>XYZ,ABC</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.queues</name>
  <value>technology,marketing</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.ABC.capacity</name>
  <value>40</value>
</property>

If you want to limit this elasticity for applications in a queue, you can configure a maximum capacity. For example, restricting XYZ's elasticity to 80% ensures that it doesn't use more than 80% of the total cluster capacity even if free resources are available. In other words, ABC always has at least 20% of the cluster available to it immediately.

<property>
  <name>yarn.scheduler.capacity.root.XYZ.maximum-capacity</name>
  <value>80</value>
</property>

For the two sub-queues of XYZ, you want to allocate 70% of XYZ's allocated capacity to technology and 30% to marketing.

<property>
  <name>yarn.scheduler.capacity.root.XYZ.technology.capacity</name>
  <value>70</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.XYZ.marketing.capacity</name>
  <value>30</value>
</property>
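
Since a sub-queue's capacity is a percentage of its parent queue's capacity, in absolute terms the technology queue gets 70% of XYZ's 60%, which is 42% of the total cluster capacity, and the marketing queue gets 30% of 60%, which is 18%, with ABC's 40% making up the rest (42 + 18 + 40 = 100).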

Reference: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

That's all for this topic Capacity Scheduler in YARN. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  3. Replica Placement Policy in Hadoop Framework
  4. Speculative Execution in Hadoop
  5. MapReduce Flow in YARN

You may also like-

  1. HDFS Federation in Hadoop Framework
  2. What is SafeMode in Hadoop
  3. Java Program to Read File in HDFS
  4. Data Compression in Hadoop
  5. Uber Mode in Hadoop
  6. CopyOnWriteArrayList in Java
  7. Type Casting in Java
  8. Getting All The Schemas in a DB - Java Program