Sunday, December 31, 2023

Using Avro File With Hadoop MapReduce

In this post we’ll see how to use Avro file with Hadoop MapReduce.

Avro MapReduce jar

You will need to download the following jar and put it on your project's classpath.

avro-mapred-1.8.2.jar

Avro MapReduce example

In this MapReduce program the goal is to get the total sales per item, with the output written as an Avro file. The input records are in the following tab separated format.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3

The Avro schema (SALES_SCHEMA) used in the program is inlined within the MapReduce code. While creating the output record this Avro schema is referenced as given below-

GenericRecord record = new GenericData.Record(SALES_SCHEMA);

MapReduce code

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{
    /// Schema
    private static final Schema SALES_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"SalesRecord\",\n" +
            "    \"doc\":    \"Sales Records\",\n" +
            "    \"fields\":\n" + 
            "    [\n" + 
            "            {\"name\": \"item\",    \"type\":    \"string\"},\n"+ 
            "            {\"name\":    \"totalsales\", \"type\":    \"int\"}\n"+
            "    ]\n"+
            "}\n");

    //Mapper
    public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
            AvroValue<GenericRecord>>{
        private Text item = new Text();
        private GenericRecord record = new GenericData.Record(SALES_SCHEMA);
        public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
            //splitting record
            String[] salesArr = value.toString().split("\t");        
            item.set(salesArr[0]);
            record.put("item", salesArr[0]);
            record.put("totalsales", Integer.parseInt(salesArr[1]));
            context.write(new AvroKey<Text>(item), new AvroValue<GenericRecord>(record));
         }
    }
    
    // Reducer
    public static class SalesReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, 
                      AvroKey<GenericRecord>, NullWritable>{       
        public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, 
              Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (AvroValue<GenericRecord> value : values) {
              GenericRecord    record = value.datum();
              sum += (Integer)record.get("totalsales");
          }
          GenericRecord record = new GenericData.Record(SALES_SCHEMA);
          record.put("item", key.datum());
          record.put("totalsales", sum);
          context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
        }
    }
    
    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new AvroMR(), args);
        System.exit(exitFlag);
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "AvroMR");
        job.setJarByClass(getClass());
        job.setMapperClass(ItemMapper.class);    
        job.setReducerClass(SalesReducer.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
        AvroJob.setOutputKeySchema(job, SALES_SCHEMA);    
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}    

Run the MapReduce code using the following command.

hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR /test/input/sales.txt /test/out/sales 

That creates an Avro file as output. To see the content of the output file as JSON you can use the Avro tools jar with the following command.

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sales/part-r-00000.avro 
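Given the sample input shown earlier, the per-item totals work out to Item1 = 345 + 234 = 579, Item2 = 231 and Item3 = 654, so the JSON dump should look something like this (the exact ordering and spacing may differ)-

{"item": "Item1", "totalsales": 579}
{"item": "Item2", "totalsales": 231}
{"item": "Item3", "totalsales": 654}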

That's all for this topic Using Avro File With Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Avro File in Hadoop
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. How to Compress Intermediate Map Output in Hadoop
  5. Shuffle And Sort Phases in Hadoop MapReduce

You may also like-

  1. How to Write a Map Only Job in Hadoop MapReduce
  2. Shuffle And Sort Phases in Hadoop MapReduce
  3. HDFS Commands Reference List
  4. HDFS High Availability
  5. YARN in Hadoop
  6. PermGen Space Removal in Java 8
  7. BlockingQueue in Java Concurrency
  8. How to Create Deadlock in Java Multi-Threading - Java Program

Saturday, December 30, 2023

How to Read And Write Avro File in Hadoop

In this post we’ll see a Java program to read and write Avro files in Hadoop environment.

Required jars

For reading and writing an Avro file using the Java API in Hadoop you will need to download the following jars and add them to your project's classpath.

  • avro-1.8.2.jar
  • avro-tools-1.8.2.jar
The Avro Java implementation also depends on the Jackson JSON library, so you'll also need-
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar

Writing Avro file – Java program

To write an Avro file in Hadoop using the Java API the steps are as follows.

  1. You need an Avro schema.
  2. In your program you will have to parse that schema.
  3. Then you need to create records referring to that parsed schema.
  4. Write those records to the file.

Avro Schema

The Avro schema used for the program is called Person.avsc and it resides in the resources folder within the project structure.

{
 "type": "record",
 "name": "personRecords",
 "doc": "Personnel Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Address",
   "type": "string"
  }
 ]
}

Java Code

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AvroFileWrite {

    public static void main(String[] args) {
        Schema schema = parseSchema();
        writetoAvro(schema);
    }
    
    
    // parsing the schema
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // Path to schema file
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
               "resources/Person.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;        
    }
    
    private static void writetoAvro(Schema schema) {
        GenericRecord person1 = new GenericData.Record(schema);
        person1.put("id", 1);
        person1.put("Name", "Jack");
        person1.put("Address", "1, Richmond Drive");
        
        GenericRecord person2 = new GenericData.Record(schema);
        person2.put("id", 2);
        person2.put("Name", "Jill");
        person2.put("Address", "2, Richmond Drive");
                    
        DatumWriter<GenericRecord> datumWriter = new 
                     GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = null;
        try {
            //out file path in HDFS
            Configuration conf = new Configuration();
            // change the IP
            FileSystem fs = FileSystem.get(URI.create(
                       "hdfs://124.32.45.0:9000/test/out/person.avro"), conf);
            OutputStream out = fs.create(new Path(
                      "hdfs://124.32.45.0:9000/test/out/person.avro"));
                    
            dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            // for compression
            //dataFileWriter.setCodec(CodecFactory.snappyCodec());
            dataFileWriter.create(schema, out);
            
            dataFileWriter.append(person1);
            dataFileWriter.append(person2);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(dataFileWriter != null) {
                try {
                    dataFileWriter.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            
        }            
    }
}
To run this Java program in the Hadoop environment, add the directory where the compiled .class files for the program reside to the Hadoop classpath.

$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.AvroFileWrite

Reading Avro file – Java program

Here is the Java program to read back, in the Hadoop environment, the Avro file written by the above program.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class AvroFileRead {

    public static void main(String[] args) {
        Schema schema = parseSchema();
        readFromAvroFile(schema);
    }
    
    // parsing the schema
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // Path to schema file
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
                      "resources/Person.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;        
    }
        
    private static void readFromAvroFile(Schema schema) {
        
        Configuration conf = new Configuration();
        DataFileReader<GenericRecord> dataFileReader = null;
        try {
            // change the IP
            FsInput in = new FsInput(new Path(
                    "hdfs://124.32.45.0:9000/user/out/person.avro"), conf);
            DatumReader<GenericRecord> datumReader = new 
                     GenericDatumReader<GenericRecord>(schema);
            dataFileReader = new DataFileReader<GenericRecord>(in, datumReader);
            GenericRecord person = null;
            while (dataFileReader.hasNext()) {
                person = dataFileReader.next(person);
                System.out.println(person);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(dataFileReader != null) {
                try {
                    dataFileReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            
        }       
    }
}
You can run the Java program using the following command.

$ hadoop org.netjs.AvroFileRead

Output

{"id": 1, "Name": "Jack", "Address": "1, Richmond Drive"}
{"id": 2, "Name": "Jill", "Address": "2, Richmond Drive"}

That's all for this topic How to Read And Write Avro File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Read And Write SequenceFile in Hadoop
  3. How to Configure And Use LZO Compression in Hadoop
  4. Java Program to Write File in HDFS
  5. How to Compress MapReduce Job Output in Hadoop

You may also like-

  1. How to Check Hadoop MapReduce Logs
  2. How MapReduce Works in Hadoop
  3. Predefined Mapper And Reducer Classes in Hadoop
  4. NameNode, DataNode And Secondary NameNode in HDFS
  5. What is SafeMode in Hadoop
  6. Java Lambda Expressions Interview Questions
  7. Transaction in Java-JDBC
  8. Different Bean Scopes in Spring

Friday, December 29, 2023

Converting Text File to Parquet File Using Hadoop MapReduce

This post shows how to convert existing data to the Parquet file format using MapReduce in Hadoop. In the example given here a text file is converted to a Parquet file.

You will need to put the following jars on the classpath in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating a Parquet schema directly, the Avro framework is used to create the schema as it is more convenient. Then you can use the Avro API classes to write and read the files. The mapping between the Avro and Parquet schemas, and between Avro records and Parquet records, is taken care of by these classes themselves.

MapReduce code to convert file to Parquet format file

In the code the Avro schema is defined inline. The program uses the Avro generic API to create generic records. Also, it's a mapper-only job as only conversion is required; records are not aggregated.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.example.data.Group;

public class ParquetConvert extends Configured implements Tool{
    
    /// Schema
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"TextFile\",\n" +
            "    \"doc\":    \"Text File\",\n" +
            "    \"fields\":\n" + 
            "    [\n" +  
            "            {\"name\":    \"line\", \"type\":    \"string\"}\n"+
            "    ]\n"+
            "}\n");
    
    // Map function
    public static class ParquetConvertMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
        
        private GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
             record.put("line", value.toString());
             context.write(null, record); 
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetConvert");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetConvertMapper.class);    
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // setting schema
        AvroParquetOutputFormat.setSchema(job, MAPPING_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetConvert(), args);
        System.exit(exitFlag);
    }
}   
On running the MapReduce code using the following command-

hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert /test/input /test/output

You can see that the Parquet file is written at the output location.

hdfs dfs -ls /test/output

Found 4 items
-rw-r--r--   1 netjs supergroup          0 2018-07-06 09:54 /test/output/_SUCCESS
-rw-r--r--   1 netjs supergroup        276 2018-07-06 09:54 /test/output/_common_metadata
-rw-r--r--   1 netjs supergroup        429 2018-07-06 09:54 /test/output/_metadata
-rw-r--r--   1 netjs supergroup        646 2018-07-06 09:54 /test/output/part-m-00000.parquet

Reading Parquet file using MapReduce

The following MapReduce program takes a Parquet file as input and outputs a text file. In the Parquet file the records are in the following format, so you need to write appropriate logic to extract the relevant part.

{"line": "Hello wordcount MapReduce Hadoop program."}
{"line": "This is my first MapReduce program."}
{"line": "This file will be converted to Parquet using MR."}
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetRead extends Configured implements Tool{
    // Map function
    public static class ParquetMapper extends Mapper<NullWritable, Group, NullWritable, Text> {
         public void map(NullWritable key, Group value, Context context) 
                 throws IOException, InterruptedException {
             NullWritable outKey = NullWritable.get();
             String line = value.toString();
             
             String[] fields = line.split(": ");
             context.write(outKey, new Text(fields[1]));
                     
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetRead");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper.class);    
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetRead(), args);
        System.exit(exitFlag);
    }

}
If you want to read back the data written by the Parquet-writing MapReduce program above, you can use the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetRead /test/output/part-m-00000.parquet /test/out

That's all for this topic Converting Text File to Parquet File Using Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Read And Write Parquet File in Hadoop
  2. Using Avro File With Hadoop MapReduce
  3. How to Configure And Use LZO Compression in Hadoop
  4. MapReduce Flow in YARN
  5. Input Splits in Hadoop

You may also like-

  1. Data Locality in Hadoop
  2. Speculative Execution in Hadoop
  3. YARN in Hadoop
  4. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  5. Serialization in Java
  6. Nested Class And Inner Class in Java
  7. Reflection in Java
  8. Print Odd-Even Numbers Using Threads And wait-notify - Java Program

Thursday, December 28, 2023

Spring Boot Event Driven Microservice With Kafka

In this post we'll see how to write an event driven microservice using Spring Boot and Kafka.

In the example we'll see how to use Kafka as a message broker that enables services to communicate with each other and exchange information.

Message brokers can validate, store, route, and deliver messages to the required destinations. They act as middleware, allowing producers to send messages without knowing the location of the consumers, whether the consumers are currently active, or how many consumers there are. This abstraction facilitates decoupling of services within systems and asynchronous communication among the services.


Kafka - Brief introduction

Apache Kafka is an open-source distributed event streaming platform which combines the following three capabilities-

  • To publish (write) and subscribe to (read) streams of events
  • To store streams of events durably and reliably for as long as you want.
  • To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

Some of the terminology used in Kafka is as given below-

  1. Events- An event records the fact that "something happened" in your application. When you read or write data to Kafka, you do this in the form of events. The message you write to Kafka may have a key and data, which can be a String or a complex object.
  2. Producers- Those client applications that publish (write) events to Kafka.
  3. Consumers- Those applications that subscribe to (read and process) these events.
    In Kafka, producers and consumers are fully decoupled and agnostic of each other. This clear separation helps Kafka in achieving high scalability.
  4. Topics- Events are organized and durably stored in topics. Topics in Kafka are always multi-producer and multi-subscriber which means a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.
    Unlike traditional messaging systems, in Kafka, events are not deleted after consumption.
    Topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. When a new event is published to a topic, it is actually appended to one of the topic's partitions. Which partition the event is sent to is determined by the key passed with the event. Events with the same event key (for example a customer ID or account ID) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written.
kafka topics

Events with the same key (denoted by their color in the figure) are written to the same partition.

Kafka as message broker

Kafka works well as a replacement for a more traditional message broker. In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.

Spring Boot Microservice + Kafka Example

In the example we'll create an order-service, which acts as a producer of orders, and an order-consumer-service, which gets that order data and then does further processing of those orders (for example, taking payment and then changing the order status to processed or rejected).

We'll configure a Kafka topic; order-service will write orders to that topic and order-consumer-service will read orders from it.

order-service Microservice

In STS create a new Spring Starter Project and give details as given here.

Name: order-service. Java version used is 17, packaging is Jar and project type is Maven. In the next window select the dependencies as shown in the following image.

Spring Boot Kafka

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for @KafkaListener annotation.

With these dependencies added the created pom.xml should look like this-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.1</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.netjstech</groupId>
  <artifactId>order-service</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>order-service</name>
  <description>Order Producer Service
</description>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

OrderDto class

POJO class used to produce and consume data.

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderDto {
  private List<String> items;
  private double amount;
  private String status;
}

Controller Class

Class that will receive requests for the designated path and forward them to the service class.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;
import com.netjstech.orderservice.service.OrderService;

@RestController
@RequestMapping("/order")
public class OrderController {
  @Autowired
  private OrderService orderService;
  
  @PostMapping
  public void createOrder(@RequestBody OrderDto order) throws JsonProcessingException {
    orderService.sendOrder(order);
  }
}

Order Service interface and Impl class

import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;

public interface OrderService {
	public String sendOrder(OrderDto order) throws JsonProcessingException ;
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;

@Service
public class OrderServiceImpl implements OrderService{
	@Autowired
	private OrderProducer orderProducer;
	
    public String sendOrder(OrderDto order) throws JsonProcessingException {
        return orderProducer.send(order);
    }
	
}

That is the code flow for sending data, but we also need to do some configuration for Kafka. Let's start with the application.yml file where we'll add the Kafka bootstrap servers and the topic name.

server:
  port: 8081
  
spring:
  application:
    name: order-service

  kafka:
    bootstrap-servers: localhost:9092  
    topic:
      name: order-topic

With the bootstrap-servers property you can provide a comma separated list of host/port pairs (for example localhost:9092,localhost:9093) to use for establishing the initial connection to the Kafka cluster. Here we have only a single server.

Producer Configuration

You can provide the producer configuration in the application.yml file or create a Java class annotated with @Configuration and provide the configuration there. I prefer the latter, though with the properties file it would look something like this.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
    topic:
      name: order-topic

Here is the OrderProducerConfig.java class that is used in this example to provide producer configuration.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

@Configuration
public class OrderProducerConfig {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;
  @Value("${spring.kafka.topic.name}")
  private String topicName;

    @Bean
    public Map<String, Object> producerConfig() {
      Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        return props;
    }
    
    @Bean
    public ProducerFactory<String, Object> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder
                .name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}

Descriptions of the configuration values provided for the Kafka producer are given below-

KEY_SERIALIZER_CLASS_CONFIG- Serializer class for key

VALUE_SERIALIZER_CLASS_CONFIG- Serializer class for value

LINGER_MS_CONFIG- Using this configuration you can delay the sending of records rather than sending them immediately. The producer waits for the given duration and groups the records arriving during that time so they can be sent together.

ACKS_CONFIG- The number of acknowledgments the producer requires the leader to have received before considering a request complete. acks=all means the leader will wait for the full set of in-sync replicas to acknowledge the record.

REQUEST_TIMEOUT_MS_CONFIG- The configuration controls the maximum amount of time the client will wait for the response of a request.

RETRIES_CONFIG- Setting a value greater than zero will cause the client to resend any record whose send fails.

Using these configurations DefaultKafkaProducerFactory creates a singleton shared Producer instance.

KafkaTemplate is a template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe.

A NewTopic instance is also created by passing the topic name which was configured in the application.yml file.

OrderProducer class

Sending a message to a specific topic using KafkaTemplate is done in a separate OrderProducer class. The ObjectMapper class' writeValueAsBytes() method is used to serialize the order object to a byte array.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netjstech.orderservice.dto.OrderDto;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class OrderProducer {
  private final KafkaTemplate<String, Object> kafkaTemplate;
  private final NewTopic topicName;
  private final ObjectMapper objectMapper;

  public OrderProducer(NewTopic topic, KafkaTemplate<String, Object> kafkaTemplate, ObjectMapper objectMapper) {
    this.topicName = topic;
    this.kafkaTemplate = kafkaTemplate;
    this.objectMapper = objectMapper;
  }

  public String send(OrderDto order) throws JsonProcessingException {     
    log.info("Sending message='{}' to topic='{}'", order.toString(), topicName);                 
    
    kafkaTemplate.send(topicName.name(), objectMapper.writeValueAsBytes(order));
    return "message sent";
  }
}
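The send() call above publishes each order without a key, so Kafka spreads the records across partitions. If related orders should always land on the same partition (as described in the topics section earlier), KafkaTemplate also provides a send(topic, key, value) overload. The following is only a sketch; customerId is a hypothetical value used for illustration, not a field of OrderDto.

  // Hypothetical variant of send()- routes by key so that orders with the
  // same customerId always go to the same partition
  public String sendWithKey(OrderDto order, String customerId) throws JsonProcessingException {
    log.info("Sending keyed message='{}' to topic='{}'", order.toString(), topicName);
    kafkaTemplate.send(topicName.name(), customerId, objectMapper.writeValueAsBytes(order));
    return "message sent";
  }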

That concludes the producer part. Before moving to the consumer part, let's see how to install Apache Kafka.

Installing Kafka

The simplest way to get Kafka is to run it as a container; here is a docker-compose.yml file that can be used to do that.

version: "3"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    ports:
      - "9092:9092"
    container_name: broker
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
  kafka_data:
    driver: local

Note that from Kafka 3.3.1 onward ZooKeeper is not required to store metadata about the Kafka cluster. Apache Kafka Raft (KRaft) simplifies Kafka's architecture by consolidating responsibility for metadata into Kafka itself, rather than splitting it between two different systems: ZooKeeper and Kafka. That is why no ZooKeeper instance is configured in the compose file.

Also note that the port you provided in the service's bootstrap-servers property (9092) matches the port exposed in the compose file.

To build and start the container use the following command

docker compose up -d

The -d switch runs it in detached mode.

After that you can use docker compose stop and docker compose start to stop and start these containers.

Just ensure that all the containers are up by running the following command.

docker ps -a

order-consumer-service Microservice

In STS create a new Spring Starter Project and give details as given here.

Name: order-consumer-service. Java version used is 17, packaging is Jar and project type is Maven. Dependencies are same as given in order-service.

Configuration properties

Since the consumer needs to read the data written by the producer, the topic name for the consumer should be the same as for the producer.

application.yml

server:
  port: 8082
    
spring:
  application:
    name: order-consumer-service

  kafka:
    bootstrap-servers: localhost:9092  
    topic:
      name: order-topic

OrderDto class

POJO class used to produce and consume data. This class is the same as in order-service.

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderDto {
  private List<String> items;
  private double amount;
  private String status;
}

Consumer Configuration

A Java configuration class which provides consumer configuration.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class OrderConsumerConfig {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;
  @Value("${spring.kafka.topic.name}")
  private String topicName;
  
    @Bean
    public Map<String, Object> consumerConfig() {
      Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Note the use of ConsumerConfig.GROUP_ID_CONFIG in the class which sets the consumer group ID. The Group ID determines which consumers belong to which group. If there are four consumers with the same Group ID assigned to the same topic, they will all share the work of reading from the same topic.

You can use a particular group ID's offset to check whether new data has been written to the partition; if there's an event with a larger offset, there's new data to read. If there are several consumers with the same group ID subscribed to the same topic then any one of them can read the new data, which may be exactly the functionality you want. In our example, which passes orders, having more consumers with the same group ID works fine because any order is read by only a single consumer. But there are scenarios where you may not want that; in those cases the group IDs for the consumers should not be the same, as sketched after the listener class below.

Using these configurations DefaultKafkaConsumerFactory produces new Consumer instances.

ConcurrentKafkaListenerContainerFactory is a KafkaListenerContainerFactory implementation to build a ConcurrentMessageListenerContainer. This factory is primarily for building containers for KafkaListener annotated methods but can also be used to create any container.

OrderConsumerService

Service class that has a method annotated with @KafkaListener, which marks the method as the target of a Kafka message listener on the specified topics. With the @KafkaListener annotation we can specify the topic to listen on.

import java.io.IOException;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netjstech.orderconsumerservice.dto.OrderDto;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class OrderConsumerService {
  private final ObjectMapper objectMapper;
  public OrderConsumerService(ObjectMapper objectMapper) {
    this.objectMapper = objectMapper;
  }
  
  @KafkaListener(topics = "${spring.kafka.topic.name}")
  public void orderConsumer(byte[] message) throws IOException {
    OrderDto order = objectMapper.readValue(message, OrderDto.class);
    log.info("Order data- " + order.toString());
    // Order status can be changed once it fulfills some criteria
    order.setStatus("Processed");
    log.info("Order data- " + order.toString());
  }
}
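As mentioned in the group ID discussion above, consumers that should not share work need different group IDs. The @KafkaListener annotation accepts a groupId attribute that overrides the group.id set in the consumer factory. The sketch below is illustrative only (the "audit" group name and the method are not part of the example services); such a listener would receive every order rather than sharing them with the "order" group.

  // Illustrative only- a listener in a separate consumer group gets its own
  // copy of every record published to the topic
  @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "audit")
  public void auditOrder(byte[] message) throws IOException {
    OrderDto order = objectMapper.readValue(message, OrderDto.class);
    log.info("Audit copy of order- " + order.toString());
  }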

Testing Kafka and microservices integration

Start both the services and also ensure the Kafka container is up. Using Postman, send an order to the order-service listening on port 8081.

Spring Boot Kafka Example
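The JSON body posted to http://localhost:8081/order maps to the fields of OrderDto; matching the order shown in the logs further down, it would look like this-

{
  "items": ["RAM", "Laptop", "Mouse"],
  "amount": 725.5,
  "status": "ordered"
}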

Once the order is sent to order-service it is written to the configured topic. The consumer reads the order data from the topic; from there you can further process the order, for example by taking payment, changing the status to processed or rejected based on whether the payment succeeded, and passing that event on to another microservice for further processing of the order.

In the order-consumer-service you can verify the logs to see that the order is read by the consumer.

2023-12-27T14:06:46.182+05:30 INFO 17940 --- [order-consumer-service] [ntainer#0-0-C-1] c.n.o.service.OrderConsumerService : Order data- OrderDto(items=[RAM, Laptop, Mouse], amount=725.5, status=ordered)
2023-12-27T14:06:46.182+05:30 INFO 17940 --- [order-consumer-service] [ntainer#0-0-C-1] c.n.o.service.OrderConsumerService : Order data- OrderDto(items=[RAM, Laptop, Mouse], amount=725.5, status=Processed)

Reference: https://kafka.apache.org/documentation

That's all for this topic Spring Boot Event Driven Microservice With Kafka. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Spring Tutorial Page


Related Topics

  1. Spring Boot Microservice + API Gateway + Resilience4J
  2. Spring Boot Observability - Distributed Tracing, Metrics
  3. Spring Boot Microservice - Externalized Configuration With Spring Cloud Config
  4. Spring Boot Microservice - Load-Balancing With Spring Cloud LoadBalancer

You may also like-

  1. Spring MVC Excel Generation Example
  2. Spring JdbcTemplate Insert, Update And Delete Example
  3. BeanPostProcessor in Spring Framework
  4. Spring Object XML Mapping (OXM) JAXB Example
  5. Is String Thread Safe in Java
  6. Java Concurrency Interview Questions And Answers
  7. JavaScript Rest Parameter
  8. Angular Reactive Form Example

Wednesday, December 27, 2023

Multi-Catch Statement in Java Exception Handling

There has always been criticism of checked exceptions in Java exception handling for being verbose and cluttering the code with try-catch blocks.
In Java 7 two new features- try-with-resources (automatic resource management) and the multi-catch statement- were added to mitigate that problem to a certain extent.

In this post we'll talk about the multi-catch statement in Java exception handling, along with examples showing how to handle more than one type of exception in a single catch block.

Handling more than one type of exception without multi-catch statement

Before the Java 7 multi-catch statement, if two or more exceptions were handled in the same way, we still had to write separate catch blocks for them.

catch(IOException exp){
  logger.error(exp);
  throw exp;
}catch(SQLException exp){
  logger.error(exp);
  throw exp;
}

Note that although the catch blocks contain the same exception handling code, it is difficult to create a common catch block to eliminate the duplicate code because the variable exp has a different type in each catch block.

Java multi-catch - Handling more than one type of exception

From Java 7 onward it is possible to catch multiple exceptions in one catch block, which eliminates the duplicated code. Each exception type within the multi-catch statement is separated by the pipe symbol (|).

catch(IOException | SQLException exp){
  logger.error(exp);
  throw exp;
}

Note that if a catch block handles more than one exception type, then the catch parameter is implicitly final. In this example the catch parameter exp is final, therefore you cannot assign any value to it within the catch block.
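As a quick illustration, trying to assign to the multi-catch parameter does not compile-

catch(IOException | SQLException exp){
  // This will give compiler error- the multi-catch parameter exp is implicitly final
  exp = new IOException("wrapped");
  logger.error(exp);
}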

Java Multi-catch statement means better code

According to JavaDoc- "Bytecode generated by compiling a catch block that handles multiple exception types will be smaller (and thus superior) than compiling many catch blocks that handle only one exception type each. A catch block that handles multiple exception types creates no duplication in the bytecode generated by the compiler; the bytecode has no replication of exception handlers."

Multi-catch statement & Exception hierarchy

While using the multi-catch statement you will have to keep the following rule in mind.

If you specify two or more exceptions of the same hierarchy in the multi-catch statement, it will result in a compile-time error.
For example, the following catch statement gives a compiler error because FileNotFoundException is a subtype of the IOException class.

catch (FileNotFoundException | IOException ex) {    
 Logger.error(ex);   
}

Or the following statement, which also results in a compile-time error as Exception is a supertype of both ArithmeticException and ArrayIndexOutOfBoundsException.

// This will give compiler error
catch(Exception | ArithmeticException | ArrayIndexOutOfBoundsException ex){
  ex.printStackTrace();
}
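To make such a catch block compile, either catch only the supertype or list only exception types that are not related to each other. For example, the following compiles fine because ArithmeticException and ArrayIndexOutOfBoundsException do not belong to the same hierarchy-

// Compiles fine- the two exception types are unrelated to each other
catch(ArithmeticException | ArrayIndexOutOfBoundsException ex){
  ex.printStackTrace();
}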

Points to note

  1. With multi-catch statement in Java it is possible to catch multiple exceptions in one catch block, which eliminates the duplicated code.
  2. Each exception type within the multi-catch statement is separated by Pipe symbol (|).
  3. Bytecode generated by compiling a catch block that handles multiple exception types will be smaller (and thus superior) than compiling many catch blocks that handle only one exception type each.
  4. Specifying two or more exceptions of the same hierarchy in the multi-catch statement will result in compile time error.

Reference: https://docs.oracle.com/javase/8/docs/technotes/guides/language/catch-multiple.html

That's all for this topic Multi-Catch Statement in Java Exception Handling. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Try-With-Resources in Java With Examples
  2. Multiple Catch Blocks in Java Exception Handling
  3. Difference Between Checked And Unchecked Exceptions in Java
  4. Difference Between throw And throws in Java
  5. final Vs finally Vs finalize in Java

You may also like-

  1. Java Pass by Value or Pass by Reference
  2. Why Class Name And File Name Should be Same in Java
  3. Marker Interface in Java
  4. Difference Between Abstract Class And Interface in Java
  5. BigDecimal in Java With Examples
  6. Why wait(), notify() And notifyAll() Methods Are in Object Class And Not in Thread Class
  7. Deadlock in Java Multi-Threading
  8. Java Stream API Tutorial

Tuesday, December 26, 2023

Java Stream - distinct() With Examples

In this tutorial you will see some examples of the distinct() method in the Java Stream API. The Stream.distinct() method is used to remove duplicate elements and returns a new stream consisting of the distinct elements. To determine which elements are duplicates the distinct() method uses the equals() method of the Object class.

distinct() in Java Stream

Syntax of the distinct() method is as given below-

Stream<T> distinct()

Here are some important points about the distinct operation.

  1. It is a stateful intermediate operation which means it may incorporate state from previously seen elements when processing new elements.
  2. Method takes no arguments and returns a new Stream consisting of the distinct elements.
  3. For ordered streams, distinct operation is stable i.e. if there are duplicate elements the element appearing first in the encounter order is preserved.
  4. For unordered streams there are no stability guarantees.
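A minimal example in the same style as the other stream posts here, showing that duplicates (determined by equals()) are removed and the first occurrence is preserved-

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamDistinct {
  public static void main(String[] args) {
    List<Integer> numList = Arrays.asList(1, 2, 3, 2, 1, 4, 5, 4);
    // distinct() keeps the first occurrence of each element
    List<Integer> distinctList = numList.stream().distinct().collect(Collectors.toList());
    System.out.println("List after removing duplicates- " + distinctList);
  }
}

Output

List after removing duplicates- [1, 2, 3, 4, 5]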

Monday, December 25, 2023

Java Stream - limit() With Examples

In the Java Stream API the limit(long maxSize) method is used to truncate the stream so that it is no longer than maxSize in length; the method returns a stream consisting of at most that many elements of this stream.

Java Stream limit() method

Syntax of the limit method is as given below-

Stream<T> limit(long maxSize)

Here maxSize argument represents the number of elements the stream should be limited to.

If maxSize is negative then an IllegalArgumentException is thrown, otherwise a new stream is returned which is no longer than maxSize in length.

Notes about limit() method

  • limit() is generally considered a cheap operation on sequential stream pipelines
  • limit() can be quite expensive on ordered parallel pipelines, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.
  • It is a short-circuiting stateful intermediate operation. Since limit() returns a new stream it is an intermediate operation, and it is stateful because it must keep track of how many elements have been seen so far. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result.

limit() Java examples

1. Getting a sublist from a List by limiting the number of elements to the first n elements of the original list.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamLimit {
  public static void main(String[] args) {
    StreamLimit sl = new StreamLimit();
    List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<Integer> subList = numList.stream().limit(5).collect(Collectors.toList());     
    System.out.println("Sublist after limiting elements- " + subList);
  }
}

Output

Sublist after limiting elements- [1, 2, 3, 4, 5]

2. Getting 10 random numbers by using limit() with generate() method.

import java.util.stream.Stream;

public class StreamLimit {
  public static void main(String[] args) {
    Stream.generate(Math::random)
          .map(n -> (int)(n * 100))
          .limit(10)
          .forEach(System.out::println);
  }
}

That's all for this topic Java Stream - limit() With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Java Advanced Tutorial Page


Related Topics

  1. Java Stream - skip() With Examples
  2. Java Stream - distinct() With Examples
  3. Java Stream - max() With Examples
  4. Primitive Type Streams in Java Stream API
  5. Java Stream API Interview Questions And Answers

You may also like-

  1. Just In Time Compiler (JIT) in Java
  2. Byte Streams in Java IO
  3. How to Get The Inserted ID (Generated ID) in JDBC
  4. Method Reference in Java
  5. java.lang.ClassCastException - Resolving ClassCastException in Java
  6. Creating PDF in Java Using Apache PDFBox
  7. Method Overloading in Python
  8. Spring Boot Spring Initializr

Sunday, December 24, 2023

Java Stream - count() With Examples

In this tutorial you’ll learn about the count() method in the Java Stream API with the help of a few examples.

count() method in Java

count() method returns the count of elements in the stream.

Syntax of count() method-

long count()

Some important points about count() method-

  1. It is a special case of a reduction operation, which means it takes a sequence of input elements and reduces them into a single summary result.
  2. It is a terminal operation, which means that after the count() operation the stream pipeline is considered consumed and can no longer be used.

count() method Java examples

1. In the first example count() is used to get the count of elements in a List where List is used as a stream source.

import java.util.Arrays;
import java.util.List;

public class StreamCountDemo {

  public static void main(String[] args) {
    List<Integer> numList = Arrays.asList(7, 5, 18, -11, 22, -8);
    long elementCount = numList.stream().count();
    System.out.println("Count of elements- " + elementCount);  
  }
}

Output

Count of elements- 6

2. Since count() is a terminal operation the stream is considered closed after the count operation, but before count, intermediate operations like filter() can be used to filter out certain elements and then get the count of the resulting stream. For example, if we want the count of positive elements in a list.

import java.util.Arrays;
import java.util.List;

public class StreamCountDemo {

  public static void main(String[] args) {
    List<Integer> numList = Arrays.asList(7, 5, 18, -11, 22, -8);
    long elementCount = numList.stream().filter(n -> n > 0).count();
    System.out.println("Count of elements- " + elementCount);  
  }
}

Output

Count of elements- 4

3. In the following example count is used to get the count of employees having salary greater than 10000.

Employee class

public class Employee {
  private String empId;
  private int age;
  private String name;
  private char gender;
  private int salary;
  Employee(String empId, int age, String name, char gender, int salary){
    this.empId = empId;
    this.age = age;
    this.name = name;
    this.gender = gender;
    this.salary = salary;
  }
  public String getEmpId() {
    return empId;
  }

  public int getAge() {
    return age;
  }

  public String getName() {
    return name;
  }

  public char getGender() {
    return gender;
  }

  public int getSalary() {
    return salary;
  }
  @Override
  public String toString() {
      return "Emp Id: " +  getEmpId() + " Name: " + getName() + " Age: " + getAge();
  }
}

import java.util.Arrays;
import java.util.List;

public class StreamCountDemo {

  public static void main(String[] args) {
    List<Employee> empList = Arrays.asList(new Employee("E001", 40, "Ram", 'M', 5000), 
                new Employee("E002", 35, "Shelly", 'F', 7000), 
                new Employee("E003", 40, "Remington", 'M', 5000), 
                new Employee("E004", 37, "Bianca", 'F', 11000),
                new Employee("E005", 35, "Dominic", 'M', 7000), 
                new Employee("E006", 28, "Amanda", 'F', 14000));
    long elementCount = empList.stream().filter(e -> e.getSalary() > 10000).count();
    System.out.println("Count of elements- " + elementCount);  
  }
}

Output

Count of elements- 2

That's all for this topic Java Stream - count() With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Java Advanced Tutorial Page


Related Topics

  1. Java Stream - boxed() With Examples
  2. Java Stream - sorted() With Examples
  3. Java Stream - distinct() With Examples
  4. Parallel Stream in Java Stream API
  5. Java Stream API Interview Questions And Answers

You may also like-

  1. Type Erasure in Java Generics
  2. SerialVersionUID And Versioning in Java Serialization
  3. Difference Between CountDownLatch And CyclicBarrier in Java
  4. Difference Between StackOverflowError and OutOfMemoryError in Java
  5. Docker Tutorial: Introduction to Docker
  6. Benefits, Disadvantages And Limitations of Autowiring in Spring
  7. Spring MVC JSON as Response Example
  8. Angular Access Control CanActivate Route Guard Example

Saturday, December 23, 2023

Java Stream API Examples

In the post Java Stream API Tutorial we have already had an introduction to the Stream API. A Stream can be defined as a sequence of elements supporting sequential and parallel aggregate operations. Using these aggregate operations we can create a pipeline. Some of the aggregate operations provided are collect, concat, count, distinct, filter, forEach, limit, map, max, min, reduce and sorted. In this post we'll see some Java Stream API examples using these operations and also create pipelines consisting of a sequence of aggregate operations.
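
As a quick illustration of such a pipeline before looking at the operations one by one, here is a minimal sketch (the class name StreamPipelineDemo is just for illustration) that chains filter, map, sorted and collect:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamPipelineDemo {

  public static void main(String[] args) {
    List<String> names = Arrays.asList("Ram", "Amit", "Ashok", "Manish", "Rajat");
    List<String> result = names.stream()
        .filter(n -> n.length() > 3)      // keep names longer than 3 characters
        .map(String::toUpperCase)         // transform each element
        .sorted()                         // natural (alphabetical) order
        .collect(Collectors.toList());    // terminal operation producing a List
    System.out.println(result); // prints: [AMIT, ASHOK, MANISH, RAJAT]
  }
}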


Java Stream API count() method example

count method returns the count of elements in the given stream.

Note that this is a special case of a reduction and it is a terminal operation.

List<Integer> myList = Arrays.asList(7, 18, 10, 24, 17, 5);  
long count = myList.stream().count();
System.out.println("Total elements in the list " + count);

This code snippet will give the count of the elements in the List.

Now, if you want to get the count of the elements greater than 10, you can create a pipeline that first filters on the predicate "value greater than 10" and then counts the remaining elements.

List<Integer> myList = Arrays.asList(7, 18, 10, 24, 17, 5); 
long count = myList.stream().filter(i -> i > 10).count();
System.out.println("Total elements in the list with value greater than 10 " + count);

Java Stream API concat() method example

concat() method in Java Stream creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream.

List<String> myList = Arrays.asList("1", "2", "3", "4", "5");
  
String[] arr1 = { "a", "b", "c", "d" };
// concatenating two streams
Stream<String> stream = Stream.concat(myList.stream(), Arrays.stream(arr1));
stream.forEach(System.out::print);

Output

12345abcd

Here you can see that the concatenated stream is returned. If you are wondering what System.out::print is, refer to Method Reference in Java 8. You may also want to read about the forEach statement in Java 8.

Since the parameters of the concat operation are themselves streams, all the aggregate operations can be applied to them too. For example, if there are two lists of names and you want a merged stream with all the names that start with "A", that can be done as follows-

List<String> nameList1 = Arrays.asList("Ram", "Amit", "Ashok", "Manish", "Rajat");
  
List<String> nameList2 = Arrays.asList("Anthony", "Samir", "Akash", "Uttam");
  
// concatenating the two filtered streams
Stream<String> stream = Stream.concat(nameList1.stream().filter(n -> n.startsWith("A")), 
        nameList2.stream().filter(n -> n.startsWith("A")));

stream.forEach(System.out::println);

Java Stream API distinct() method example

Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.

Using the distinct() method of the Java Stream API, duplicate elements can be removed from a collection like a List very easily by creating a pipeline where distinct() returns a stream having only distinct elements, which can later be collected into a List using the collect() method.

List<Integer> myList = Arrays.asList(7, 18, 10, 7, 10, 24, 17, 5);
  
System.out.println("Original list: " + myList);
List<Integer> newList = myList.stream().distinct().collect(Collectors.toList());

System.out.println("new List : " + newList);
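
Note that distinct() identifies duplicates using equals() (and hashCode()), so for elements of a custom class both methods should be overridden. A minimal sketch (the class names DistinctCustomDemo and Item are just for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class DistinctCustomDemo {

  static class Item {
    private final String name;
    Item(String name) { this.name = name; }
    // without these overrides distinct() would keep all three elements
    @Override
    public boolean equals(Object o) {
      return o instanceof Item && Objects.equals(name, ((Item) o).name);
    }
    @Override
    public int hashCode() { return Objects.hash(name); }
    @Override
    public String toString() { return name; }
  }

  public static void main(String[] args) {
    List<Item> items = Arrays.asList(new Item("pen"), new Item("book"), new Item("pen"));
    List<Item> unique = items.stream().distinct().collect(Collectors.toList());
    System.out.println(unique); // prints: [pen, book]
  }
}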

Java Stream API filter() method example

filter method returns a stream consisting of the elements of this stream that match the given predicate.

Here note that Predicate is a functional interface and can be implemented as a lambda expression; we have already used the filter() method in the examples above.

As an example, let's say we have a list of names and we want to print the names which don't start with "A".

List<String> nameList = Arrays.asList("Ram", "Amit", "Ashok", "Manish", "Rajat");
  
nameList.stream().filter(n -> !n.startsWith("A")).collect(Collectors.toList()).forEach(System.out::println);

Output

Ram
Manish
Rajat
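
Since Predicate is a functional interface, the same condition can also be held in an explicit Predicate variable and passed to filter(). A minimal sketch (the class name FilterPredicateDemo is just for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterPredicateDemo {

  public static void main(String[] args) {
    List<String> nameList = Arrays.asList("Ram", "Amit", "Ashok", "Manish", "Rajat");
    // the lambda is an implementation of Predicate<String>
    Predicate<String> notStartingWithA = n -> !n.startsWith("A");
    nameList.stream()
            .filter(notStartingWithA)
            .collect(Collectors.toList())
            .forEach(System.out::println); // prints Ram, Manish, Rajat
  }
}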

Java Stream API limit() method example

Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.

If you want 10 random numbers, you can use the limit() method on the (otherwise unbounded) IntStream returned by Random.ints().

Random random = new Random();
random.ints().limit(10).forEach(System.out::println);

Java Stream API map() method example

Returns a stream consisting of the results of applying the given function to the elements of this stream. So, whatever function is provided is applied on all the elements of the stream. Note that this is an intermediate operation.

As an example, if you have a list of salaries and you want to increase each of them by 10%-

List<Integer> myList = Arrays.asList(7000, 5000, 4000, 24000, 17000, 6000);
  
myList.stream().map(i -> (i+ i * 10/100)).forEach(System.out::println);

findFirst() and findAny() methods in Java Stream API

  • findFirst()- Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty. If the stream has no encounter order (a List or an array will be ordered, whereas a set or map may not be), then any element may be returned.
  • findAny()- Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty. The behavior of this operation is explicitly nondeterministic; it is free to select any element in the stream. This is to allow for maximal performance in parallel operations; the cost is that multiple invocations on the same source may not return the same result. (If a stable result is desired, use findFirst() instead.)

List<String> nameList = Arrays.asList("amy", "nick", "margo", "desi");
Optional<String> name = nameList.stream().findFirst();
System.out.println("First Name " + name);

name = nameList.parallelStream().findAny();
System.out.println("Any Name " + name);

Output

First Name Optional[amy]
Any Name Optional[margo]

You can see that in the case of the findFirst() method the first element of the list is displayed. Even with parallelStream(), findFirst() will give the first element.

Whereas in the case of the findAny() method any element may be picked; note that findAny() is used with parallelStream() here.

max and min methods in Java Stream API

  • max- Returns the maximum element of this stream according to the provided Comparator.
  • min- Returns the minimum element of this stream according to the provided Comparator.

max and min are also reduction operations. Both of them are terminal operations.

List<Integer> myList = Arrays.asList(7000, 5000, 4000, 24000, 17000, 6000);
// Obtain a Stream to the array list.
Stream<Integer> myStream = myList.stream();
Optional<Integer> val = myStream.min(Integer::compare);
if (val.isPresent()) {
  System.out.println("minimum value in the list " + val.get());
}
Optional<Integer> val1 = myList.stream().max(Integer::compare);
if (val1.isPresent()) {
  System.out.println("maximum value in the list " + val1.get());
}

Note that here Optional class is used. To know more about Optional class refer Optional class in Java 8.

Java Stream API sorted() method example

The sorted() method returns a stream consisting of the elements of this stream, sorted according to natural order; there is another variant where a custom Comparator can be provided (see the sketch after the example below).

List<Integer> myList = Arrays.asList(7000, 5000, 4000, 24000, 17000, 6000);
myList.stream().sorted().forEach(System.out::println);
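
For the custom Comparator variant, sorted(Comparator) can be used. A minimal sketch (the class name StreamSortedComparatorDemo is just for illustration) sorting the same list in descending order:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StreamSortedComparatorDemo {

  public static void main(String[] args) {
    List<Integer> myList = Arrays.asList(7000, 5000, 4000, 24000, 17000, 6000);
    // reverse natural order using a custom comparator
    myList.stream()
          .sorted(Comparator.reverseOrder())
          .forEach(System.out::println); // 24000, 17000, 7000, 6000, 5000, 4000
  }
}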

Summary Statistics classes

A state object for collecting statistics such as count, min, max, sum, and average. There are different SummaryStatistics classes in Java Stream API like IntSummaryStatistics, DoubleSummaryStatistics, LongSummaryStatistics.

As example–

 
List<Integer> myList = Arrays.asList(7, 5, 4, 24, 17, 6);
IntSummaryStatistics stats = myList.stream().collect(Collectors.summarizingInt(i-> i));

System.out.println("Sum - " + stats.getSum());
System.out.println("Count " + stats.getCount());
System.out.println("Average " + stats.getAverage());
System.out.println("Max " + stats.getMax());
System.out.println("Min " + stats.getMin());

Here Collectors.summarizingInt method is used which applies an int-producing mapping function to each input element, and returns summary statistics for the resulting values.

In place of

IntSummaryStatistics stats = myList.stream().collect(Collectors.summarizingInt(i-> i));

using the mapToInt() method, it can also be written as-

IntSummaryStatistics stats = myList.stream().mapToInt(i -> i).summaryStatistics();

That's all for this topic Java Stream API Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Java Advanced Tutorial Page


Related Topics

  1. Parallel Stream in Java Stream API
  2. Primitive Type Streams in Java Stream API
  3. Reduction Operations in Java Stream API
  4. Lambda Expressions in Java 8
  5. Java Stream API Interview Questions And Answers

You may also like-

  1. How HashMap Works Internally in Java
  2. How ArrayList Works Internally in Java
  3. Java ReentrantLock With Examples
  4. Difference Between ArrayList And CopyOnWriteArrayList in Java
  5. Lock Striping in Java Concurrency
  6. Java ThreadLocal Class With Examples
  7. Spring NamedParameterJdbcTemplate Insert, Update And Delete Example
  8. How to Inject Prototype Scoped Bean into a Singleton Bean in Spring

Friday, December 22, 2023

What Are JVM, JRE And JDK in Java

This post gives a brief explanation of JVM, JRE and JDK in Java. Before going into that explanation you should also know what bytecode is in Java.

What is bytecode in Java

When a Java program is compiled, it is not directly compiled into machine language but into an intermediate code known as bytecode. Bytecode is platform independent and it still needs to be interpreted and executed by the JVM installed on the specific platform.

For example, let's say you have a Java file called "Test.java". When you compile this file you get a file called "Test.class", which is the bytecode for your Java file. The JVM interprets and executes this Test.class file.
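
As a minimal sketch (the class body is just for illustration; the Test name comes from the example above), compiling Test.java with "javac Test.java" produces Test.class, and running "java Test" then hands that bytecode to the JVM:

// Test.java - "javac Test.java" produces the bytecode file Test.class,
// which the JVM interprets and executes when you run "java Test"
public class Test {

  public static void main(String[] args) {
    System.out.println("Running from bytecode");
  }
}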

JVM

JVM, meaning Java Virtual Machine, is an abstraction layer between a Java program and the platform that the Java program is running on. The JVM itself is platform dependent, and different JVM implementations are available for specific platforms.

A Java program can run on a specific platform only when-

  • JVM has been implemented for a platform.
  • JVM has been installed on a platform.

The JVM doesn't understand a Java program as we write it; it understands the ".class" file which we get by compiling the .java file. This ".class" file contains the bytecode understandable by the JVM. It is because of the JVM that Java is called a "portable language" (write once, run anywhere).

The following figure shows the abstraction provided by the JVM, sitting in between the bytecode and the specific platform.

[Figure: Bytecode interpretation by JVM]

JRE

JRE, meaning Java Runtime Environment, provides the libraries, the Java Virtual Machine, and other components needed to run applets and applications written in the Java programming language.

The compiled bytecode doesn't run on the CPU directly; the JVM sits in between and interprets the bytecode into machine language readable by the CPU. It is actually the JRE that enables Java bytecode to run on any platform. Bytecodes, which are interpreted by the JVM, simply call classes found in the JRE when they need to perform actions they cannot do by themselves.

[Figure: JVM and JRE in Java]

JDK

JDK, meaning Java Development Kit, is a superset of the JRE; it contains everything that is in the JRE plus development tools, such as the compilers and debuggers necessary for developing applets and applications.

That's all for this topic What Are JVM, JRE And JDK in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Java Basics Tutorial Page


Related Topics

  1. Why main Method static in Java
  2. Why Class Name And File Name Should be Same in Java
  3. Java Pass by Value or Pass by Reference
  4. static Keyword in Java With Examples
  5. Core Java Basics Interview Questions And Answers

You may also like-

  1. Class in Java
  2. Object in Java
  3. Encapsulation in Java
  4. Constructor Chaining in Java
  5. Fail-Fast Vs Fail-Safe Iterator in Java
  6. Difference Between Comparable and Comparator in Java
  7. Java Semaphore With Examples
  8. Spring Bean Life Cycle