In this blog, we will learn about Kafka with ZooKeeper and, with the help of an example, how to serialize and deserialize an object. Serialization is a mechanism of converting the state of an object into a byte stream.
Deserialization is the reverse process, where the byte stream is used to recreate the actual Java object in memory. Together, these mechanisms are used to persist an object or to send it over the network.
- What is Serialization and Deserialization in Java?
- Kafka Cluster/Architecture
- Local ZooKeeper+Kafka setup for Windows
- Zookeeper’s & Kafka Default Ports
- Running Zookeeper
- Running Kafka Server
- Kerberos, Keytab, and krb5.conf Files
- Create Topic (test)
- CMD: Starting Producer and Consumer to Test the Server [Kafka Broker]
- Other Kafka Useful Tools/Commands (List, Describe, Delete Topic and Read Message from the beginning)
- Example: Spring Boot Kafka Topic Publisher
- Technology Stack
- Project Structure
- Maven File (pom.xml)
- Application Properties (application.yml)
- Application Logger: logback.xml
- CommandLineRunner: SpringKafkaApplication
- Data Model: Employee.java
- Kafka Publisher/Sender Config: SenderConfig.java with Serializer properties
- Kafka Publisher/Sender: Sender.java
- Application Logs: Kafka Publisher/Sender
- Example: Spring Boot Kafka Topic Receiver
- Download Source Code (Attached)
What is Serialization and Deserialization in Java?
Serialization is a mechanism of converting the state of an object into a byte stream.
Deserialization is the reverse process where the byte stream is used to recreate the actual Java object in memory. This mechanism is used to persist the object.
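To make the definition concrete, here is a minimal sketch of a round trip using Java's built-in object streams. The Person class and the helper names are hypothetical and exist only for this illustration. The Spring Boot example later in this blog uses JSON rather than native Java serialization, but the idea is the same: object to bytes, then bytes back to object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationDemo {

    // Hypothetical value class used only for this illustration.
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Serialization: object state -> byte stream.
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return buffer.toByteArray();
    }

    // Deserialization: byte stream -> a new object in memory.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = toBytes(new Person("John", 30));
        Person copy = (Person) fromBytes(bytes);
        System.out.println(copy.name + " " + copy.age + " (" + bytes.length + " bytes)");
    }
}
```

The deserialized Person is a different instance from the original, but it carries the same field values.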

Kafka Cluster/Architecture
Kafka depends on ZooKeeper, a distributed coordination and configuration management service.

Local ZooKeeper+Kafka setup for Windows
Kafka uses ZooKeeper to manage the cluster. ZooKeeper is used to coordinate the brokers/cluster topology.
- Download Kafka 2.8.0 (Scala 2.13 build); the archive includes the Windows .bat scripts used below: https://apache.claz.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
- Extract the tgz file to C:/kafka_2.13-2.8.0, then set the data and log directories in C:/kafka_2.13-2.8.0/config/zookeeper.properties and C:/kafka_2.13-2.8.0/config/server.properties before starting the nodes, as shown below:
C:/kafka_2.13-2.8.0/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=C:/tmp/zookeeper
C:/kafka_2.13-2.8.0/config/server.properties
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=C:/tmp/kafka-logs
listeners=PLAINTEXT://127.0.0.1:9092
auto.create.topics.enable=true
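The paths in these files use forward slashes even on Windows. A likely reason, assuming the files are parsed with the standard java.util.Properties rules (the usual case for .properties files), is that a backslash is an escape character in that format. The sketch below is only an illustration of that parsing rule; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class PropertiesPathDemo {

    // Parses .properties text with java.util.Properties and returns log.dirs.
    static String readLogDirs(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        return props.getProperty("log.dirs");
    }

    public static void main(String[] args) throws IOException {
        // Forward slashes survive unchanged.
        System.out.println(readLogDirs("log.dirs=C:/tmp/kafka-logs\n"));
        // Single backslashes in the file are treated as escapes, so the path is mangled.
        System.out.println(readLogDirs("log.dirs=C:\\tmp\\kafka-logs\n"));
        // Doubled backslashes in the file yield one literal backslash each.
        System.out.println(readLogDirs("log.dirs=C:\\\\tmp\\\\kafka-logs\n"));
    }
}
```

If you prefer backslashes in the path, double them in the file; otherwise forward slashes are the simpler choice.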
Zookeeper’s & Kafka Default Ports
Note: Kafka will run on default port 9092 and connect to Zookeeper’s default port, 2181.
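If you want to confirm from Java that something is listening on these ports, a plain TCP connection attempt is enough for a quick check. This is a minimal sketch; the PortCheck class is hypothetical, and an open port only tells you that some process accepted the connection, not that it is actually ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181): " + isOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isOpen("localhost", 9092, 1000));
    }
}
```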
Running Zookeeper
- Change directory to C:/kafka_2.13-2.8.0/bin/windows
- Run the zookeeper-server with command: zookeeper-server-start.bat ../../config/zookeeper.properties
[2021-06-07 20:30:44,314] INFO Server environment:java.io.tmpdir=C:\Users\AppData\Local\Temp\ (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,315] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,316] INFO Server environment:os.name=Windows 10 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,327] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,332] INFO Server environment:os.version=10.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,334] INFO Server environment:user.name=ABC (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,335] INFO Server environment:user.home=C:\Users (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,336] INFO Server environment:user.dir=C:\kafka\bin\windows (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,348] INFO Server environment:os.memory.free=496MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,349] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,351] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,361] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,365] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,368] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir C:\tmp\zookeeper\version-2 snapdir C:\tmp\zookeeper\version-2 (org.apache.zookeeper.server.ZooKeeperServer)
Running Kafka Server
- Change directory to C:/kafka_2.13-2.8.0/bin/windows
- Start Kafka Server with command: kafka-server-start.bat ../../config/server.properties

Kerberos, Keytab, and krb5.conf Files
Note: Do NOT use Kerberos configuration for a local (localhost) setup.
Create Topic (test)
Now that your Kafka server is up and running, you can create topics to store messages. You can then produce or consume data from Java or Scala code, or directly from the command prompt.
- Change directory to C:/kafka_2.13-2.8.0/bin/windows
- kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

CMD: Starting Producer and Consumer to Test the Server [Kafka Broker]
- Change directory to C:/kafka_2.13-2.8.0/bin/windows
To start a producer type the following command:
- kafka-console-producer.bat --broker-list localhost:9092 --topic test
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.
C:/kafka_2.13-2.8.0/bin/windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
>The Basic Tech Info ... Welcomed You !!!
>
Now start a consumer by typing the following command:
- kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.
C:/kafka_2.13-2.8.0/bin/windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
The Basic Tech Info ... Welcomed You !!!
Other Kafka Useful Tools/Commands (List, Describe, Delete Topic and Read Message from the beginning)
1. List Topics: kafka-topics.bat --list --zookeeper localhost:2181
2. Describe Topic: kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]
3. Read messages from the beginning: kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic [Topic Name] --from-beginning
4. Delete Topic: kafka-run-class.bat kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Example: Spring Boot Kafka Topic Publisher
In this example, we will implement a sendMessage method that takes a messageKey parameter and uses the ListenableFuture interface to handle the result of the send asynchronously.
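Before the Spring code, here is a rough sketch of the asynchronous callback pattern on its own, using the JDK's CompletableFuture as a stand-in for Spring's ListenableFuture. Nothing here touches Kafka: fakeSend is a hypothetical placeholder for KafkaTemplate.send, and the strings it returns are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;

class AsyncSendSketch {

    // Hypothetical stand-in for KafkaTemplate.send(...): completes asynchronously
    // with a description of the send, or fails when the payload is null.
    static CompletableFuture<String> fakeSend(String topic, String key, String payload) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload == null) {
                throw new IllegalArgumentException("payload must not be null");
            }
            return "topic " + topic + " with key " + key;
        });
    }

    // Attaches success and failure handling and returns a future holding the log line.
    static CompletableFuture<String> sendMessage(String topic, String key, String payload) {
        return fakeSend(topic, key, payload).handle((metadata, error) -> {
            if (error == null) {
                return "sent to " + metadata;
            }
            // Async failures may arrive wrapped, so report the underlying cause.
            Throwable cause = error.getCause() != null ? error.getCause() : error;
            return "unable to deliver: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(sendMessage("test", "key1", "hello").join());
        System.out.println(sendMessage("test", "key1", null).join());
    }
}
```

The calling thread does not block on the send; the success or failure branch runs once the result is available. Newer Spring for Apache Kafka releases (3.x) return a CompletableFuture from KafkaTemplate.send, so the same shape applies there.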
Technology Stack
Java 8
Spring Boot 2.4.4
Apache Kafka Server 2.8.0 (Scala 2.13 build)
Zookeeper
Spring Kafka 2.6.7
Project Structure
High-level application structure would be like this.

Maven File (pom.xml)
The Maven file (pom.xml) is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-boot-kafka-json</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-kafka-json</name>
<description>Spring Kafka - JSON Serializer Deserializer Example</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.embeddedkafka</groupId>
<artifactId>embedded-kafka_2.12</artifactId>
<version>2.5.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Application Properties (application.yml)
Kafka topic name is test.
kafka:
  bootstrap-servers: localhost:9092
  topic:
    json: test
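The Java classes below read these values with placeholders such as ${kafka.bootstrap-servers} and ${kafka.topic.json}. That works because nested YAML keys are addressed by their dotted path. The following sketch shows the idea with plain maps, assuming bootstrap-servers and topic are nested under kafka, and json under topic. It is a simplified stand-in, not Spring's actual property loading, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class YamlKeyFlattener {

    // Recursively turns nested maps into dotted keys,
    // e.g. kafka -> topic -> json becomes "kafka.topic.json".
    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto(flat, "", nested);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(Map<String, String> flat, String prefix, Map<String, Object> node) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                flattenInto(flat, key, (Map<String, Object>) entry.getValue());
            } else {
                flat.put(key, String.valueOf(entry.getValue()));
            }
        }
    }

    // Builds, by hand, the same structure as the YAML configuration.
    static Map<String, Object> sampleConfig() {
        Map<String, Object> topic = new LinkedHashMap<>();
        topic.put("json", "test");
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("bootstrap-servers", "localhost:9092");
        kafka.put("topic", topic);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("kafka", kafka);
        return root;
    }

    public static void main(String[] args) {
        flatten(sampleConfig()).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

This is also why the indentation in application.yml matters: without it, the keys are no longer nested and the dotted names do not resolve.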
Application Logger: logback.xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<logger name="com.the.basic.tech.info" level="INFO" />
<logger name="org.springframework" level="WARN" />
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
CommandLineRunner: SpringKafkaApplication
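SpringKafkaApplication implements CommandLineRunner so that, on startup, it publishes one Employee message through the Sender (Publisher/Producer) and then waits up to 10 seconds for the Receiver (Subscriber/Consumer) to consume it.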
package com.the.basic.tech.info.kafka;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;
@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {
@Autowired
Sender sndr;
@Autowired
Receiver rcvr;
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Override
public void run(String... arg0) throws Exception {
Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
sndr.send(employee);
rcvr.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
Data Model: Employee.java
package com.the.basic.tech.info.model;
public class Employee {
private String empid;
private String empname;
private String empdept;
private String empsalary;
private String empage;
private String empaddress;
@Override
public String toString() {
return "Employee [empid=" + empid + ", empname=" + empname + ", empdept=" + empdept + ", empsalary=" + empsalary
+ ", empage=" + empage + ", empaddress=" + empaddress + "]";
}
public Employee() {
}
public Employee(String empid, String empname, String empdept, String empsalary, String empage, String empaddress) {
super();
this.empid = empid;
this.empname = empname;
this.empdept = empdept;
this.empsalary = empsalary;
this.empage = empage;
this.empaddress = empaddress;
}
public String getEmpid() {
return empid;
}
public void setEmpid(String empid) {
this.empid = empid;
}
public String getEmpname() {
return empname;
}
public void setEmpname(String empname) {
this.empname = empname;
}
public String getEmpdept() {
return empdept;
}
public void setEmpdept(String empdept) {
this.empdept = empdept;
}
public String getEmpsalary() {
return empsalary;
}
public void setEmpsalary(String empsalary) {
this.empsalary = empsalary;
}
public String getEmpage() {
return empage;
}
public void setEmpage(String empage) {
this.empage = empage;
}
public String getEmpaddress() {
return empaddress;
}
public void setEmpaddress(String empaddress) {
this.empaddress = empaddress;
}
}
Kafka Publisher/Sender Config: SenderConfig.java with Serializer properties
package com.the.basic.tech.info.kafka.producer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.the.basic.tech.info.model.Employee;
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Employee> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Employee> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
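The configuration above selects StringSerializer for keys and JsonSerializer for values, so each Employee ends up on the topic as the UTF-8 bytes of a JSON document, which is what the console consumer printed earlier. The sketch below shows that object to JSON text to bytes path in miniature. It is a hand-rolled stand-in for the Jackson-based serializer, handles only flat String fields, and does no escaping; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class JsonBytesSketch {

    // Hand-rolled JSON for flat String fields only; quotes and control
    // characters are NOT escaped. A real serializer would use a JSON library.
    static byte[] serialize(Map<String, String> fields) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        return json.append('}').toString().getBytes(StandardCharsets.UTF_8);
    }

    // The reverse direction, up to the text level: bytes -> JSON text.
    static String toText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> employee = new LinkedHashMap<>();
        employee.put("empid", "2121");
        employee.put("empname", "John");
        System.out.println(toText(serialize(employee)));
    }
}
```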
Kafka Publisher/Sender: Sender.java
package com.the.basic.tech.info.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.the.basic.tech.info.model.Employee;
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Value("${kafka.topic.json}")
private String jsonTopic;
@Autowired
private KafkaTemplate<String, Employee> kafkaTemplate;
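public void send(Employee employee) {
LOG.info("sending car='{}'", employee.toString());
// First send: no message key, so the producer chooses the partition.
kafkaTemplate.send(jsonTopic, employee);
// Second send: the same payload again, this time with the key "key1" and a result callback.
sendMessage(employee, "key1");
}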
public void sendMessage(final Employee string, String messageKey) {
final ListenableFuture<SendResult<String, Employee>> future;
if (null != messageKey && !"".equals(messageKey)) {
future = kafkaTemplate.send(jsonTopic, messageKey, string);
LOG.info("streaming-event-publisher :: sendMessage with MESSAGE_KEY : {} ", messageKey);
} else {
future = kafkaTemplate.send(jsonTopic, string);
LOG.info("streaming-event-publisher :: sendMessage without key.");
}
future.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
@Override
public void onSuccess(final SendResult<String, Employee> result) {
LOG.info(
"publisher :: Successfully Sent Message to topic, partition number [{}] and Offset [{}]",
result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable err) {
LOG.warn("publisher :: Unable to deliver message [{}]. {}", string, err.getMessage());
}
});
}
}
Note: We pass "key1" as the messageKey so that all keyed messages are published to the same partition; with the default partitioner, records with the same key map to the same partition.
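The reason a fixed key pins messages to one partition can be sketched as "hash the key, then take it modulo the partition count". The code below is a simplified stand-in for illustration only: it uses Arrays.hashCode, whereas Kafka's default partitioner uses a murmur2 hash, so real partition numbers will differ. The class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {

    // Simplified key-based partitioning: hash the key bytes and map the hash
    // into the range [0, numPartitions). NOT Kafka's actual hash function.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition.
        System.out.println(partitionFor("key1", 3) == partitionFor("key1", 3));
        // With a single partition, every key maps to partition 0.
        System.out.println(partitionFor("key1", 1));
    }
}
```

Since the test topic in this blog was created with a single partition, every message goes to partition 0 whether or not it has a key; the key only starts to matter once a topic has more than one partition.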
Application Logs: Kafka Publisher/Sender
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
18:47:22.873 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
18:47:22.875 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
18:47:22.877 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1623071842872
18:47:22.881 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-json-1, groupId=json] Subscribed to topic(s): test
18:47:22.907 [main] INFO o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
18:47:22.951 [main] INFO c.t.b.t.i.k.SpringKafkaApplication - Started SpringKafkaApplication in 3.92 seconds (JVM running for 4.979)
18:47:22.959 [main] INFO c.t.b.t.info.kafka.producer.Sender - sending car='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] (Re-)joining group
18:47:23.495 [main] INFO c.t.b.t.info.kafka.producer.Sender - streaming-event-publisher :: sendMessage with MESSAGE_KEY : key1
18:47:23.532 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Finished assignment for group at generation 23: {consumer-json-1-a18904d9-a431-43a6-b0ee-de0f7cc2cb72=Assignment(partitions=[test-0])}
18:47:23.545 [kafka-producer-network-thread | producer-1] INFO c.t.b.t.info.kafka.producer.Sender - publisher :: Successfully Sent Message to topic, partition number [0] and Offset [16]
Example: Spring Boot Kafka Topic Receiver
Kafka Subscriber/Receiver Config: ReceiverConfig.java with Deserializer properties
package com.the.basic.tech.info.kafka.consumer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.the.basic.tech.info.model.Employee;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
return props;
}
@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Employee.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Employee> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Employee> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
Kafka Subscriber/Receiver: Receiver.java
package com.the.basic.tech.info.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import com.the.basic.tech.info.model.Employee;
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.json}")
public void receive(Employee employee) {
LOGGER.info("received employee details='{}'", employee.toString());
latch.countDown();
}
}
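The Receiver exposes a CountDownLatch so that the caller can wait until a message has arrived. Here is a small stand-alone sketch of that handshake, with a plain thread playing the role of the listener. The class and method names are hypothetical, and no Kafka is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {

    // Optionally starts a background "listener" that counts the latch down,
    // then waits for it. Returns true if the latch reached zero before the timeout.
    static boolean awaitMessage(boolean messageArrives, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        if (messageArrives) {
            Thread listener = new Thread(latch::countDown);
            listener.start();
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitMessage(true, 1000));
        System.out.println(awaitMessage(false, 100));
    }
}
```

Two details are worth keeping in mind: await returns false on timeout instead of throwing, and a latch created with a count of 1 only signals the first message, since further countDown calls have no effect once it reaches zero.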
Application Logs: Kafka Subscriber/Receiver
18:47:23.666 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.667 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.673 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:48:23.458 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Revoke previously assigned partitions test-0
JUnit: SpringKafkaApplicationTest
package com.the.basic.tech.info.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {
@Autowired
private Sender sender;
@Autowired
private Receiver receiver;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "json.t");
@Before
public void setUp() throws Exception {
// wait until the partitions are assigned
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafka.getPartitionsPerTopic());
}
}
@Test
public void testReceive() throws Exception {
Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
sender.send(employee);
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(receiver.getLatch().getCount()).isEqualTo(0);
}
}
Download Source Code (Attached)
Keep learning. Have a great day 🙂





