Thursday, December 21, 2023

How to Read And Write SequenceFile in Hadoop

In this post we’ll see how you can read and write a SequenceFile using the Java API in Hadoop. We’ll also see how to read and write a SequenceFile using MapReduce.


Java program to write a sequence file

Using the createWriter() method of the SequenceFile class you can get a writer that can then be used to write a SequenceFile. In this Java program a file from the local file system is written as a SequenceFile into HDFS.

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class FileWriter {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int i = 0;
    try {
      // path to the input file - in the local file system
      File file = new File("netjs/Hadoop/Data/log.txt");
      // path for the output SequenceFile - in HDFS
      Path outFile = new Path(args[0]);
      IntWritable key = new IntWritable();
      Text value = new Text();
      SequenceFile.Writer writer = null;
      try {
        // creating the writer - block compressed using the Gzip codec
        writer = SequenceFile.createWriter(conf, Writer.file(outFile),
            Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
            Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
        // each line of the input file becomes a record - line number
        // as the key, the line itself as the value
        for (String line : FileUtils.readLines(file)) {
          key.set(i++);
          value.set(line);
          writer.append(key, value);
        }
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
To run this Java program in your Hadoop environment, add the directory where the .class file for the Java program resides to the Hadoop classpath.
export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin  
Then you can run the Java program using the following command.
hadoop org.netjs.FileWriter /user/out/data.seq
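Since the -text option of the file system shell can decode SequenceFiles (including compressed ones), you can quickly verify what was written from the command line.
hadoop fs -text /user/out/data.seq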

Java program to read a sequence file

To read a SequenceFile using the Java API in Hadoop, create an instance of SequenceFile.Reader. Using that reader instance you can iterate over the (key, value) pairs in the SequenceFile using the next() method.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;

public class FileReader {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      Path inFile = new Path(args[0]);
      SequenceFile.Reader reader = null;
      try {
        IntWritable key = new IntWritable();
        Text value = new Text();
        reader = new SequenceFile.Reader(conf, Reader.file(inFile), Reader.bufferSize(4096));
        // next() fills the key and value and returns false once
        // the end of the file is reached
        while (reader.next(key, value)) {
          System.out.println("Key " + key + " Value " + value);
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
Then you can read the previously written SequenceFile using the following command.
hadoop org.netjs.FileReader /user/out/data.seq
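The program above hard-codes the key and value types. Since a SequenceFile stores its key and value class names in its header, as a variation you can create the Writable instances from the file itself. Here is a minimal sketch of such a read loop, assuming the same conf and inFile variables as in the program above; it additionally needs the org.apache.hadoop.io.Writable and org.apache.hadoop.util.ReflectionUtils imports.

SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(inFile));
// create key and value instances from the classes recorded in the file header
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
  System.out.println("Key " + key + " Value " + value);
}
reader.close();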

MapReduce job to write a SequenceFile

If you have a very big file and you want to take advantage of parallel processing then you can also use MapReduce to write a SequenceFile. The only change required is to set the output format class to SequenceFileOutputFormat. Also set the number of reducers to 0, since this is a map-only job; the identity mapper just passes each (key, value) pair through.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileWriter extends Configured implements Tool {
    // Identity map function - writes each (key, value) pair through unchanged
    public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int flag = ToolRunner.run(new FileWriter(), args);
        System.exit(flag);
    }

    @Override
    public int run(String[] args) throws Exception {
        // use the Configuration set up by ToolRunner
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "seqfilewrite");
        job.setJarByClass(FileWriter.class);
        job.setMapperClass(SFMapper.class);
        // map-only job
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // if you want to compress the output
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
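Once the classes are packaged into a jar you can run the job by passing the input and output paths as arguments. The jar name and paths here are just examples.
hadoop jar netjs.jar org.netjs.FileWriter /user/input/logs /user/out/seq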

MapReduce job to read a SequenceFile

In this case the only change needed is to set the input format class to SequenceFileInputFormat, so that each (key, value) record of the SequenceFile is passed to the mapper.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileReader extends Configured implements Tool {
    // Identity map function - writes each (key, value) pair through unchanged
    public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int flag = ToolRunner.run(new FileReader(), args);
        System.exit(flag);
    }

    @Override
    public int run(String[] args) throws Exception {
        // use the Configuration set up by ToolRunner
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "seqfileread");
        job.setJarByClass(FileReader.class);
        job.setMapperClass(SFMapper.class);
        // map-only job
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // records are read from the SequenceFile and written back out as text
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
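Running the read job follows the same pattern (again, the jar name and paths are only examples). Since TextOutputFormat is used, each record is written to the output files as the key and value separated by a tab.
hadoop jar netjs.jar org.netjs.FileReader /user/out/seq /user/out/text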

That's all for this topic How to Read And Write SequenceFile in Hadoop. If you have any doubts or any suggestions to make, please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Compressing File in snappy Format in Hadoop - Java Program
  2. Uber Mode in Hadoop
  3. Java Program to Read File in HDFS
  4. What is SafeMode in Hadoop
  5. Data Locality in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. How to Compress MapReduce Job Output in Hadoop
  3. Capacity Scheduler in YARN
  4. MapReduce Flow in YARN
  5. Java Collections Interview Questions
  6. How to Create Password Protected Zip File in Java
  7. Creating Tar File And GZipping Multiple Files - Java Program
  8. Ternary Operator in Java
