Friday, March 29, 2019

How to Read And Write Parquet File in Hadoop

This post shows how to use the Hadoop Java API to read and write Parquet files.

You will need to put the following jars in the classpath in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating a Parquet schema directly and using ParquetWriter and ParquetReader to write and read files, it is more convenient to define the schema with a framework like Avro. You can then use AvroParquetWriter and AvroParquetReader to write and read Parquet files. These classes take care of the mapping between the Avro and Parquet schemas, and of converting Avro records to Parquet records.

Writing Parquet file – Java program

The first thing you'll need is the schema. Since Avro is used, you will have to define an Avro schema.

EmpSchema.avsc

{
 "type": "record",
 "name": "empRecords",
 "doc": "Employee Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Dept",
   "type": "string"
  }
 ]
}
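If you prefer not to ship a separate .avsc file, the same schema can also be built programmatically with Avro's SchemaBuilder API. A minimal sketch, assuming avro-1.8.2.jar from the jar list above is on the classpath; the class name EmpSchemaBuilder is just illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class EmpSchemaBuilder {
    // Builds the same schema as EmpSchema.avsc, but in code:
    // a record named empRecords with an int id and string Name/Dept fields
    public static Schema buildSchema() {
        return SchemaBuilder.record("empRecords")
                .doc("Employee Records")
                .fields()
                .requiredInt("id")
                .requiredString("Name")
                .requiredString("Dept")
                .endRecord();
    }

    public static void main(String[] args) {
        // toString(true) pretty-prints the schema as JSON
        System.out.println(buildSchema().toString(true));
    }
}
```

Either way you end up with the same org.apache.avro.Schema object, so the rest of the program is unchanged.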

Java program

The tasks needed in the program are as follows-

  1. First parse the schema.
  2. Then create generic records using the Avro generic API.
  3. Once you have the records, write them to file using AvroParquetWriter.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetFileWrite {

    public static void main(String[] args) {
        // First thing - parse the schema as it will be used
        Schema schema = parseSchema();
        List<GenericData.Record> recordList = getRecords(schema);
        writeToParquet(recordList, schema);
    }
    
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // pass path to schema
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
                "resources/EmpSchema.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;
        
    }
    
    private static List<GenericData.Record> getRecords(Schema schema){
        List<GenericData.Record> recordList = new ArrayList<GenericData.Record>();
        GenericData.Record record = new GenericData.Record(schema);
        // Adding 2 records
        record.put("id", 1);
        record.put("Name", "emp1");
        record.put("Dept", "D1");
        recordList.add(record);
        
        record = new GenericData.Record(schema);
        record.put("id", 2);
        record.put("Name", "emp2");
        record.put("Dept", "D2");
        recordList.add(record);
        
        return recordList;
    }
    
    
    private static void writeToParquet(List<GenericData.Record> recordList, Schema schema) {
        // Path to Parquet file in HDFS
        Path path = new Path("/test/EmpRecord.parquet");
        ParquetWriter<GenericData.Record> writer = null;
        // Creating ParquetWriter using builder
        try {
            writer = AvroParquetWriter.
                <GenericData.Record>builder(path)
                .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                .withSchema(schema)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withValidation(false)
                .withDictionaryEncoding(false)
                .build();
            
            for (GenericData.Record record : recordList) {
                writer.write(record);
            }
            
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
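Since ParquetWriter implements Closeable, the explicit finally block in writeToParquet() can also be replaced with try-with-resources (Java 7+). A minimal sketch under the same jar assumptions; it builds the schema inline, writes to a local file:/// path purely for illustration (an HDFS path works the same way), and uses UNCOMPRESSED instead of the SNAPPY codec used above so that no native snappy library is needed:

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteTwr {
    public static void main(String[] args) throws IOException {
        Schema schema = SchemaBuilder.record("empRecords")
                .doc("Employee Records").fields()
                .requiredInt("id").requiredString("Name").requiredString("Dept")
                .endRecord();

        // Local path for illustration; remove any stale output (and its
        // checksum file) since ParquetWriter refuses to overwrite
        new File("/tmp/EmpRecordTwr.parquet").delete();
        new File("/tmp/.EmpRecordTwr.parquet.crc").delete();
        Path path = new Path("file:///tmp/EmpRecordTwr.parquet");

        // try-with-resources closes the writer even if write() throws
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
                .<GenericData.Record>builder(path)
                .withSchema(schema)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                .build()) {
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", 1);
            record.put("Name", "emp1");
            record.put("Dept", "D1");
            writer.write(record);
        }
    }
}
```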
    

To run this Java program in a Hadoop environment, export the classpath pointing to the directory where the .class file for the Java program resides.

$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.ParquetFileWrite 
18/07/05 19:56:41 INFO compress.CodecPool: Got brand-new compressor [.snappy]
18/07/05 19:56:41 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 3072

Reading Parquet file – Java program

To read the Parquet file created above, you can use the following program.

import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetFileRead {

    public static void main(String[] args) {
        readParquetFile();
    }
        
    private static void readParquetFile() {
        ParquetReader<GenericData.Record> reader = null;
        Path path = new Path("/test/EmpRecord.parquet");
        try {
            reader = AvroParquetReader
                    .<GenericData.Record>builder(path)
                    .withConf(new Configuration())
                    .build();
            GenericData.Record record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}    
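Each object returned by reader.read() is a GenericData.Record, so individual columns can be pulled out by field name with get() rather than printing the whole record. A minimal Avro-only sketch of that access pattern (the record is built in memory here; a record coming back from AvroParquetReader is accessed the same way, though its string fields arrive as Avro Utf8 objects, hence the toString() calls):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class RecordFieldAccess {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("empRecords")
                .fields()
                .requiredInt("id").requiredString("Name").requiredString("Dept")
                .endRecord();
        GenericData.Record record = new GenericData.Record(schema);
        record.put("id", 1);
        record.put("Name", "emp1");
        record.put("Dept", "D1");

        // get() returns Object, so cast (or toString()) as needed
        int id = (Integer) record.get("id");
        String name = record.get("Name").toString();
        System.out.println(id + "\t" + name + "\t" + record.get("Dept"));
    }
}
```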

Using parquet-tools jar

You can also download the parquet-tools jar and use it to see the content of a Parquet file, the file metadata, the Parquet schema etc. As an example, to see the content of a Parquet file-

$ hadoop jar /parquet-tools-1.10.0.jar cat /test/EmpRecord.parquet 

Recommendations for learning

  1. The Ultimate Hands-On Hadoop
  2. Hive to ADVANCE Hive (Real time usage)
  3. Spark and Python for Big Data with PySpark
  4. Python for Data Science and Machine Learning
  5. Java Programming Masterclass Course

That's all for this topic How to Read And Write Parquet File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!



Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Read And Write SequenceFile in Hadoop
  3. How to Read And Write Avro File in Hadoop
  4. Java Program to Read File in HDFS
  5. File Write in HDFS - Hadoop Framework Internal Steps

You may also like-

  1. How to Compress MapReduce Job Output in Hadoop
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. Uber Mode in Hadoop
  5. Interface Default Methods in Java 8
  6. Stream API in Java 8
  7. How to Read File From The Last Line in Java
  8. Insert\Update Using NamedParameterJDBCTemplate in Spring Framework

11 comments:

  1. How can I append records to an existing parquet file?

  2. Hi, I get java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. Could you please help me?

    1. Check this post for the configuration settings- https://netjs.blogspot.com/2018/02/installing-hadoop-single-node-pseudo-distributed-mode.html

  3. Hi, I am getting below error,

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

    1. Apart from the jars mentioned here, ensure that you have the other jars mentioned in this post- https://netjs.blogspot.com/2018/02/word-count-mapreduce-program-in-hadoop.html

  4. Can we append more data to an already existing Parquet file using the above method?

  5. Hi,
    is it possible to append to an existing Parquet file?

    1. Appending data to an HDFS file, though not considered a good practice, is possible. There are a few configuration steps required; you'll have to look those up and act accordingly.

  6. Hi,
    Is there any way to clean/flush the writer?

  7. I'm getting a NullPointerException when running the builder method with a HadoopInputFile param, because the method with a Path param is deprecated

  8. Hi,
    With the above approach I am able to create a Parquet file.
    Is there a way I can encrypt this Parquet file and eventually load it into Spark dataframes?
