Spring Boot + Spring Kafka with Zookeeper + JSON Serialization | Deserialization + Example

June 7, 2021

Kafka with Zookeeper

In this blog, we will learn how to use Kafka with ZooKeeper and how to serialize and deserialize an object as JSON, with the help of an example. Serialization is the mechanism of converting the state of an object into a byte stream.
Deserialization is the reverse process, in which the byte stream is used to recreate the actual Java object in memory. Together, these mechanisms are used to persist an object or to send it over the network.

What is Serialization and Deserialization in Java?

Serialization is a mechanism of converting the state of an object into a byte stream.

Deserialization is the reverse process where the byte stream is used to recreate the actual Java object in memory. This mechanism is used to persist the object.
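
As a rough illustration of the two definitions above, the following sketch round-trips an object through a byte array using Java's built-in object streams. The Badge class and the SerializationSketch helper are hypothetical names used only here; the Kafka example later in this post uses JSON rather than Java's native serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    // Hypothetical value type; Serializable marks it as eligible for Java serialization.
    static class Badge implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        Badge(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Serialization: object state -> byte stream.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Deserialization: byte stream -> a new object with the same state.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling fromBytes(toBytes(new Badge("2121", "John"))) gives back a distinct Badge instance whose fields hold the same values as the original.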

Kafka Cluster/Architecture

Kafka, in the version used here, depends on ZooKeeper (a distributed configuration management tool).

Local ZooKeeper+Kafka setup for Windows

Kafka uses ZooKeeper to manage the cluster. ZooKeeper is used to coordinate the brokers/cluster topology.

  • Download Kafka 2.8.0 (the Scala 2.13 build) for Windows: https://apache.claz.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
  • Extract the tgz file to C:/kafka_2.13-2.8.0. Before starting the nodes, update the data and log file locations in C:/kafka_2.13-2.8.0/config/zookeeper.properties and C:/kafka_2.13-2.8.0/config/server.properties as shown below:

C:/kafka_2.13-2.8.0/config/zookeeper.properties

# the directory where the snapshot is stored.
dataDir=C:/tmp/zookeeper

C:/kafka_2.13-2.8.0/config/server.properties

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=C:/tmp/kafka-logs

listeners=PLAINTEXT://127.0.0.1:9092
auto.create.topics.enable=true
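
The entries above use the standard Java properties format. As a small sketch of how such a file is interpreted, the code below parses the same kind of text with java.util.Properties and pulls the port out of the listener address. The class and method names (BrokerConfigSketch, parse, listenerPort) are made up for this illustration and are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

class BrokerConfigSketch {

    // Parses properties-file text such as the server.properties fragment shown above.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Extracts the port from a listener value such as PLAINTEXT://127.0.0.1:9092.
    static int listenerPort(Properties props) {
        return URI.create(props.getProperty("listeners")).getPort();
    }
}
```

With the values above, log.dirs reads back as C:/tmp/kafka-logs and listenerPort returns 9092. In this file format the backslash is an escape character, so a path written as C:\tmp\kafka-logs would not be read back unchanged; that is a good reason to keep forward slashes in the Windows paths.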

Zookeeper’s & Kafka Default Ports

Note: Kafka will run on default port 9092 and connect to Zookeeper’s default port, 2181.

Running Zookeeper

  • Change directory to C:/kafka_2.13-2.8.0/bin/windows
  • Run the zookeeper-server with command: zookeeper-server-start.bat ../../config/zookeeper.properties
[2021-06-07 20:30:44,314] INFO Server environment:java.io.tmpdir=C:\Users\AppData\Local\Temp\ (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,315] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,316] INFO Server environment:os.name=Windows 10 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,327] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,332] INFO Server environment:os.version=10.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,334] INFO Server environment:user.name=ABC (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,335] INFO Server environment:user.home=C:\Users (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,336] INFO Server environment:user.dir=C:\kafka\bin\windows (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,348] INFO Server environment:os.memory.free=496MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,349] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,351] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,361] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,365] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-06-07 20:30:44,368] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir C:\tmp\zookeeper\version-2 snapdir C:\tmp\zookeeper\version-2 (org.apache.zookeeper.server.ZooKeeperServer)

Running Kafka Server

  • Change directory to C:/kafka_2.13-2.8.0/bin/windows
  • Start Kafka Server with command: kafka-server-start.bat ../../config/server.properties

Kerberos, Keytab, and krb5.conf Files

Note: Do NOT use Kerberos configs for the local hosts.

Create Topic (test)

Now that your Kafka server is up and running, you can create topics to store messages. You can also produce or consume data from Java or Scala code, or directly from the command prompt.

  • Change directory to C:/kafka_2.13-2.8.0/bin/windows
  • kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

CMD: Starting Producer and Consumer to Test the Server [Kafka Broker]

  • Change directory to C:/kafka_2.13-2.8.0/bin/windows

To start a producer type the following command:

  • kafka-console-producer.bat --broker-list localhost:9092 --topic test
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.

C:/kafka_2.13-2.8.0/bin/windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
>The Basic Tech Info ... Welcomed You !!!
>

Now start a consumer by typing the following command:

  • kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.

C:/kafka_2.13-2.8.0/bin/windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"make":"Passat","manufacturer":"Volkswagen","id":"ABC-123"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
{"empid":"2121","empname":"John","empdept":"Dept-A","empsalary":"3000","empage":"30","empaddress":"California"}
The Basic Tech Info ... Welcomed You !!!

Other Kafka Useful Tools/Commands (List, Describe, Delete Topic and Read Message from the beginning)

1. List Topics: kafka-topics.bat --list --zookeeper localhost:2181
2. Describe Topic: kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]
3. Read messages from the beginning: kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic [Topic Name] --from-beginning
4. Delete Topic: kafka-run-class.bat kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181

Example: Spring Boot Kafka Topic Publisher

In this example, we will implement a sendMessage method that takes a messageKey parameter and uses the ListenableFuture interface to handle the result of the send asynchronously.

Technology Stack

Java 8
Spring Boot 2.4.4
Apache Kafka Server 2.8.0 (Scala 2.13 build)
Zookeeper
Spring Kafka 2.6.7

Project Structure

High-level application structure would be like this.

Project Structure

Maven File (pom.xml)

The Maven file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>spring-boot-kafka-json</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>spring-boot-kafka-json</name>
	<description>Spring Kafka - JSON Serializer Deserializer Example</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.4</version>
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- spring-boot -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.github.embeddedkafka</groupId>
			<artifactId>embedded-kafka_2.12</artifactId>
			<version>2.5.0</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Properties (application.yml)

Kafka topic name is test.

kafka:
  bootstrap-servers: localhost:9092
  topic:
    json: test

Application Logger: logback.xml

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- Logger names are package prefixes; a trailing ".*" wildcard is not supported -->
  <logger name="com.the.basic.tech.info" level="INFO" />
  <logger name="org.springframework" level="WARN" />
  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

CommandLineRunner: SpringKafkaApplication

We implement CommandLineRunner so that SpringKafkaApplication publishes a message through the Publisher/Producer at startup and then waits for the Subscriber/Consumer to receive it.

package com.the.basic.tech.info.kafka;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

  @Autowired
  Sender sndr;

  @Autowired
  Receiver rcvr;

  public static void main(String[] args) {
    SpringApplication.run(SpringKafkaApplication.class, args);
  }

  @Override
  public void run(String... arg0) throws Exception {
    Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
    sndr.send(employee);

    // Wait up to 10 seconds for the receiver to count the latch down
    rcvr.getLatch().await(10000, TimeUnit.MILLISECONDS);
  }
}

Data Model: Employee.java

package com.the.basic.tech.info.model;

public class Employee {

	private String empid;

	@Override
	public String toString() {
		return "Employee [empid=" + empid + ", empname=" + empname + ", empdept=" + empdept + ", empsalary=" + empsalary
				+ ", empage=" + empage + ", empaddress=" + empaddress + "]";
	}

	private String empname;
	private String empdept;
	private String empsalary;
	private String empage;
	private String empaddress;
	
	// No-argument constructor: Jackson (used by JsonDeserializer) needs it to instantiate the class
	public Employee() {
	}

	public Employee(String empid, String empname, String empdept, String empsalary, String empage, String empaddress) {
		super();
		this.empid = empid;
		this.empname = empname;
		this.empdept = empdept;
		this.empsalary = empsalary;
		this.empage = empage;
		this.empaddress = empaddress;
	}

	public String getEmpid() {
		return empid;
	}

	public void setEmpid(String empid) {
		this.empid = empid;
	}

	public String getEmpname() {
		return empname;
	}

	public void setEmpname(String empname) {
		this.empname = empname;
	}

	public String getEmpdept() {
		return empdept;
	}

	public void setEmpdept(String empdept) {
		this.empdept = empdept;
	}

	public String getEmpsalary() {
		return empsalary;
	}

	public void setEmpsalary(String empsalary) {
		this.empsalary = empsalary;
	}

	public String getEmpage() {
		return empage;
	}

	public void setEmpage(String empage) {
		this.empage = empage;
	}

	public String getEmpaddress() {
		return empaddress;
	}

	public void setEmpaddress(String empaddress) {
		this.empaddress = empaddress;
	}

}

Kafka Publisher/Sender Config: SenderConfig.java with Serializer properties

package com.the.basic.tech.info.kafka.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.the.basic.tech.info.model.Employee;

@Configuration
public class SenderConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
  }

  @Bean
  public ProducerFactory<String, Employee> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, Employee> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public Sender sender() {
    return new Sender();
  }
}
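
To make the effect of the value serializer more concrete: Spring Kafka's JsonSerializer uses Jackson to turn the object into JSON, and the record value sent to the broker is the resulting bytes. The sketch below is a hand-rolled stand-in, not the library's implementation. JsonBytesSketch and its methods are hypothetical, and it only handles flat string fields with minimal escaping.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class JsonBytesSketch {

    // Escapes the two characters that would break a JSON string literal in this sketch.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Renders flat string fields as a JSON object, keeping insertion order.
    static String toJson(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    // The record value is just the UTF-8 bytes of the JSON text.
    static byte[] serialize(Map<String, String> fields) {
        return toJson(fields).getBytes(StandardCharsets.UTF_8);
    }

    // Field values taken from the Employee used in this post.
    static Map<String, String> sampleEmployee() {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("empid", "2121");
        fields.put("empname", "John");
        fields.put("empdept", "Dept-A");
        fields.put("empsalary", "3000");
        fields.put("empage", "30");
        fields.put("empaddress", "California");
        return fields;
    }
}
```

toJson(sampleEmployee()) produces the same text as the employee lines in the console consumer output shown earlier, which is what the consumer side later turns back into an Employee.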

Kafka Publisher/Sender: Sender.java

package com.the.basic.tech.info.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.the.basic.tech.info.model.Employee;

public class Sender {

	private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

	@Value("${kafka.topic.json}")
	private String jsonTopic;

	@Autowired
	private KafkaTemplate<String, Employee> kafkaTemplate;

	public void send(Employee employee) {
		LOG.info("sending employee='{}'", employee);
		// First send: no key, and the result is not tracked
		kafkaTemplate.send(jsonTopic, employee);
		// Second send: the same payload again, this time with a key and a result callback
		sendMessage(employee, "key1");
	}

	public void sendMessage(final Employee employee, String messageKey) {

		final ListenableFuture<SendResult<String, Employee>> future;

		// Use the keyed overload only when a non-empty key was supplied
		if (null != messageKey && !"".equals(messageKey)) {
			future = kafkaTemplate.send(jsonTopic, messageKey, employee);
			LOG.info("streaming-event-publisher :: sendMessage with MESSAGE_KEY : {} ", messageKey);
		} else {
			future = kafkaTemplate.send(jsonTopic, employee);
			LOG.info("streaming-event-publisher :: sendMessage without key.");
		}

		// The callback runs once the broker acknowledges the record or the send fails
		future.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
			@Override
			public void onSuccess(final SendResult<String, Employee> result) {
				LOG.info(
						"publisher :: Successfully Sent Message to topic, partition number [{}] and Offset [{}]",
						result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
			}

			@Override
			public void onFailure(final Throwable err) {
				LOG.warn("publisher :: Unable to deliver message [{}]. {}", employee, err.getMessage());
			}
		});
	}
}
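
The callback style used in sendMessage can be tried without a broker. The sketch below swaps in the JDK's CompletableFuture for Spring's ListenableFuture, so it shows the pattern rather than the Spring API; FakeResult and CallbackSketch are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CallbackSketch {

    // Hypothetical stand-in for the partition/offset metadata of a successful send.
    static class FakeResult {
        final int partition;
        final long offset;

        FakeResult(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Registers success and failure handling, mirroring onSuccess/onFailure in Sender.
    // The outcome is stored in the given reference instead of being logged.
    static void attach(CompletableFuture<FakeResult> future, AtomicReference<String> outcome) {
        future.whenComplete((result, err) -> {
            if (err == null) {
                outcome.set("sent to partition " + result.partition + " at offset " + result.offset);
            } else {
                outcome.set("failed: " + err.getMessage());
            }
        });
    }
}
```

The caller is not blocked while the send is in flight: nothing is recorded until the future completes, and then exactly one of the two branches runs.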

Note: We send "key1" as the messageKey so that all the messages are published to the same partition.
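
The reason a fixed key keeps records together is that the partition is derived from a hash of the key. A minimal sketch of that idea follows. It is an approximation: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses String.hashCode() as a stand-in, so the partition numbers it produces will not match a real cluster. KeyPartitionSketch and partitionFor are hypothetical names.

```java
class KeyPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    // Assumption: String.hashCode() stands in for the murmur2 hash of the
    // serialized key that Kafka's default partitioner uses.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Whatever the hash function, the same key always maps to the same index for a given partition count. With the single-partition test topic created earlier, every record lands on partition 0 regardless of its key.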

Application Logs: Kafka Publisher/Sender

value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

18:47:22.873 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
18:47:22.875 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
18:47:22.877 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1623071842872
18:47:22.881 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-json-1, groupId=json] Subscribed to topic(s): test
18:47:22.907 [main] INFO  o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
18:47:22.951 [main] INFO  c.t.b.t.i.k.SpringKafkaApplication - Started SpringKafkaApplication in 3.92 seconds (JVM running for 4.979)
18:47:22.959 [main] INFO  c.t.b.t.info.kafka.producer.Sender - sending car='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
18:47:23.477 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-json-1, groupId=json] (Re-)joining group
18:47:23.495 [main] INFO  c.t.b.t.info.kafka.producer.Sender - streaming-event-publisher :: sendMessage with MESSAGE_KEY : key1
18:47:23.532 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Finished assignment for group at generation 23: {consumer-json-1-a18904d9-a431-43a6-b0ee-de0f7cc2cb72=Assignment(partitions=[test-0])}
18:47:23.545 [kafka-producer-network-thread | producer-1] INFO  c.t.b.t.info.kafka.producer.Sender - publisher :: Successfully Sent Message to topic, partition number [0] and Offset [16]

Example: Spring Boot Kafka Topic Receiver

Kafka Subscriber/Receiver Config: ReceiverConfig.java with Deserializer properties

package com.the.basic.tech.info.kafka.consumer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.the.basic.tech.info.model.Employee;

@Configuration
@EnableKafka
public class ReceiverConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");

    return props;
  }

  @Bean
  public ConsumerFactory<String, Employee> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Employee.class));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Employee> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Employee> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }

  @Bean
  public Receiver receiver() {
    return new Receiver();
  }
}

Kafka Subscriber/Receiver: Receiver.java

package com.the.basic.tech.info.kafka.consumer;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import com.the.basic.tech.info.model.Employee;

public class Receiver {

  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

  private CountDownLatch latch = new CountDownLatch(1);

  public CountDownLatch getLatch() {
    return latch;
  }

  @KafkaListener(topics = "${kafka.topic.json}")
  public void receive(Employee employee) {
    LOGGER.info("received employee details='{}'", employee.toString());
    latch.countDown();
  }
}
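
The latch is what lets the runner (and the test) wait for an asynchronous delivery. The sketch below reproduces the pattern with plain threads: one thread plays the listener and counts down, while the caller waits with a timeout. LatchSketch and awaitMessage are hypothetical names, and the sleep is only a simulated delivery delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {

    // Starts a thread that plays the listener's role, then waits for it the way the runner does.
    // Returns true if the simulated message arrived before the timeout.
    static boolean awaitMessage(long listenerDelayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(listenerDelayMillis); // simulated delivery delay
                latch.countDown();                 // what receive() does after logging
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

await returns false when the timeout expires first, so checking its return value is a simple way to tell "message received" apart from "gave up waiting".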

Application Logs: Kafka Subscriber/Receiver

18:47:23.666 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.667 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.673 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:47:23.677 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.t.b.t.info.kafka.consumer.Receiver - received employee details='Employee [empid=2121, empname=John, empdept=Dept-A, empsalary=3000, empage=30, empaddress=California]'
18:48:23.458 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-json-1, groupId=json] Revoke previously assigned partitions test-0

JUnit: SpringKafkaApplicationTest

package com.the.basic.tech.info.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;

import com.the.basic.tech.info.kafka.consumer.Receiver;
import com.the.basic.tech.info.kafka.producer.Sender;
import com.the.basic.tech.info.model.Employee;

// JUnit 5 test. @EmbeddedKafka starts an in-memory broker with the "test" topic from
// application.yml and exposes its address as kafka.bootstrap-servers, the property
// that SenderConfig and ReceiverConfig read.
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "test", bootstrapServersProperty = "kafka.bootstrap-servers")
public class SpringKafkaApplicationTest {

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Autowired
  private EmbeddedKafkaBroker embeddedKafka;

  @BeforeEach
  public void setUp() throws Exception {
    // wait until the partitions are assigned
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
      ContainerTestUtils.waitForAssignment(messageListenerContainer,
          embeddedKafka.getPartitionsPerTopic());
    }
  }

  @Test
  public void testReceive() throws Exception {
    Employee employee = new Employee("2121", "John", "Dept-A", "3000", "30", "California");
    sender.send(employee);

    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}

Download Source Code (Attached)

Keep learning. Have a great day 🙂