Friday, December 29, 2023

Converting Text File to Parquet File Using Hadoop MapReduce

This post shows how to convert existing data to the Parquet file format using MapReduce in Hadoop. In the example given here a text file is converted to a Parquet file.

You will need to put the following JARs on the classpath in order to read and write Parquet files in Hadoop. One way to make them available to the job is shown after the list.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar
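
If you do not want to manage the classpath by hand, one option (a sketch, with placeholder paths you will need to adjust) is to add the JARs to HADOOP_CLASSPATH for the client and pass them to the job with the generic -libjars option, which works here because the programs below use ToolRunner.

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/jars/*
hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert -libjars /path/to/jars/parquet-avro-1.10.0.jar,/path/to/jars/parquet-hadoop-bundle-1.10.0.jar,/path/to/jars/avro-1.8.2.jar,/path/to/jars/jackson-mapper-asl-1.9.13.jar,/path/to/jars/jackson-core-asl-1.9.13.jar /test/input /test/output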

Using Avro to define schema

Rather than creating the Parquet schema directly, the Avro framework is used to define the schema as it is more convenient. You can then use the Avro API classes to write and read the files. The mapping between the Avro and Parquet schemas, and between Avro records and Parquet records, is taken care of by these classes themselves.
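
As a minimal standalone sketch of that idea (outside MapReduce, with an illustrative path and the same single-field schema), the AvroParquetWriter and AvroParquetReader classes from parquet-avro can write and read GenericRecords directly:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetAvroSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"TextFile\", "
            + "\"fields\": [{\"name\": \"line\", \"type\": \"string\"}]}");
        Path path = new Path("/tmp/sample.parquet");

        // Writing - the Avro schema/record to Parquet mapping is handled by AvroParquetWriter
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(path)
                .withSchema(schema)
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("line", "sample line");
            writer.write(record);
        }

        // Reading - the Parquet to Avro record mapping is handled by AvroParquetReader
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(path)
                .build()) {
            GenericRecord rec;
            while ((rec = reader.read()) != null) {
                System.out.println(rec);
            }
        }
    }
}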

MapReduce code to convert file to Parquet format file

In the code the Avro schema is defined inline. The program uses the Avro generic API to create a generic record. It is also a mapper-only job, since only conversion is required and records are not aggregated.

package org.netjs;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class ParquetConvert extends Configured implements Tool{
    
    // Avro schema for the output records, defined inline
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"TextFile\",\n" +
            "    \"doc\":    \"Text File\",\n" +
            "    \"fields\":\n" + 
            "    [\n" +  
            "            {\"name\":    \"line\", \"type\":    \"string\"}\n"+
            "    ]\n"+
            "}\n");
    
    // Map function
    public static class ParquetConvertMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
        
        // Reuse a single GenericRecord instance; each input line becomes one record
        private GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            record.put("line", value.toString());
            // Output key type is Void, so null is written as the key
            context.write(null, record);
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetConvert");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetConvertMapper.class);    
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        // Value is the Avro GenericRecord emitted by the mapper
        job.setOutputValueClass(GenericRecord.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // setting schema
        AvroParquetOutputFormat.setSchema(job, MAPPING_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetConvert(), args);
        System.exit(exitFlag);
    }
}

Run the MapReduce code using the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert /test/input /test/output
You can see that the Parquet file is written at the output location.
hdfs dfs -ls /test/output

Found 4 items
-rw-r--r--   1 netjs supergroup          0 2018-07-06 09:54 /test/output/_SUCCESS
-rw-r--r--   1 netjs supergroup        276 2018-07-06 09:54 /test/output/_common_metadata
-rw-r--r--   1 netjs supergroup        429 2018-07-06 09:54 /test/output/_metadata
-rw-r--r--   1 netjs supergroup        646 2018-07-06 09:54 /test/output/part-m-00000.parquet
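
If you have the parquet-tools JAR available (an optional utility; the path and version below are placeholders), you can also inspect the schema and records of the generated file to verify the conversion.

hadoop jar /PATH_TO/parquet-tools-1.10.0.jar schema /test/output/part-m-00000.parquet
hadoop jar /PATH_TO/parquet-tools-1.10.0.jar cat /test/output/part-m-00000.parquet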

Reading Parquet file using MapReduce

The following MapReduce program takes a Parquet file as input and outputs a text file. In the Parquet file the records are in the following format, so you need to write appropriate logic to extract the relevant part.

{"line": "Hello wordcount MapReduce Hadoop program."}
{"line": "This is my first MapReduce program."}
{"line": "This file will be converted to Parquet using MR."}

package org.netjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetRead extends Configured implements Tool{
    // Map function
    public static class ParquetMapper extends Mapper<NullWritable, Group, NullWritable, Text> {
        @Override
        public void map(NullWritable key, Group value, Context context)
                throws IOException, InterruptedException {
            NullWritable outKey = NullWritable.get();
            // Group.toString() renders the record as "line: <value>"
            String line = value.toString();
            // Split on the first ": " only and trim the trailing newline added by toString()
            String[] fields = line.split(": ", 2);
            context.write(outKey, new Text(fields[1].trim()));
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetRead");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper.class);    
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetRead(), args);
        System.exit(exitFlag);
    }

}
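
Note that parsing the output of Group.toString() works for this simple schema but is a bit fragile. An alternative sketch of the map body, using Group's field accessor directly (the field name "line" comes from the Avro schema defined in the writing program), would be:

    public void map(NullWritable key, Group value, Context context)
            throws IOException, InterruptedException {
        // Read the first (and only) value of the "line" field directly
        String line = value.getString("line", 0);
        context.write(NullWritable.get(), new Text(line));
    }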

If you want to read back the data written by the Parquet-writing MapReduce program, you can use the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetRead /test/output/part-m-00000.parquet /test/out
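
You can then view the converted text output with the usual HDFS command, for example:

hdfs dfs -cat /test/out/part-m-00000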

That's all for this topic Converting Text File to Parquet File Using Hadoop MapReduce. If you have any doubts or suggestions, please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Parquet File in Hadoop
  2. Using Avro File With Hadoop MapReduce
  3. How to Configure And Use LZO Compression in Hadoop
  4. MapReduce Flow in YARN
  5. Input Splits in Hadoop

You may also like-

  1. Data Locality in Hadoop
  2. Speculative Execution in Hadoop
  3. YARN in Hadoop
  4. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  5. Serialization in Java
  6. Nested Class And Inner Class in Java
  7. Reflection in Java
  8. Print Odd-Even Numbers Using Threads And wait-notify - Java Program
