In this blog, we will learn how to use Kafka with ZooKeeper and how to serialize and deserialize an object, with the help of an example. Serialization is a mechanism for converting the state of an object into a byte stream.
Deserialization is the reverse process, where the byte stream is used to recreate the actual Java object in memory. This mechanism is used to persist the object.
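To make the round trip concrete, here is a minimal sketch using only the JDK's built-in object streams. It is not the JSON mechanism used later with Kafka, and the Car class and the SerializationDemo name are hypothetical, chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    // Hypothetical value class used only for this illustration.
    public static class Car implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String id;
        public final String make;

        public Car(String id, String make) {
            this.id = id;
            this.make = make;
        }
    }

    // Serialization: object state -> byte stream.
    public static byte[] serialize(Serializable object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Deserialization: byte stream -> a new object in memory.
    public static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Car("ABC-123", "Passat"));
        Car copy = (Car) deserialize(bytes);
        System.out.println(copy.id + " " + copy.make + " (" + bytes.length + " bytes)");
    }
}
```

The copy is a new object with the same field values as the original, which is the property that any serializer and deserializer pair needs to preserve.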
Kafka depends on ZooKeeper, a distributed configuration management tool.
Kafka uses ZooKeeper to manage the cluster and to coordinate the brokers and the cluster topology.
C:/kafka_2.13-2.8.0/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=C:/tmp/zookeeper
C:/kafka_2.13-2.8.0/config/server.properties
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=C:/tmp/kafka-logs
listeners=PLAINTEXT://127.0.0.1:9092
auto.create.topics.enable=true
Note: Kafka will run on default port 9092 and connect to Zookeeper’s default port, 2181.
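Before moving on, it can help to confirm that something is actually listening on those two ports. The following is a rough sketch using plain JDK sockets. The PortCheck class and its isListening helper are hypothetical names, and a successful TCP connection only shows that a process accepted the connection, not that it is Kafka or ZooKeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports from the note above.
        System.out.println("ZooKeeper (2181): " + isListening("127.0.0.1", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("127.0.0.1", 9092, 1000));
    }
}
```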
[2021-06-07 20:30:44,314] INFO Server environment:java.io.tmpdir=C:\Users\AppData\Local\Temp\ (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,315] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,316] INFO Server environment:os.name=Windows 10 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,327] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,332] INFO Server environment:os.version=10.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,334] INFO Server environment:user.name=ABC (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,335] INFO Server environment:user.home=C:\Users (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,336] INFO Server environment:user.dir=C:\kafka\bin\windows (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,348] INFO Server environment:os.memory.free=496MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,349] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,351] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,361] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,365] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,368] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir C:\tmp\zookeeper\version-2 snapdir C:\tmp\zookeeper\version-2 (org.apache.zookeeper.server.ZooKeeperServer)
Note: Do NOT use Kerberos configs for the local hosts.
Now that your Kafka server is up and running, you can create topics to store messages. You can also produce or consume data from Java or Scala code, or directly from the command prompt.
To start a producer, type the following command:
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.
C:/kafka_2.13-2.8.0/bin/windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
>The Basic Tech Info ... Welcomed You !!!
>
Now start a consumer by typing the following command:
C:/kafka_2.13-2.8.0/bin/windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
The Basic Tech Info ... Welcomed You !!!
1. List Topics: kafka-topics.bat --list --zookeeper localhost:2181
2. Describe Topic: kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]
3. Read messages from the beginning: kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic [Topic Name] --from-beginning
4. Delete Topic: kafka-run-class.bat kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
In this example, we will implement sendMessage with a messageKey parameter using the ListenableFuture interface. The example uses the following:
Java 8
Spring Boot 2.4.4
Apache Kafka Server 2.8.0 (Scala 2.13 build)
ZooKeeper
Spring Kafka 2.6.7
High-level application structure would be like this.
The Maven file (pom.xml) is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-boot-kafka-json</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-kafka-json</name>
<description>Spring Kafka - JSON Serializer Deserializer Example</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.embeddedkafka</groupId>
<artifactId>embedded-kafka_2.12</artifactId>
<version>2.5.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The Kafka topic name is test.
kafka:
  bootstrap-servers: localhost:9092
  topic:
    json: test
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<logger name="com.the.basic.tech.info" level="INFO" />
<logger name="org.springframework" level="WARN" />
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
We implement CommandLineRunner in SpringKafkaApplication to launch the publisher/producer and the subscriber/consumer.
package com.the.basic.tech.info.kafka;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;
@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {
@Autowired
Sender sndr;
@Autowired
Receiver rcvr;
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Override
public void run(String... arg0) throws Exception {
Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
sndr.send(employee);
rcvr.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
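The run method above sends one Employee and then blocks on the receiver's CountDownLatch for up to 10 seconds, so the application waits until the listener has seen a record or the timeout passes. Here is a small JDK-only sketch of that wait. The LatchSketch class and the worker thread that stands in for the Kafka listener are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Starts a worker that counts the latch down after delayMillis, then waits
    // up to timeoutMillis. Returns true if the latch reached zero in time.
    public static boolean waitForSignal(long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // stands in for the listener receiving a record
                latch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for a late worker
        worker.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("signalled in time: " + waitForSignal(100, 10000));
        System.out.println("signalled in time: " + waitForSignal(2000, 100));
    }
}
```

await returns false when the timeout elapses first, which is why a test can assert on the latch count afterwards. Calling countDown on a latch that is already at zero has no effect, so receiving extra records is harmless.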
package com.the.basic.tech.info.model;
public class Employee {
private String empid;
@Override
public String toString() {
return "Employee [empid=" + empid + ", empname=" + empname + ", empdept=" + empdept + ", empsalary=" + empsalary
+ ", empage=" + empage + ", empaddress=" + empaddress + "]";
}
private String empname;
private String empdept;
private String empsalary;
private String empage;
private String empaddress;
public Employee() {
}
public Employee(String empid, String empname, String empdept, String empsalary, String empage, String empaddress) {
super();
this.empid = empid;
this.empname = empname;
this.empdept = empdept;
this.empsalary = empsalary;
this.empage = empage;
this.empaddress = empaddress;
}
public String getEmpid() {
return empid;
}
public void setEmpid(String empid) {
this.empid = empid;
}
public String getEmpname() {
return empname;
}
public void setEmpname(String empname) {
this.empname = empname;
}
public String getEmpdept() {
return empdept;
}
public void setEmpdept(String empdept) {
this.empdept = empdept;
}
public String getEmpsalary() {
return empsalary;
}
public void setEmpsalary(String empsalary) {
this.empsalary = empsalary;
}
public String getEmpage() {
return empage;
}
public void setEmpage(String empage) {
this.empage = empage;
}
public String getEmpaddress() {
return empaddress;
}
public void setEmpaddress(String empaddress) {
this.empaddress = empaddress;
}
}
package com.the.basic.tech.info.kafka.producer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.the.basic.tech.info.model.Employee;
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Employee> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Employee> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
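SenderConfig above registers JsonSerializer as the value serializer, so each Employee is written to the topic as JSON text encoded in bytes. The sketch below imitates that idea with a hand-rolled writer for a flat map of string fields. It is a simplified stand-in, not Spring Kafka's implementation, and the JsonBytesSketch class with its helpers is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonBytesSketch {

    // Escapes backslashes and double quotes only; control characters are not handled here.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Hand-rolled JSON for a flat map of string fields; a real serializer handles far more.
    public static String toJson(Map<String, String> fields) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(e.getKey())).append("\":\"")
                .append(escape(e.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    // The record value as it would travel: the JSON text encoded as UTF-8 bytes.
    public static byte[] toBytes(Map<String, String> fields) {
        return toJson(fields).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> employee = new LinkedHashMap<>();
        employee.put("empid", "2121");
        employee.put("empname", "John");
        byte[] value = toBytes(employee);
        // Decoding the bytes as UTF-8 gives back the JSON text.
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }
}
```

This is also why the console consumer earlier can print the records as readable JSON lines: the stored value is text, not a Java-specific binary format.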
package com.the.basic.tech.info.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.the.basic.tech.info.model.Employee;
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Value("${kafka.topic.json}")
private String jsonTopic;
@Autowired
private KafkaTemplate<String, Employee> kafkaTemplate;
public void send(Employee employee) {
LOG.info("sending car='{}'", employee.toString());
kafkaTemplate.send(jsonTopic, employee);
sendMessage(employee, "key1");
}
public void sendMessage(final Employee string, String messageKey) {
final ListenableFuture<SendResult<String, Employee>> future;
if (null != messageKey && !"".equals(messageKey)) {
future = kafkaTemplate.send(jsonTopic, messageKey, string);
LOG.info("streaming-event-publisher :: sendMessage with MESSAGE_KEY : {} ", messageKey);
} else {
future = kafkaTemplate.send(jsonTopic, string);
LOG.info("streaming-event-publisher :: sendMessage without key.");
}
future.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
@Override
public void onSuccess(final SendResult<String, Employee> result) {
LOG.info(
"publisher :: Successfully Sent Message to topic, partition number [{}] and Offset [{}]",
result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable err) {
LOG.warn("publisher :: Unable to deliver message [{}]. {}", string, err.getMessage());
}
});
}
}
Note: We send the messageKey as "key1" so that all the messages are published to the same partition.
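The note relies on keyed records being routed by a hash of the key: the same key always maps to the same partition as long as the partition count does not change. The sketch below shows the idea in plain Java. Kafka's default partitioner uses a murmur2 hash of the serialized key, while this sketch swaps in Arrays.hashCode for brevity, so the partition numbers it prints will not match a real broker. The class name and the partition count of 3 are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {

    // Simplified stand-in for a key-based partitioner: hash the key bytes, mask the
    // sign bit so the value is non-negative, then take it modulo the partition count.
    // Arrays.hashCode is an assumption of this sketch, not Kafka's murmur2 hash.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count
        System.out.println("key1 -> " + partitionFor("key1", partitions));
        System.out.println("key1 -> " + partitionFor("key1", partitions));
        System.out.println("key2 -> " + partitionFor("key2", partitions));
    }
}
```

With a single-partition topic every record lands on partition 0 regardless of the key, so the effect of the key only becomes visible once the topic has more partitions.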
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
18:47:22.873 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
18:47:22.875 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
18:47:22.877 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1623071842872
18:47:22.881 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-json-1, groupId=json] Subscribed to topic(s): test
18:47:22.907 [main] INFO o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
18:47:22.951 [main] INFO c.t.b.t.i.k.SpringKafkaApplication - Started SpringKafkaApplication in 3.92 seconds (JVM running for 4.979)
18:47:22.959 [main] INFO c.t.b.t.info.kafka.producer.Sender - sending car='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] (Re-)joining group
18:47:23.495 [main] INFO c.t.b.t.info.kafka.producer.Sender - streaming-event-publisher :: sendMessage with MESSAGE_KEY : key1
18:47:23.532 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Finished assignment for group at generation 23: {consumer-json-1-a18904d9-a431-43a6-b0ee-de0f7cc2cb72=Assignment(partitions=[test-0])}
18:47:23.545 [kafka-producer-network-thread | producer-1] INFO c.t.b.t.info.kafka.producer.Sender - publisher :: Successfully Sent Message to topic, partition number [0] and Offset [16]
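In the log above, the success message is written by the kafka-producer-network-thread rather than by main: the send returns immediately and the callback runs later, when the broker acknowledges the record. The same non-blocking pattern can be sketched with only the JDK by swapping in CompletableFuture for Spring's ListenableFuture. The fakeSend method, the made-up offset and the class name are hypothetical and do not talk to Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SendCallbackSketch {

    // Hypothetical stand-in for an asynchronous send. It completes on another
    // thread with a made-up offset, or fails when the payload is empty.
    public static CompletableFuture<Long> fakeSend(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            return 16L;
        });
    }

    // Attaches success and failure handling without blocking the caller.
    // The returned future carries a one-line summary of the outcome.
    public static CompletableFuture<String> sendWithCallback(String payload) {
        return fakeSend(payload).handle((offset, err) -> {
            if (err == null) {
                return "sent at offset " + offset;
            }
            // Failures from supplyAsync arrive wrapped in CompletionException.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
            return "failed: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(sendWithCallback("hello").join());
        System.out.println(sendWithCallback("").join());
    }
}
```

The two branches play the roles of onSuccess and onFailure in the Sender class: the caller is never blocked, and a failed send is reported instead of being silently dropped.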
package com.the.basic.tech.info.kafka.consumer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.the.basic.tech.info.model.Employee;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
return props;
}
@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Employee.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Employee> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Employee> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
package com.the.basic.tech.info.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import com.the.basic.tech.info.model.Employee;
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.json}")
public void receive(Employee employee) {
LOGGER.info("received employee details='{}'", employee.toString());
latch.countDown();
}
}
18:47:23.666 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.667 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.673 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:48:23.458 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Revoke previously assigned partitions test-0
package com.the.basic.tech.info.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;
// Starts an embedded broker with the "test" topic and publishes its address as
// kafka.bootstrap-servers, the property read by SenderConfig and ReceiverConfig.
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "test", bootstrapServersProperty = "kafka.bootstrap-servers")
public class SpringKafkaApplicationTest {
@Autowired
private Sender sender;
@Autowired
private Receiver receiver;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@BeforeEach
public void setUp() throws Exception {
// wait until the partitions are assigned
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafka.getPartitionsPerTopic());
}
}
@Test
public void testReceive() throws Exception {
Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
sender.send(employee);
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(receiver.getLatch().getCount()).isEqualTo(0);
}
}
Keep learning. Have a great day 🙂