How to Create Kafka Streams using Spring Cloud Stream


In this blog, we will discuss how to create Kafka streams using Spring Cloud Stream. Apache Kafka needs no introduction. Over the years, Kafka has become an essential part of designing event-streaming applications.

Kafka is now used to capture data in real time from event sources such as databases, sensors, mobile devices, cloud services, and software applications. This data is captured as streams of events and stored in Kafka so that it can be retrieved, manipulated, processed, and reacted to in real time. Kafka-based applications ensure a continuous flow and interpretation of data so that the right information is at the right place, at the right time.

Spring Cloud Stream

On the other side, Spring Boot has transformed the way we develop production-grade, Spring-based microservices. More recently, Spring Boot was combined with Spring Integration to create a new project, Spring Cloud Stream.

Spring Cloud Stream extends Spring Boot’s capabilities to apply the microservice architecture pattern to event-centric applications.

What if these two technologies are put together? Spring Cloud Stream integrates with the Kafka Streams DSL, so you can use it to create stateless and stateful event stream processing microservices.

Here we are going to create a simple stream listener. The listener is a Kafka message consumer that listens to a Kafka topic, reads all incoming messages, and logs them. Kafka and Spring are highly configurable systems, so every application has a configuration file. Here, the application configuration is defined as a hierarchy.

Spring Cloud Stream configuration code:

spring:
  cloud:
    stream:
      # Channel binding: maps the input channel to the "users" topic
      bindings:
        input-channel-1:
          destination: users
      # Binder: Kafka Streams connection settings
      kafka:
        streams:
          binder:
            applicationId: hellostreams
            brokers: localhost:9092
            configuration:
              default:
                # Default serdes for message keys and values
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
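Spring Boot reads a YAML hierarchy like this as flat, dotted property names. The sketch below uses only the JDK to illustrate how the nested keys collapse into those names. `PropertyFlattener` is a hypothetical helper written for this illustration, not Spring's own implementation, and only part of the configuration is reproduced.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; Spring does this work internally.
final class PropertyFlattener {

    // Returns dotted keys in sorted order so the output is deterministic.
    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new TreeMap<>();
        collect("", nested, flat);
        return flat;
    }

    // Walks the tree; leaf values are stored under their full dotted path.
    private static void collect(String prefix, Map<?, ?> node, Map<String, String> out) {
        for (Map.Entry<?, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty()
                    ? String.valueOf(entry.getKey())
                    : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                collect(key, (Map<?, ?>) value, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Part of the configuration above, rebuilt as nested maps.
        Map<String, Object> binder =
                Map.of("applicationId", "hellostreams", "brokers", "localhost:9092");
        Map<String, Object> stream = Map.of(
                "bindings", Map.of("input-channel-1", Map.of("destination", "users")),
                "kafka", Map.of("streams", Map.of("binder", binder)));
        Map<String, Object> config = Map.of("spring", Map.of("cloud", Map.of("stream", stream)));

        flatten(config).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Running it prints one `key=value` line per leaf, for example `spring.cloud.stream.bindings.input-channel-1.destination=users`. The same dotted names can be used in an `application.properties` file instead of YAML.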

 

This configuration defines just two things. The first is the input-output channel binding, and the second is the binder. The channel binding lists the sources and destinations, here mapping input-channel-1 to the users topic. The binder defines the underlying messaging technology and how to connect to it. Spring Cloud Stream offers a number of binder technologies.

You can use Apache Kafka, RabbitMQ, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs, and many more.

Spring Cloud Stream offers two Kafka binders:

Apache Kafka Binder: The Apache Kafka binder implements the Kafka client APIs.

Apache Kafka Streams Binder: The Kafka Streams binder is designed specifically for the Kafka Streams API.

Here is the code to create a Kafka listener service that binds to a Kafka input topic and listens to all the incoming messages.

Binding interface with the method signature for the input stream:

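import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {

    // Binds the "input-channel-1" channel to a stream of String keys and values
    @Input("input-channel-1")
    KStream<String, String> inputStream();
}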

This method reads from a Kafka topic and returns a KStream. A KStream is a stream of Kafka messages, each made up of a key and a value.

Kafka Listener Service

This class is a service that triggers the Spring Cloud Stream framework to connect to the Kafka input channel using the Kafka Streams API and start consuming the input messages as a KStream.

Here is the code to bind this class to the Spring Cloud Stream infrastructure and log each message in the KStream.

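import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {

    // Receives the bound input stream and logs every key/value pair
    @StreamListener("input-channel-1")
    public void process(KStream<String, String> input) {
        input.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
    }
}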
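To get a feel for what the listener logs without starting a broker, the following JDK-only sketch replays a few records through the same format string. `ListenerSketch` and the sample records are hypothetical, and a plain list of key/value entries stands in for the KStream; no Kafka or Spring classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// JDK-only stand-in for the listener; not a Kafka Streams topology.
final class ListenerSketch {

    // Same format string as the listener's log statement.
    static String describe(String key, String value) {
        return String.format("Key: %s, Value: %s", key, value);
    }

    // Visits every record in arrival order, as a for-each over a stream would.
    static List<String> process(List<Map.Entry<String, String>> records) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            lines.add(describe(record.getKey(), record.getValue()));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical messages on the "users" topic; keys may repeat in a stream.
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("101", "alice"),
                Map.entry("102", "bob"),
                Map.entry("101", "alice-updated"));

        process(records).forEach(System.out::println);
    }
}
```

The first line printed is `Key: 101, Value: alice`. In the real service, the same text would go to the log for each message that arrives on the topic.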

 

Summary

  1. We started by defining some application configuration. In the configuration code above, we configured an input channel with the users Kafka topic as its destination, because we wanted to connect to that topic and read all of its messages.
  2. Then we told Spring Cloud Stream that we want to use the Kafka Streams binder, so Spring Cloud Stream connects to the Kafka broker using the given hostname and port.
  3. We also configured the message key and message value types.
  4. Then we defined the listener service, which triggers the Spring Cloud Stream framework. The framework implements the binding interface and creates a Kafka stream. The listener method receives the input stream and sends each message to the log.

Note: When the application starts, the Spring Cloud Stream framework picks up the input channel and binds it to the users Kafka topic.

 

Author: SVCIT Editorial

Copyright Silicon Valley Cloud IT, LLC.
