Friday, March 29, 2019

How to Read And Write Parquet File in Hadoop

This post shows how to use the Hadoop Java API to read and write Parquet files.

You will need to put the following JARs on the classpath in order to read and write Parquet files in Hadoop (an example classpath setting is shown after the list).

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar
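
If you are running the programs with the hadoop command, one way to make these JARs available is to append them to HADOOP_CLASSPATH. The paths below are placeholders, so adjust them to wherever the JARs are kept on your system.

$ export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/parquet-hadoop-bundle-1.10.0.jar:/path/to/parquet-avro-1.10.0.jar:/path/to/jackson-mapper-asl-1.9.13.jar:/path/to/jackson-core-asl-1.9.13.jar:/path/to/avro-1.8.2.jar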

Using Avro to define schema

Rather than creating a Parquet schema directly and using ParquetWriter and ParquetReader to write and read the file, it is more convenient to use a framework like Avro to define the schema. You can then use AvroParquetWriter and AvroParquetReader to write and read Parquet files. The mapping between the Avro and Parquet schemas, and the conversion of Avro records to Parquet records, is handled by these classes themselves.

Writing Parquet file – Java program

The first thing you'll need is the schema. Since Avro is being used, you will have to define an Avro schema.

EmpSchema.avsc

{
 "type": "record",
 "name": "empRecords",
 "doc": "Employee Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Dept",
   "type": "string"
  }
 ]
}
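
If you prefer not to keep a separate .avsc file, the same schema can also be built programmatically with Avro's SchemaBuilder. A minimal sketch, equivalent to the schema above:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Builds the empRecords schema in code instead of parsing EmpSchema.avsc
Schema schema = SchemaBuilder.record("empRecords")
        .doc("Employee Records")
        .fields()
        .requiredInt("id")
        .requiredString("Name")
        .requiredString("Dept")
        .endRecord();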

Java program

The tasks needed in the program are as follows:

  1. First thing is to parse the schema.
  2. Then create generic records using the Avro generic API.
  3. Once you have the records, write them to the Parquet file using AvroParquetWriter.

package org.netjs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetFileWrite {

    public static void main(String[] args) {
        // First thing - parse the schema as it will be used
        Schema schema = parseSchema();
        List<GenericData.Record> recordList = getRecords(schema);
        writeToParquet(recordList, schema);
    }
    
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // pass path to schema
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
                "resources/EmpSchema.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;
        
    }
    
    private static List<GenericData.Record> getRecords(Schema schema){
        List<GenericData.Record> recordList = new ArrayList<GenericData.Record>();
        GenericData.Record record = new GenericData.Record(schema);
        // Adding 2 records
        record.put("id", 1);
        record.put("Name", "emp1");
        record.put("Dept", "D1");
        recordList.add(record);
        
        record = new GenericData.Record(schema);
        record.put("id", 2);
        record.put("Name", "emp2");
        record.put("Dept", "D2");
        recordList.add(record);
        
        return recordList;
    }
    
    
    private static void writeToParquet(List<GenericData.Record> recordList, Schema schema) {
        // Path to Parquet file in HDFS
        Path path = new Path("/test/EmpRecord.parquet");
        ParquetWriter<GenericData.Record> writer = null;
        // Creating ParquetWriter using builder
        try {
            writer = AvroParquetWriter
                .<GenericData.Record>builder(path)
                .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                .withSchema(schema)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withValidation(false)
                .withDictionaryEncoding(false)
                .build();
            
            for (GenericData.Record record : recordList) {
                writer.write(record);
            }
            
        }catch(IOException e) {
            e.printStackTrace();
        }finally {
            if(writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
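
Note that in newer Parquet releases the builder(Path) variant used above is deprecated in favour of building the writer from an OutputFile. A minimal sketch of that variant, assuming the same path, schema and configuration as in the program above:

import org.apache.parquet.hadoop.util.HadoopOutputFile;

Configuration conf = new Configuration();
// HadoopOutputFile wraps the HDFS path for the builder(OutputFile) API
writer = AvroParquetWriter
        .<GenericData.Record>builder(HadoopOutputFile.fromPath(path, conf))
        .withSchema(schema)
        .withConf(conf)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build();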
    

To run this Java program in the Hadoop environment, export the classpath pointing to the directory where the compiled .class file for the Java program resides.

$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.ParquetFileWrite 
18/07/05 19:56:41 INFO compress.CodecPool: Got brand-new compressor [.snappy]
18/07/05 19:56:41 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 3072

Reading Parquet file – Java program

To read the Parquet file created above, you can use the following program.

package org.netjs;

import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetFileRead {

    public static void main(String[] args) {
        readParquetFile();
    }
        
    private static void readParquetFile() {
        ParquetReader<GenericData.Record> reader = null;
        Path path = new Path("/test/EmpRecord.parquet");
        try {
            reader = AvroParquetReader
                    .<GenericData.Record>builder(path)
                    .withConf(new Configuration())
                    .build();
            GenericData.Record record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }catch(IOException e) {
            e.printStackTrace();
        }finally {
            if(reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}    
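
Running the read program prints the stored records to the console. With the two generic records written above, the output looks roughly like this, since GenericData.Record prints its contents in a JSON-like form:

{"id": 1, "Name": "emp1", "Dept": "D1"}
{"id": 2, "Name": "emp2", "Dept": "D2"}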

Using parquet-tools jar

You can also download the parquet-tools JAR and use it to see the content of a Parquet file, the file metadata, the Parquet schema, etc. As an example, to see the content of a Parquet file:

$ hadoop jar /parquet-tools-1.10.0.jar cat /test/EmpRecord.parquet 
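
Apart from cat, parquet-tools also provides subcommands such as schema and meta, which print the Parquet schema and the file metadata respectively. Assuming the same JAR location as above:

$ hadoop jar /parquet-tools-1.10.0.jar schema /test/EmpRecord.parquet
$ hadoop jar /parquet-tools-1.10.0.jar meta /test/EmpRecord.parquet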

That's all for this topic How to Read And Write Parquet File in Hadoop. If you have any doubts or any suggestions to make, please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Read And Write SequenceFile in Hadoop
  3. How to Read And Write Avro File in Hadoop
  4. Java Program to Read File in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps

You may also like-

  1. How to Compress MapReduce Job Output in Hadoop
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. Uber Mode in Hadoop
  5. Interface Default Methods in Java 8
  6. Stream API in Java 8
  7. How to Read File From The Last Line in Java
  8. Insert/Update Using NamedParameterJDBCTemplate in Spring Framework