Thursday, February 29, 2024

Spring Boot Event Driven Microservice With Kafka

In this post we'll see how to write event driven microservice using Spring Boot and Kafka. In the example we'll see how to use Kafka as a message broker that enables services to communicate with each other and exchange information.

Message brokers can validate, store, route, and deliver messages to the required destinations. They act as a middleware allowing producers to send messages without having any location information about the consumers of the messages, without knowing whether consumers are currently active or not or what is the count of consumers. This abstraction facilitates decoupling of services within systems and asynchronous communication among the services.


Kafka - Brief introduction

Apache Kafka is an open-source distributed event streaming platform which combines the following three capabilities-

  • To publish (write) and subscribe to (read) streams of events
  • To store streams of events durably and reliably for as long as you want.
  • To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

Some of the terminology used in Kafka is as given below-

  1. Events- An event records the fact that "something happened" in your application. When you read or write data to Kafka, you do this in the form of events. The message you write to Kafka may have a key and data which can be a String, a complex object.
  2. Producers- Those client applications that publish (write) events to Kafka,
  3. Consumers- Those applications that subscribe to (read and process) these events.
    In Kafka, producers and consumers are fully decoupled and agnostic of each other. This clear separation helps Kafka in achieving high scalability.
  4. Topics- Events are organized and durably stored in topics. Topics in Kafka are always multi-producer and multi-subscriber which means a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.
    Unlike traditional messaging systems, in Kafka, events are not deleted after consumption.
    Topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. When a new event is published to a topic, it is actually appended to one of the topic's partitions. Which partition to send the event is determined by the key passed with the event. Events with the same event key (For example customer ID, account ID ) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written.
kafka topics

Events with the same key (denoted by their color in the figure) are written to the same partition.

Kafka as message broker

Kafka works well as a replacement for a more traditional message broker. In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.

Spring Boot Microservice + Kafka Example

In the example we'll create an order-service which acts as a producer of orders and order-consumer-service which gets that order data and then do further processing of those orders (for example do payment and then change the order status to processed or rejected).

We'll configure a Kafka topic, order-service will write orders to that topic, order-consumer-service will read orders from the topic.

order-service Microservice

In STS create a new Spring Starter Project and give details as given here.

Name: order-service. Java version used is 17, packaging is Jar and project type is Maven. In the next window select the dependencies as shown in the following image.

Spring Boot Kafka

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for @KafkaListener annotation.

With these dependencies added the created pom.xml should look like this-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.1</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.netjstech</groupId>
  <artifactId>order-service</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>order-service</name>
  <description>Order Producer Service
</description>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

OrderDto class

POJO class used to produce and consume data.

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderDto {
  private List<String> items;
  private double amount;
  private String status;
}

Controller Class

Class that will receive request for the designated path and move it forward to Service class.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;
import com.netjstech.orderservice.service.OrderService;

@RestController
@RequestMapping("/order")
public class OrderController {
  @Autowired
  private OrderService orderService;
  
  @PostMapping
  public void createOrder(@RequestBody OrderDto order) throws JsonProcessingException {
    orderService.sendOrder(order);
  }
}

Order Service interface and Impl class

import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;

public interface OrderService {
	public String sendOrder(OrderDto order) throws JsonProcessingException ;
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.netjstech.orderservice.dto.OrderDto;

@Service
public class OrderServiceImpl implements OrderService{
	@Autowired
	private OrderProducer orderProducer;
	
    public String sendOrder(OrderDto order) throws JsonProcessingException {
        return orderProducer.send(order);
    }
	
}

This is the code flow for sending data but we do need to do some configuration for Kafka. Let's start with application.yml file where we'll add the Kafka bootstrap servers and the topic name.

server:
  port: 8081
  
spring:
  application:
    name: order-service

  kafka:
    bootstrap-servers: localhost:9092  
    topic:
      name: order-topic

With bootstrap-servers property you can provided a comma separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Here we have only a single server.

Producer Configuration

You can provide producer configuration in the application.yml file or create a Java class annotated with @Configuration and provide configuration there. I prefer the latter though with properties file it will look something like this.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
    topic:
      name: order-topic

Here is the OrderProducerConfig.java class that is used in this example to provide producer configuration.

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

@Configuration
public class OrderProducerConfig {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;
  @Value("${spring.kafka.topic.name}")
  private String topicName;

    @Bean
    public Map<String, Object> producerConfig() {
      Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        return props;
    }
    
    @Bean
    public ProducerFactory<String, Object> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder
                .name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}

Description for the configuration values provided for the Kafka producers are as given below-

KEY_SERIALIZER_CLASS_CONFIG- Serializer class for key

VALUE_SERIALIZER_CLASS_CONFIG- Serializer class for value

LINGER_MS_CONFIG- Using this configuration you can delay the sending of records rather than sending them immediately. Producer waits for the given duration and group the records arriving during that time to send them together.

ACKS_CONFIG- The number of acknowledgments the producer requires the leader to have received before considering a request complete. acks=all means the leader will wait for the full set of in-sync replicas to acknowledge the record.

REQUEST_TIMEOUT_MS_CONFIG- The configuration controls the maximum amount of time the client will wait for the response of a request.

RETRIES_CONFIG- Setting a value greater than zero will cause the client to resend any record whose send fails.

Using these configurations DefaultKafkaProducerFactory creates a singleton shared Producer instance.

KafkaTemplate is a template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe.

A NewTopic instance is also created by passing the topic name which was configured in the application.yml file.

OrderProducer class

For sending message to a specific topic using KafkaTemplate is done in a separate OrderProducer class. ObjectMapper class' writeValueAsBytes() method is used to serialize Order object to byte array.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netjstech.orderservice.dto.OrderDto;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class OrderProducer {
  private final KafkaTemplate<String, Object> kafkaTemplate;
  private final NewTopic topicName;
  private final ObjectMapper objectMapper;

  public OrderProducer(NewTopic topic, KafkaTemplate<String, Object> kafkaTemplate, ObjectMapper objectMapper) {
    this.topicName = topic;
    this.kafkaTemplate = kafkaTemplate;
    this.objectMapper = objectMapper;
  }

  public String send(OrderDto order) throws JsonProcessingException {     
    log.info("Sending message='{}' to topic='{}'", order.toString(), topicName);                 
    
    kafkaTemplate.send(topicName.name(), objectMapper.writeValueAsBytes(order));
    return "message sent";
  }
}

That concludes the Producer part before moving to the Consumer part let's see how to install Apache Kafka.

Installing Kafka

Simplest way to get Kafka is to run it as a container, here is a docker-compose.yml file that can be used to do that.

version: "3"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    ports:
      - "9092:9092"
    container_name: broker
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
  kafka_data:
    driver: local

Note that Kafka 3.3.1. onward Zookeeper is not required to store metadata about Kafka cluster. Apache Kafka Raft (KRaft) simplifies Kafka's architecture by consolidating responsibility for metadata into Kafka itself, rather than splitting it between two different systems: ZooKeeper and Kafka. That is why Zookeeper instance is not configured in the compose file.

Also note that the port in bootstrap-servers you provided in service (9092) matches the port in your compose file.

To build and start the container use the following command

docker compose up -d

-d switch to run it in detached mode.

After that you can use docker compose stop and docker compose start to stop and start these containers.

Just ensure that all the containers are up by running the following command.

docker ps -a

order-consumer-service Microservice

In STS create a new Spring Starter Project and give details as given here.

Name: order-consumer-service. Java version used is 17, packaging is Jar and project type is Maven. Dependencies are same as given in order-service.

Configuration properties

Since consumer needs to read the data written by producer so topic name for consumer should be same as producer.

application.yml

server:
  port: 8082
    
spring:
  application:
    name: order-consumer-service

  kafka:
    bootstrap-servers: localhost:9092  
    topic:
      name: order-topic

OrderDto class

POJO class used to produce and consume data. This class is same as in order-service.

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderDto {
  private List<String> items;
  private double amount;
  private String status;
}

Consumer Configuration

A Java configuration class which provides consumer configuration.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class OrderConsumerConfig {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;
  @Value("${spring.kafka.topic.name}")
  private String topicName;
  
    @Bean
    public Map<String, Object> consumerConfig() {
      Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Note the use of ConsumerConfig.GROUP_ID_CONFIG in the class which sets the consumer group ID. The Group ID determines which consumers belong to which group. If there are four consumers with the same Group ID assigned to the same topic, they will all share the work of reading from the same topic.

You can use a particular Group ID's offset to check whether there's been new data written to the partition. If there's an event with a larger offset, that means there's new data to read. If there are more consumers with the same group ID and same topic then any one of them can read the new data which is the functionality you may want. In our example which passes order, more consumers having the same group ID will work. Any order is read by only a single consumer. But there are scenarios where you may not want that, in that case group IDs for the consumer should not be same.

Using these configurations DefaultKafkaConsumerFactory produces new Consumer instances.

ConcurrentKafkaListenerContainerFactory is a KafkaListenerContainerFactory implementation to build a ConcurrentMessageListenerContainer. This factory is primarily for building containers for KafkaListener annotated methods but can also be used to create any container.

OrderConsumerService

Service class that has a method annotated with @KafkaListener which marks a method to be the target of a Kafka message listener on the specified topics. With @KafkaListener annotation we can specify a topic.

import java.io.IOException;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netjstech.orderconsumerservice.dto.OrderDto;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class OrderConsumerService {
  private final ObjectMapper objectMapper;
  public OrderConsumerService(ObjectMapper objectMapper) {
    this.objectMapper = objectMapper;
  }
  
  @KafkaListener(topics = "${spring.kafka.topic.name}")
  public void orderConsumer(byte[] message) throws IOException {
    OrderDto order = objectMapper.readValue(message, OrderDto.class);
    log.info("Order data- " + order.toString());
    // Order status can be changed once it fulfills some criteria
    order.setStatus("Processed");
    log.info("Order data- " + order.toString());
  }
}

Testing Kafka and microservices integration

Start both the Services and also ensure Kafka container is up. Using Postman send an order to the order-service listening on port 8081

Spring Boot Kafka Example

Once the order is sent to order-service it should be written to the configured topic by Kafka. Consumer reads the Order data from topic then you can further process the order by doing payment and changing the status to processed or rejected based on whether payment was successful or failed and pass that event to another microservice for further processing of order.

In the order-consumer-service you can verify the logs to see that the order is read by the consumer.

[2m2023-12-27T14:06:46.182+05:30 INFO 17940 --- [order-consumer-service] [ntainer#0-0-C-1] c.n.o.service.OrderConsumerService : Order data- OrderDto(items=[RAM, Laptop, Mouse], amount=725.5, status=ordered)
[2m2023-12-27T14:06:46.182+05:30 INFO 17940 --- [order-consumer-service] [ntainer#0-0-C-1] c.n.o.service.OrderConsumerService : Order data- OrderDto(items=[RAM, Laptop, Mouse], amount=725.5, status=Processed)

Reference: https://kafka.apache.org/documentation

That's all for this topic Spring Boot Event Driven Microservice With Kafka. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Spring Tutorial Page


Related Topics

  1. Spring Boot Microservice + API Gateway + Resilience4J
  2. Spring Boot Observability - Distributed Tracing, Metrics
  3. Spring Boot Microservice - Externalized Configuration With Spring Cloud Config
  4. Spring Boot Microservice - Load-Balancing With Spring Cloud LoadBalancer
  5. Spring Boot Microservice Circuit Breaker Using Resilience4j

You may also like-

  1. Spring MVC Excel Generation Example
  2. Spring JdbcTemplate Insert, Update And Delete Example
  3. BeanPostProcessor in Spring Framework
  4. Spring Object XML Mapping (OXM) JAXB Example
  5. Is String Thread Safe in Java
  6. Java Concurrency Interview Questions And Answers
  7. JavaScript Rest Parameter
  8. Angular Reactive Form Example

No comments:

Post a Comment