Red Hat JBoss Fuse ESB + Apache Camel + Kafka with Zookeeper + OSGI Blueprint DSL + Example

In this blog, we will learn camel-Kafka integration using OSGI Blueprint DSL with working examples.

Technology Stack

Java 8
Apache Kafka Server 2.13
Apache Camel (2.17.0.redhat-630187)
OSGI Blueprint DSL

Project Structure

First of all we need to create a Fuse Integration Project [fuse-kafka-apache-camel-integration] with Empty Blueprint

Maven File (pom.xml)

Note: Below maven dependency needs to be added in pom.xml for Kafka integration.


<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="" xmlns="" xmlns:xsi="">
	<name>Kafka - Camel Blueprint Quickstart</name>
	<description>Kafka - Camel Blueprint Example</description>
			<name>FuseSource Community Release Repository</name>
			<name>Red Hat GA Repository</name>
			<name>FuseSource Community Release Repository</name>
			<name>Red Hat GA Repository</name>
						<Bundle-Name>Kakfa Camel Blueprint Example [fuse-kafka-apache-camel-integration]</Bundle-Name>

Camel Routes: camel-context.xml

You can copy the below camel-context [blueprint] file under src/mail/resources/OSGI-INF/blueprint/camel-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="" xmlns:cm="" xmlns:xsi="" xsi:schemaLocation="">
    <camelContext id="_context1" xmlns="">
        <route id="kafka_consumer">
            <from id="_from1" uri="kafka:localhost:9092?topic=test&amp;groupId=test-consumer-group&amp;kerberosRenewWindowFactor=0&amp;kerberosRenewJitter=0"/>
            <log id="_log1" message="The message received from Kafka Server: ${body}"/>

Note: Above camel route will consume the message from Kafka broker topic [test] and log the message.

Kafka: Start Zookeeper & Kafka Server

Please refer the steps for Running Zookeeper and Running Kafka Server: Spring Boot + Spring Kafka with Zookeeper + JSON Serialization | Deserialization + Example

Build the Project [fuse-kafka-apache-camel-integration]

To build this project use

mvn install

Run the Project [fuse-kafka-apache-camel-integration]

To run the project you can execute the following Maven goal

mvn camel:run
[         Blueprint Extender: 1] BlueprintContainerImpl         INFO  Bundle fuse-kafka-apache-camel-integration/1.0.0.SNAPSHOT is waiting for namespace handlers []
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) is starting
[         Blueprint Extender: 3] ManagedManagementStrategy      INFO  JMX is enabled
[         Blueprint Extender: 3] DefaultRuntimeEndpointRegistry INFO  Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at
[         Blueprint Extender: 3] KafkaConsumer                  INFO  Starting Kafka consumer
[         Blueprint Extender: 3] ConsumerConfig                 INFO  ConsumerConfig values:
        metric.reporters = [] = 300000
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer = test-consumer-group
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] = 50
        sasl.kerberos.ticket.renew.window.factor = 0.0
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092] = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit = null
        sasl.kerberos.ticket.renew.jitter = 0.0
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX = true
        ssl.key.password = null = 500
        sasl.kerberos.min.time.before.relogin = 60000 = 540000
        ssl.truststore.password = null = 30000
        metrics.num.samples = 2 =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        check.crcs = true = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null = 3000 = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509 = 30000
        fetch.min.bytes = 1024
        send.buffer.bytes = 131072
        auto.offset.reset = latest

[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.ticket.renew.window.factor = 0.0 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.ticket.renew.jitter = 0.0 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.keystore.type = JKS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.trustmanager.algorithm = PKIX was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.min.time.before.relogin = 60000 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.protocol = TLS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.truststore.type = JKS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.keymanager.algorithm = SunX509 was supplied but isn't a known config.
[         Blueprint Extender: 3] AppInfoParser                  INFO  Kafka version :
[         Blueprint Extender: 3] AppInfoParser                  INFO  Kafka commitId : 23c69d62a0cabf06
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Route: kafka_consumer started and consuming from: Endpoint[kafka://localhost:9092?groupId=test-consumer-group&kerberosRenewJitter=0&kerberosRenewWindowFactor=0&topic=test]
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Total 1 routes, of which 1 are started.
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) started in 1.587 seconds

Deploy the Project [fuse-kafka-apache-camel-integration] in OSGi

You can run the following command from its Fuse shell:

osgi:install -s

Start Kafka Producer for sending messages to the topic if not running

Execute the below command and type the message to be published to Kafka topic [test]

kafka-console-producer.bat –broker-list localhost:9092 –topic test
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.

C:\Users\Software\kafka\bin\windows>kafka-console-producer.bat -broker-list localhost:9092 -topic test
>The Basic Tech Info Team Welcomed You!!!

Application Logs

Message will be received by topic consumer as below

Similarly, we can create one more route for the publisher and can deploy it as a bundle in the OSGI container.

Download Source Code (Attached)

Happy learning, have a nice day 🙂