Thursday, October 19, 2023

Data Locality in Hadoop

In this post we’ll see what data locality in Hadoop is and how it helps in minimizing network traffic and increasing the overall throughput of the cluster.

Data locality optimization in Hadoop

To understand data locality in Hadoop you will need to understand how a file is stored in HDFS and how a MapReduce job calculates the number of input splits and launches map tasks to process the data referred to by those splits.

HDFS stores a file by dividing it into blocks of 128 MB (the default block size). These blocks are then stored on different nodes across the Hadoop cluster. Blocks are also replicated (the default replication factor is 3), so each block is stored on 3 different nodes for redundancy.
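Note that the block size is configurable through the dfs.blocksize property in hdfs-site.xml. As a sample snippet, this sets it to the default of 128 MB expressed in bytes-

<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
</property>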

A MapReduce job splits its input into input splits, where the split size is typically the size of an HDFS block (128 MB by default). Hadoop creates one map task for each split, i.e. if there are 8 input splits then 8 map tasks will be launched.

It is actually the client running the MapReduce job that calculates the splits for the job by calling getSplits().
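As a minimal sketch, you can calculate the splits for an input path yourself by calling getSplits() on an InputFormat. The class name SplitCount and the way the input path is passed are just for illustration here.

package org.netjs;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // HDFS path whose splits we want to inspect, passed as the first argument
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // TextInputFormat inherits getSplits() from FileInputFormat
    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    // one map task is launched per split
    System.out.println("Number of input splits: " + splits.size());
  }
}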

That split information is used by the YARN ApplicationMaster to try to schedule map tasks on the same nodes where the split data resides, thus making the tasks data local. If map tasks were spawned at random locations, each map task would have to copy the data it needs from the DataNode where that split data resides, consuming a lot of cluster bandwidth. By trying to schedule map tasks on the nodes where the split data resides, the Hadoop framework sends computation to the data rather than bringing data to the computation, saving cluster bandwidth. This is called the data locality optimization.

Note here that it is not always possible to launch the map task on the same node where the input data resides because of resource constraints. In that case the Hadoop framework tries to minimize the distance by making the map task rack local; if that is also not possible, it runs the map task on a node in a different rack.

Categories based on data proximity

Based on where the data for the mapper resides there are three categories.

  • Data local– If the map task runs on the same node where the split data resides it is referred to as data local. This is the optimal scenario.
  • Rack local– If the map task working on the data is launched on a different node but in the same rack where the data resides it is known as rack local.
  • Different rack– If rack local is also not possible then the map task is launched on a node on a different rack. In this case data has to be transferred between racks, from the node where the split data resides to the node where the map task is running.
Figure: Data locality in Hadoop

That's all for this topic Data Locality in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Uber Mode in Hadoop
  2. What is SafeMode in Hadoop
  3. Data Compression in Hadoop
  4. Speculative Execution in Hadoop
  5. YARN in Hadoop

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. Java Program to Read File in HDFS
  3. NameNode, DataNode And Secondary NameNode in HDFS
  4. MapReduce Flow in YARN
  5. How to Compress Intermediate Map Output in Hadoop
  6. Stream API in Java 8
  7. How to Run a Shell Script From Java Program
  8. Lazy Initializing Spring Beans

Monday, October 16, 2023

Uber Mode in Hadoop

When a MapReduce job is submitted, the ResourceManager launches the ApplicationMaster process (for MapReduce the ApplicationMaster is MRAppMaster) in a container. Then the ApplicationMaster retrieves the number of input splits for the job and, based on that, decides the number of mappers that have to be launched as well as the number of reducers as per the configuration.

At this juncture the ApplicationMaster has to decide whether to negotiate resources with the ResourceManager’s scheduler to run the map and reduce tasks or to run the job sequentially within the same JVM where the ApplicationMaster is running.

This decision making by the ApplicationMaster happens only if uber mode is set to true in Hadoop. If uber mode is true and the ApplicationMaster decides to run the MapReduce job within the same JVM then the job is said to be running as an uber task in YARN.

While running a MapReduce job in Hadoop 2 you might have noticed this message on the console-

Job job_XXXXX_xxxx running in uber mode : false
This message is displayed if you have not modified the default for uber mode in Hadoop, which is false.

Why to run job as uber task in YARN

For running a job as an uber task in YARN, the job has to be "sufficiently small".

If a job is small enough, the ApplicationMaster can decide that negotiating with the ResourceManager to get resource containers, contacting the NodeManagers of remote nodes and running tasks on those containers requires more effort than running the job sequentially in the same JVM as the ApplicationMaster. In that case it runs the job as an uber task in YARN.

Uber mode configuration parameters in Hadoop

There are certain configuration parameters that help the ApplicationMaster decide when a job is small enough to be run as an uber task in YARN. By default the values are: there should be fewer than 10 mappers, only one reducer, and the maximum input bytes should not be more than an HDFS block size.

Configuration parameters required for uber mode are set in etc/hadoop/mapred-site.xml; a sample snippet is shown after the list.

  • mapreduce.job.ubertask.enable- Used to set uber mode to true or false. Setting it to true enables the small-jobs "ubertask" optimization. Default value is false.
  • mapreduce.job.ubertask.maxmaps- This configuration sets the maximum number of maps, beyond which the job is considered too big for the ubertasking optimization. Users may override this value, but only downward. Default value is 9.
  • mapreduce.job.ubertask.maxreduces- This configuration sets the maximum number of reduces, beyond which the job is considered too big for the ubertasking optimization. Currently only one reduce is supported; larger values will be ignored. Users may override this value, but only downward. Default value is 1.
  • mapreduce.job.ubertask.maxbytes- Threshold for the number of input bytes, beyond which the job is considered too big for the ubertasking optimization. If no value is specified, dfs.block.size is used as a default, which in case of HDFS means the HDFS block size. Users may override this value, but only downward.
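As example– A minimal mapred-site.xml snippet to turn on uber mode. The maxmaps value here is just an illustration of overriding the default downward.

<property>
  <name>mapreduce.job.ubertask.enable</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.job.ubertask.maxmaps</name>
  <value>4</value>
</property>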

That's all for this topic Uber Mode in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. YARN in Hadoop
  2. Capacity Scheduler in YARN
  3. Introduction to Hadoop Framework
  4. What is HDFS
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. What is SafeMode in Hadoop
  2. Java Program to Write File in HDFS
  3. How to Configure And Use LZO Compression in Hadoop
  4. Data Locality in Hadoop
  5. Speculative Execution in Hadoop
  6. How HashMap Internally Works in Java
  7. Volatile Keyword in Java With Examples
  8. Angular @ViewChildren Decorator With Examples

Sunday, October 15, 2023

Speculative Execution in Hadoop

Speculative execution in Hadoop MapReduce is an option to run a duplicate map or reduce task for the same input data on an alternative node. This is done so that any slow running task doesn’t slow down the whole job.

Why is speculative execution in Hadoop needed

In Hadoop an input file is partitioned into several blocks and those blocks are stored on different nodes in the Hadoop cluster; those blocks are also replicated for redundancy.

When a MapReduce job runs, it calculates the number of input splits (the size of a split is equal to an HDFS block) and runs as many map tasks as there are splits. These map tasks run in parallel on the nodes where the data referred to by the split resides.

What if a few nodes in the cluster are not performing as fast as other nodes because of hardware or network problems? Map tasks running on those nodes will be slower than the map tasks running on other nodes. Reduce tasks can start their execution only once the intermediate outputs of all the map tasks are available, so a few slow running map tasks can delay the execution of the reduce tasks.

Also, reduce tasks running on a slower node may take more time to finish, thus delaying the overall job's final output.

To guard against such slow tasks Hadoop starts the same task (working on the same input) on another node. Note that every block is replicated three times by default, so Hadoop will get the location of another node where the same input data resides and launch the task on that node, with the assumption that the task will finish faster there. This optimization by Hadoop is called speculative execution of the task.

When is a speculative task started

Once the map or reduce tasks are started and have been monitored for some time, the Hadoop framework can determine which map or reduce task is not making as much progress as the other running tasks of the same type. Only after this monitoring period, and after determining which tasks are slower, does Hadoop start speculative execution of those tasks.

Since the speculative task in MapReduce and the original task are both working on the same set of data, the output of whichever task finishes first successfully is used and the other one is killed.

How to configure speculative execution in Hadoop

Speculative execution is enabled by default for both map and reduce tasks. Properties for speculative execution are set in the mapred-site.xml file; a sample snippet is shown after the list.

  • mapreduce.map.speculative- If set to true then speculative execution of map task is enabled. Default is true.
  • mapreduce.reduce.speculative- If set to true then speculative execution of reduce task is enabled. Default is true.
  • mapreduce.job.speculative.speculative-cap-running-tasks- The max percent (0-1) of running tasks that can be speculatively re-executed at any time. Default value is 0.1.
  • mapreduce.job.speculative.speculative-cap-total-tasks- The max percent (0-1) of all tasks that can be speculatively re-executed at any time. Default value is 0.01.
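As example– A mapred-site.xml snippet that keeps speculative execution enabled for map tasks but turns it off for reduce tasks.

<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>false</value>
</property>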

Considerations for turning off speculative execution

Since speculative execution of tasks means running duplicate tasks, it increases the cluster load. If you have a very busy cluster or a cluster with limited resources then you may consider turning off speculative execution.

Another thing to consider is that a reduce task gets its input from more than one map task running on different nodes, so there is data transfer in case of reduce tasks. Running a duplicate reduce task means the same data transfer happens more than once, thus increasing the load on the network.

That's all for this topic Speculative Execution in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Uber Mode in Hadoop
  2. What is SafeMode in Hadoop
  3. Replica Placement Policy in Hadoop Framework
  4. YARN in Hadoop
  5. Data Compression in Hadoop

You may also like-

  1. What is Big Data
  2. HDFS Commands Reference List
  3. File Write in HDFS - Hadoop Framework Internal Steps
  4. Compressing File in bzip2 Format in Hadoop - Java Program
  5. Capacity Scheduler in YARN
  6. How to Create Ubuntu Bootable USB
  7. Heap Memory Allocation in Java
  8. Lazy Initializing Spring Beans

Friday, October 13, 2023

HDFS Commands Reference List With Examples

In this post I have compiled a list of some frequently used HDFS commands along with examples.

Here note that you can either use hadoop fs -<command> or hdfs dfs -<command>. The difference is that hadoop fs is generic and works with other file systems too, whereas hdfs dfs is specific to the HDFS file system. Since we are specifically talking about HDFS here, the hdfs dfs form is used.
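For example, both of these commands list the content of the /user directory.

hadoop fs -ls /user
hdfs dfs -ls /user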

1- If you need HDFS command help
hdfs dfs -help
Gives the list of all the HDFS commands along with a description of each command.

To get help for a specific command, use -help followed by that command.

As example– If you want help for the rm command

hdfs dfs -help rm

-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ... :
  Delete all files that match the specified file pattern. Equivalent to the Unix
  command "rm <src>"
                                                                                 
  -f          If the file does not exist, do not display a diagnostic message or 
              modify the exit status to reflect an error.                        
  -[rR]       Recursively deletes directories.                                   
  -skipTrash  option bypasses trash, if enabled, and immediately deletes <src>.  
  -safely     option requires safety confirmation, if enabled, requires          
              confirmation before deleting large directory with more than        
              <hadoop.shell.delete.limit.num.files> files. Delay is expected when
              walking over large directory recursively to count the number of    
              files to be deleted before the confirmation. 
2- Another way to get help for an individual command is to use the usage command. It lists all the options that can be used with the given command.
hdfs dfs -usage rm 

Usage: hadoop fs [generic options] -rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...
3- HDFS command to create a directory
hdfs dfs -mkdir

As example– HDFS command to create test directory inside /user directory.

hdfs dfs -mkdir  /user/test
If you want to create parent directories along the path use -p switch.

As example– If you want to create the whole directory structure /user/test/abc

hdfs dfs -mkdir -p /user/test/abc

4- To list all the directories and files in the given path

hdfs dfs -ls

As example– HDFS command to list all the files under /user/test

hdfs dfs -ls /user/test
To recursively list all the files and sub directories use -R switch.

For example, HDFS command to recursively list all the files and directories starting from root directory.

hdfs dfs -ls -R /

5- HDFS command to delete a file

hdfs dfs -rm

As example – To delete file display.txt in the directory /user/test

hdfs dfs -rm /user/test/display.txt

To recursively delete a directory and any content under it use -R or -r option.

For example, HDFS command to recursively delete directory /user/test along with all the content under /user/test.

hdfs dfs -rm -R /user/test/

Deleted /user/test

6- HDFS command to delete a directory.

hdfs dfs -rmdir

As example– To delete directory /user/test. Note that rmdir command will delete a directory only if it is empty.

hdfs dfs -rmdir /user/test/

7- To copy file from local file system to HDFS.

hdfs dfs -put

As example– To copy file display.txt from /usr/netjs to /user/process in HDFS.

hdfs dfs -put /usr/netjs/display.txt /user/process

8- To copy file from local file system to HDFS using copyFromLocal command.

As example– To copy file display.txt from /usr/netjs in local file system to /user/process in HDFS.

hdfs dfs -copyFromLocal /usr/netjs/display.txt /user/process

9- To move a file from the local file system to HDFS use the moveFromLocal command. If you use this command the local file is deleted after it’s copied.

As example– HDFS command to move file display.txt from /usr/netjs to /user/process in HDFS.

hdfs dfs -moveFromLocal /usr/netjs/display.txt /user/process

10- HDFS command to copy file from HDFS to local

hdfs dfs -get

As example– If you want to copy file display.txt from /user/process in HDFS to /usr/netjs in local

hdfs dfs -get /user/process/display.txt /usr/netjs/

11- Another way to copy file from HDFS to local is using copyToLocal command.

As example – If you want to copy file display.txt from /user/process in HDFS to /usr/netjs in local

hdfs dfs -copyToLocal /user/process/display.txt /usr/netjs/

12- To copy file from one HDFS location to another HDFS location.

hdfs dfs -cp

As example- If you want to copy display.txt file from /user/process to /user/test with in HDFS.

hdfs dfs -cp /user/process/display.txt /user/test

13- HDFS command to display free space.

hdfs dfs -df
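As example– To see the free space of the whole file system in a human readable format you can add the -h option.

hdfs dfs -df -h /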

14- To display the sizes of files and directories contained within the given directory.

hdfs dfs -du

As example- If you want to see the disk usage for /user directory

hdfs dfs -du /user

22  /user/process
22  /user/test
0   /user/test1

15- HDFS command to see the content of the file.

hdfs dfs -cat

As example– To display the content of the file /user/process/display.txt

hdfs dfs -cat /user/process/display.txt

16- Permanently delete files in checkpoints older than the retention threshold from trash directory, and create new checkpoint.

hdfs dfs -expunge

17- HDFS command to change replication factor of a file.

hdfs dfs -setrep

As example– If you want to change the replication factor of the file /user/process/display.txt to two.

hdfs dfs -setrep 2 /user/process/display.txt

Replication 2 set: /user/process/display.txt

18- HDFS command to run HDFS filesystem checking utility.

hdfs fsck

As example– Running fsck for files under directory /user/process

hdfs fsck /user/process

/user/process/display.txt:  Under replicated BP-1309973318-127.0.1.1-1513945999329:blk_1073741865_1041. Target Replicas is 2 but found 1 live replica(s), 0 decommissioned replica(s), 0 decommissioning replica(s).
Status: HEALTHY
 Total size: 22 B
 Total dirs: 1
 Total files: 1
 Total symlinks:  0
 Total blocks (validated): 1 (avg. block size 22 B)
 Minimally replicated blocks: 1 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 1 (100.0 %)
 Mis-replicated blocks:  0 (0.0 %)
 Default replication factor: 1
 Average block replication: 1.0
 Corrupt blocks:  0
 Missing replicas:  1 (50.0 %)
 Number of data-nodes:  1
 Number of racks:  1
FSCK ended at Mon Feb 26 13:57:40 IST 2018 in 1 milliseconds

19- If you want to change group association of files.

hdfs dfs -chgrp

As example – Change group of /user/process/display.txt file to sales.

hdfs dfs -chgrp sales /user/process/display.txt

20- To change the permissions of files.

hdfs dfs -chmod

As example- If you want to provide read, write and execute permissions to user and read permission to group and others for file /user/process/display.txt.

hdfs dfs -chmod 744 /user/process/display.txt

That's all for this topic HDFS Commands Reference List With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. What is HDFS
  2. Replica Placement Policy in Hadoop Framework
  3. NameNode, DataNode And Secondary NameNode in HDFS
  4. What is SafeMode in Hadoop
  5. HDFS High Availability

You may also like-

  1. What is Big Data
  2. Introduction to Hadoop Framework
  3. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  4. Compressing File in bzip2 Format in Hadoop - Java Program
  5. How to Compress Intermediate Map Output in Hadoop
  6. Speculative Execution in Hadoop
  7. How to Compile Java Program at Runtime
  8. Deadlock in Java Multi-Threading

Thursday, October 12, 2023

Java Program to Read File in HDFS

In this post we’ll see a Java program to read a file in HDFS. You can read a file in HDFS in two ways-

  1. Create an object of FSDataInputStream and use that object to read data from file. See example.
  2. You can use IOUtils class provided by Hadoop framework. See example.

Reading HDFS file Using FSDataInputStream

package org.netjs;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSFileRead {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  FSDataInputStream in = null;
  OutputStream out = null;
  try {
   FileSystem fs = FileSystem.get(conf);
   // Input file path
   Path inFile = new Path(args[0]);
     
   // Check if file exists at the given location
   if (!fs.exists(inFile)) {
    System.out.println("Input file not found");
    throw new IOException("Input file not found");
   }
   // open and read from file
   in = fs.open(inFile);
   //displaying file content on terminal 
   out = System.out;
   byte buffer[] = new byte[256];
  
   int bytesRead = 0;
   while ((bytesRead = in.read(buffer)) > 0) {
    out.write(buffer, 0, bytesRead);
   }      
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }finally {
   // Closing streams
   try {
    if(in != null) {     
     in.close();    
    }
    if(out != null) {
     out.close();
    }
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
 }
}

In order to execute this program you need to add the directory containing the .class file to Hadoop’s classpath.

export HADOOP_CLASSPATH=<PATH TO .class FILE>

To run program- hadoop org.netjs.HDFSFileRead /user/process/display.txt

Reading HDFS file Using IOUtils class

package org.netjs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSFileRead {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  FSDataInputStream in = null;
  //OutputStream out = null;
  try {
   FileSystem fs = FileSystem.get(conf);
   // Input file path
   Path inFile = new Path(args[0]);
     
   // Check if file exists at the given location
   if (!fs.exists(inFile)) {
    System.out.println("Input file not found");
    throw new IOException("Input file not found");
   }
   in = fs.open(inFile);
   
   IOUtils.copyBytes(in, System.out, 512, false);
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }finally {
   IOUtils.closeStream(in);
  }
 }
}

That's all for this topic Java Program to Read File in HDFS. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Java Program to Write File in HDFS
  2. File Read in HDFS - Hadoop Framework Internal Steps
  3. HDFS Commands Reference List
  4. What is SafeMode in Hadoop
  5. HDFS Federation in Hadoop Framework

You may also like-

  1. What is Big Data
  2. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  3. Word Count MapReduce Program in Hadoop
  4. How MapReduce Works in Hadoop
  5. How to Compress MapReduce Job Output in Hadoop
  6. How to Configure And Use LZO Compression in Hadoop
  7. Reading File in Java Using Files.lines And Files.newBufferedReader
  8. StringBuilder Class in Java With Examples

Wednesday, October 11, 2023

Java Program to Write File in HDFS

In this post we’ll see a Java program to write a file in HDFS. You can write a file in HDFS in two ways-

  1. Create an object of FSDataOutputStream and use that object to write data to file. See example.
  2. You can use IOUtils class provided by Hadoop framework. See example.

Writing a file in HDFS using FSDataOutputStream

package org.netjs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSFileWrite {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    FSDataInputStream in = null;
    FSDataOutputStream out = null;  
    try {
      FileSystem fs = FileSystem.get(conf);
      // Input & Output file paths
      Path inFile = new Path(args[0]);
      Path outFile = new Path(args[1]);
      // check if file exists
      if (!fs.exists(inFile)) {
        System.out.println("Input file not found");
        throw new IOException("Input file not found");
      }
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }
      in = fs.open(inFile);
      out = fs.create(outFile);
      byte buffer[] = new byte[256];
      int bytesRead = 0;
      while ((bytesRead = in.read(buffer)) > 0) {
        out.write(buffer, 0, bytesRead);
      }      
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }finally {
      try {
        if(in != null) {    
          in.close();    
        }
        if(out != null) {
          out.close();
        }
      } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}
In order to execute this program you need to add the directory containing the .class file to Hadoop’s classpath.
export HADOOP_CLASSPATH=<PATH TO .class FILE>

To run program- hadoop org.netjs.HDFSFileWrite /user/process/display.txt /user/process/writeFile.txt

Writing a file in HDFS using IOUtils class

package org.netjs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSFileWrite {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  FSDataInputStream in = null;
  FSDataOutputStream out = null;
  
  try {
   FileSystem fs = FileSystem.get(conf);
   // Input & Output file paths
   Path inFile = new Path(args[0]);
   Path outFile = new Path(args[1]);
   // check if file exists
   if (!fs.exists(inFile)) {
    System.out.println("Input file not found");
    throw new IOException("Input file not found");
   }
   if (fs.exists(outFile)) {
    System.out.println("Output file already exists");
    throw new IOException("Output file already exists");
   }
   
   in = fs.open(inFile);
   out = fs.create(outFile);
   IOUtils.copyBytes(in, out, 512, false);
   
   
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }finally {
   IOUtils.closeStream(in);
   IOUtils.closeStream(out);
  }
 }
}

That's all for this topic Java Program to Write File in HDFS. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Java Program to Read File in HDFS
  2. File Write in HDFS - Hadoop Framework Internal Steps
  3. HDFS Commands Reference List
  4. What is SafeMode in Hadoop
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. Word Count MapReduce Program in Hadoop
  3. Speculative Execution in Hadoop
  4. MapReduce Flow in YARN
  5. Capacity Scheduler in YARN
  6. How to Create Ubuntu Bootable USB
  7. What is Dependency Injection in Spring
  8. Writing File in Java

Tuesday, October 10, 2023

File Write in HDFS - Hadoop Framework Internal Steps

In this post we’ll see the internal steps within the Hadoop framework when a file is written in HDFS.

Writing file in HDFS - Initial step

When a client application wants to create a file in HDFS it calls the create() method on DistributedFileSystem which in turn calls the create() method of DFSClient. Within the Hadoop framework it is the DFSClient class which communicates with the NameNode and DataNodes.

Since the metadata about the file is stored in the NameNode, the first communication is with the NameNode to record the file in its namespace. The NameNode performs some checks related to client permissions and whether the file already exists. If those checks pass, the metadata about the file is stored.

For creating the output stream the FSDataOutputStream and DFSOutputStream classes are used. The client streams file data to FSDataOutputStream which wraps an instance of the DFSOutputStream class. The streamed file data is cached internally by the DFSOutputStream class and divided into packets of 64 KB. These packets are enqueued into a queue called dataQueue which is actually a Java LinkedList.

Getting DataNode information for writing to the HDFS blocks

There is another class, DataStreamer, which connects to the NameNode and retrieves a new block id and block locations. Remember that for the default replication factor of 3 the NameNode will send a list of 3 DataNodes for storing each block of the file. These DataNodes form a pipeline to stream block data from one DataNode to another.
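The number of DataNodes in the pipeline follows the configured replication factor. As a sample snippet, the cluster wide default can be set in hdfs-site.xml (the value shown here, 3, is already the default)-

<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>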

The DataStreamer thread picks up packets from the dataQueue and streams them to the first DataNode in the pipeline, which stores the packets and forwards them to the second DataNode, which in turn stores them and forwards the packets to the third DataNode in the pipeline.

There is another queue in the DFSOutputStream class known as the ackQueue. After sending a packet to the first DataNode in the pipeline the DataStreamer thread moves that packet from the dataQueue to the ackQueue. Only when that packet has been written to all the DataNodes in the pipeline is it removed from the ackQueue.

DataNode failure scenario while writing file in HDFS

In case any of the DataNodes in the pipeline fails, the pipeline is closed and the packets in the ackQueue that are not yet acknowledged are removed from the ackQueue and added to the front of the dataQueue so that the DataStreamer thread can pick them up again.

Then a new pipeline is created excluding the failed DataNode and the packets are written to the remaining two DataNodes.

When each DataNode in the pipeline has completed writing the block locally, the DataNodes also notify the NameNode of their block storage. For the scenario stated above, where the block is replicated to only two DataNodes, after getting the reports from the DataNodes the NameNode will know that the replication factor of 3 is not maintained, so it will ensure that a replica is created on another DataNode.

Figure: HDFS data flow for file write in HDFS

That's all for this topic File Write in HDFS - Hadoop Framework Internal Steps. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. File Read in HDFS - Hadoop Framework Internal Steps
  2. What is HDFS
  3. HDFS Commands Reference List
  4. HDFS High Availability
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. Introduction to Hadoop Framework
  2. Replica Placement Policy in Hadoop Framework
  3. What is SafeMode in Hadoop
  4. Data Compression in Hadoop
  5. Fair Scheduler in YARN
  6. Uber Mode in Hadoop
  7. Difference Between Abstract Class And Interface in Java
  8. Writing File in Java

Monday, October 9, 2023

File Read in HDFS - Hadoop Framework Internal Steps

In this post we’ll see what happens internally within the Hadoop framework when a file is read in HDFS.

Reading file in HDFS

Within the Hadoop framework it is the DFSClient class which communicates with the NameNode and DataNodes. The instance of DFSClient is created by DistributedFileSystem, which is the FileSystem implementation class in case of HDFS.

When a client application has to read a file it calls the open() method on DistributedFileSystem which in turn calls the open() method of DFSClient. DFSClient creates an instance of DFSInputStream which communicates with the NameNode.

DFSInputStream connects to the NameNode and gets the locations of the first few blocks of the file. Note that the default replication factor is 3, so for every block the NameNode sends information about the 3 DataNodes storing that specific block. In the list sent by the NameNode, the DataNodes are also ordered by their proximity to the client for each block, so the client application will try to read data from a local DataNode first rather than a remote DataNode.
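If you want to see for yourself which DataNodes store the blocks of a particular file, you can run the fsck utility with the following options (the path here is just an example)-

hdfs fsck /user/process/display.txt -files -blocks -locations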

Reading blocks from DataNodes

Once the list of blocks is retrieved the client application calls read() on the wrapper stream FSDataInputStream. In turn the wrapped stream DFSInputStream, which already has the list of DataNodes, connects to the nearest DataNode storing the first block of the file and starts streaming data to the client. DFSInputStream follows the same procedure for all the blocks in the list: connect to the DataNode storing that block, stream data, disconnect from the DataNode.

Since the NameNode sends only the first few blocks of the file, DFSInputStream also communicates with the NameNode to get the DataNode information for the next set of blocks. This process continues until all the blocks of the file are read.

Once all the blocks are read and streamed to the client, stream is closed.

In the architecture followed for reading the file the client connects directly to the DataNodes and gets the data from them; no data flows through the NameNode.

In case of any error while reading the block another DataNode storing the same block is tried. That is where replication helps.

Figure: HDFS data flow for file read in HDFS

That's all for this topic File Read in HDFS - Hadoop Framework Internal Steps. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. File Write in HDFS - Hadoop Framework Internal Steps
  2. What is HDFS
  3. HDFS Federation in Hadoop Framework
  4. HDFS High Availability
  5. NameNode, DataNode And Secondary NameNode in HDFS

You may also like-

  1. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  2. What is SafeMode in Hadoop
  3. Data Locality in Hadoop
  4. How to Configure And Use LZO Compression in Hadoop
  5. YARN in Hadoop
  6. How HashMap Internally Works in Java
  7. Installing Ubuntu Along With Windows
  8. How to Read File From The Last Line in Java

Sunday, October 8, 2023

HDFS High Availability

This post gives an overview of HDFS High Availability (HA), why it is required and how HDFS High Availability can be managed.

Problem with single NameNode

To guard against the vulnerability of having a single NameNode in a Hadoop cluster there are options like setting up a Secondary NameNode to take up the task of merging the FsImage and EditLog, or setting up HDFS federation to have separate NameNodes for separate namespaces. Still, a backup NameNode was missing and the NameNode remained a single point of failure (SPOF) in an HDFS cluster. This impacted the total availability of the HDFS cluster in two major ways:

  1. In the case of an unplanned event such as a machine crash, the cluster would be unavailable until an operator restarted the NameNode.
  2. Planned maintenance events such as software or hardware upgrades on the NameNode machine would result in windows of cluster downtime.

In these cases a new NameNode will be able to start serving requests only after-

  1. Loading the FsImage into memory and merging all the transactions stored in EditLog.
  2. Getting enough block reports from DataNodes as per configuration to leave safemode.

In a large cluster this may mean a lapse of close to half an hour where the whole cluster remains idle.

In the Hadoop 2.x release a new feature, HDFS High Availability, was introduced that addresses the above problems by providing the option of running two redundant NameNodes in the same cluster in an Active/Passive configuration.


HDFS High Availability architecture

With HDFS high availability two separate machines are configured as NameNodes in a cluster.

Out of these two NameNodes, at any point in time, exactly one of the NameNodes is in an Active state and responsible for all client operations in the cluster.

The other NameNode remains in a Standby state. It has to maintain enough state to provide a fast failover if necessary.

For the standby NameNode to keep its state synchronized with the Active node both nodes should have access to an external shared entity. For this shared access there are two options-

  • Quorum Journal Manager (QJM)
  • Shared NFS directory

The general concept in both of these options is the same: whenever any namespace modification is performed by the Active node, it logs a record of the modification to the shared storage too. The Standby node reads those edits from the shared storage and applies them to its own namespace.

That way both the NameNodes stay synchronized and the standby NameNode can be promoted to the Active state in the event of a failover.

Both of the NameNodes should also have the location of all blocks on the DataNodes. To keep that block mapping information up to date, DataNodes are configured with the location of both NameNodes, and send block location information and heartbeats to both.

Shared access with NFS

If you are using an NFS directory as the shared storage then both the NameNodes must have access to that NFS directory.

Any namespace modification performed by the Active node is logged to an edit log file stored in the shared directory. The Standby node is constantly watching this directory for edits, and as it sees the edits, it applies them to its own namespace.

Shared access with QJM

In case of QJM both nodes communicate with a group of separate daemons called “JournalNodes” (JNs). When any namespace modification is performed by the Active node, it durably logs a record of the modification to a majority of these JNs.

The Standby node is capable of reading the edits from the JNs, and is constantly watching them for changes to the edit log. As the Standby Node sees the edits, it applies them to its own namespace.

There must be at least 3 JournalNode daemons, since edit log modifications must be written to a majority of JNs. This will allow the system to tolerate the failure of a single machine. You may also run more than 3 JournalNodes, but in order to actually increase the number of failures the system can tolerate, you should run an odd number of JNs, (i.e. 3, 5, 7, etc.). Note that when running with N JournalNodes, the system can tolerate at most (N - 1) / 2 failures and continue to function normally.

Figure: HDFS High Availability

Configuration for HA cluster

The configuration changes required for high availability NameNodes in Hadoop are as follows. Changes are required in hdfs-site.xml configuration file.

dfs.nameservices – Choose a logical name for this nameservice, for example “mycluster”

<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>

Then you need to configure the NameNodes, suffixed with the nameservice ID. Suppose the individual IDs of the NameNodes are namenode1 and namenode2 and the nameservice ID is mycluster-

<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>namenode1,namenode2</value>
</property>

You also need to provide fully-qualified RPC address and fully-qualified HTTP address for each NameNode to listen on.

<property>
  <name>dfs.namenode.rpc-address.mycluster.namenode1</name>
  <value>machine1.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.namenode1</name>
  <value>machine1.example.com:50070</value>
</property>


<property>
  <name>dfs.namenode.rpc-address.mycluster.namenode2</name>
  <value>machine2.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.namenode2</name>
  <value>machine2.example.com:50070</value>
</property>
To provide the location of the shared storage.

In case of QJM with 3 machines.

<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1.example.com:8485;node2.example.com:8485;
      node3.example.com:8485/mycluster</value>
</property>
In case of NFS
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>file:///mnt/filer1/dfs/ha-name-dir-shared</value>
</property>

dfs.client.failover.proxy.provider.[nameservice ID] - the Java class that HDFS clients use to contact the Active NameNode. The two implementations which currently ship with Hadoop are the ConfiguredFailoverProxyProvider and the RequestHedgingProxyProvider. So use one of these unless you are using a custom proxy provider.

<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

Handling NameNode failover in HDFS HA

In case of a failover the Standby node should be promoted to the active state whereas the previously active NameNode should transition to standby mode.

That failover can be managed manually by using the following command.

hdfs haadmin -failover

This command will initiate a failover between two NameNodes. If the first NameNode is in the Standby state, this command simply transitions the second to the Active state without error. If the first NameNode is in the Active state, an attempt will be made to gracefully transition it to the Standby state.
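For example, using the NameNode IDs namenode1 and namenode2 configured above, a manual failover that makes namenode2 the active NameNode would be of the form-

hdfs haadmin -failover namenode1 namenode2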

For automatic failover ZooKeeper can be configured. The ZKFailoverController (ZKFC) is a ZooKeeper client process which monitors and manages the state of the NameNode.

Each of the machines which runs a NameNode also runs a ZKFC. The ZKFC pings its local NameNode on a periodic basis with a health-check command.

If the node has crashed, frozen, or otherwise entered an unhealthy state, the health monitor will mark it as unhealthy. In that case the failover mechanism is triggered.
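Automatic failover also has to be enabled in hdfs-site.xml and the ZooKeeper quorum has to be provided in core-site.xml. A minimal sketch is shown here; the ZooKeeper host names are just placeholders.

<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>

In core-site.xml-

<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>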

Fencing procedure in HDFS High Availability

In a Hadoop cluster, at any given time, only one of the NameNodes should be in the active state for the correctness of the system. So it is very important to ensure that the NameNode that is transitioning from active to standby in an HDFS High Availability configuration is not active any more.

That is why there is a need to fence the Active NameNode during a failover.

Note that when using the Quorum Journal Manager, only one NameNode is allowed to write to the edit logs in JournalNodes, so there is no potential for corrupting the file system metadata. However, when a failover occurs, it is still possible that the previous Active NameNode could serve read requests to clients.

There are two methods which ship with Hadoop for fencing: shell and sshfence.

sshfence- SSH to the Active NameNode and kill the process. In order for this fencing option to work, it must be able to SSH to the target node without providing a passphrase.

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>

<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/exampleuser/.ssh/id_rsa</value>
</property>

shell- Runs an arbitrary shell command to fence the Active NameNode. It may be configured like so:

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>shell(/path/to/my/script.sh arg1 arg2 ...)</value>
</property>

Reference: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html

That's all for this topic HDFS High Availability. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. What is HDFS
  2. Replica Placement Policy in Hadoop Framework
  3. What is SafeMode in Hadoop
  4. HDFS Commands Reference List
  5. Java Program to Read File in HDFS

You may also like-

  1. Speculative Execution in Hadoop
  2. Data Locality in Hadoop
  3. YARN in Hadoop
  4. MapReduce Flow in YARN
  5. Uber Mode in Hadoop
  6. Compressing File in snappy Format in Hadoop - Java Program
  7. Java Collections Interview Questions
  8. Converting double to int - Java Program