Red Hat JBoss Fuse ESB + Apache Camel + Kafka with Zookeeper + OSGI Blueprint DSL + Example

In this blog, we will learn camel-Kafka integration using OSGI Blueprint DSL with working examples.

Technology Stack

Java 8
jboss-fuse-6.3.0.redhat-187
Apache Kafka Server 2.13
Zookeeper
Apache Camel (2.17.0.redhat-630187)
OSGI Blueprint DSL

Project Structure

First of all we need to create a Fuse Integration Project [fuse-kafka-apache-camel-integration] with Empty Blueprint

Project Structure

Maven File (pom.xml)

Note: Below maven dependency needs to be added in pom.xml for Kafka integration.

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.the.basic.tech.info.kafka</groupId>
	<artifactId>kafka-camel-blueprint</artifactId>
	<version>1.0.0-SNAPSHOT</version>
	<packaging>bundle</packaging>
	<name>Kafka - Camel Blueprint Quickstart</name>
	<description>Kafka - Camel Blueprint Example</description>
	<properties>
		<camel.version>2.17.0.redhat-630187</camel.version>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<version.maven-bundle-plugin>3.2.0</version.maven-bundle-plugin>
		<jboss.fuse.bom.version>6.3.0.redhat-187</jboss.fuse.bom.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.jboss.fuse.bom</groupId>
				<artifactId>jboss-fuse-parent</artifactId>
				<version>${jboss.fuse.bom.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-blueprint</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-test-blueprint</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<artifactId>org.apache.felix.fileinstall</artifactId>
					<groupId>org.apache.felix</groupId>
				</exclusion>
			</exclusions>
		</dependency>
	</dependencies>
	<repositories>
		<repository>
			<releases>
				<enabled>true</enabled>
				<updatePolicy>never</updatePolicy>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>fuse-public-repository</id>
			<name>FuseSource Community Release Repository</name>
			<url>https://repo.fusesource.com/nexus/content/groups/public</url>
		</repository>
		<repository>
			<releases>
				<enabled>true</enabled>
				<updatePolicy>never</updatePolicy>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>red-hat-ga-repository</id>
			<name>Red Hat GA Repository</name>
			<url>https://maven.repository.redhat.com/ga</url>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<releases>
				<enabled>true</enabled>
				<updatePolicy>never</updatePolicy>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>fuse-public-repository</id>
			<name>FuseSource Community Release Repository</name>
			<url>https://repo.fusesource.com/nexus/content/groups/public</url>
		</pluginRepository>
		<pluginRepository>
			<releases>
				<enabled>true</enabled>
				<updatePolicy>never</updatePolicy>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>red-hat-ga-repository</id>
			<name>Red Hat GA Repository</name>
			<url>https://maven.repository.redhat.com/ga</url>
		</pluginRepository>
	</pluginRepositories>
	<build>
		<defaultGoal>install</defaultGoal>
		<plugins>
			<plugin>
				<groupId>org.apache.felix</groupId>
				<artifactId>maven-bundle-plugin</artifactId>
				<version>${version.maven-bundle-plugin}</version>
				<extensions>true</extensions>
				<configuration>
					<instructions>
						<Bundle-SymbolicName>fuse-kafka-apache-camel-integration</Bundle-SymbolicName>
						<Bundle-Name>Kakfa Camel Blueprint Example [fuse-kafka-apache-camel-integration]</Bundle-Name>
					</instructions>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-resources-plugin</artifactId>
				<version>3.0.1</version>
				<configuration>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.camel</groupId>
				<artifactId>camel-maven-plugin</artifactId>
				<version>${camel.version}</version>
				<configuration>
					<useBlueprint>true</useBlueprint>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Camel Routes: camel-context.xml

You can copy the below camel-context [blueprint] file under src/mail/resources/OSGI-INF/blueprint/camel-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint-2.18.3.xsd">
    <camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="kafka_consumer">
            <from id="_from1" uri="kafka:localhost:9092?topic=test&amp;groupId=test-consumer-group&amp;kerberosRenewWindowFactor=0&amp;kerberosRenewJitter=0"/>
            <log id="_log1" message="The message received from Kafka Server: ${body}"/>
        </route>
    </camelContext>
</blueprint>

Note: Above camel route will consume the message from Kafka broker topic [test] and log the message.

Kafka: Start Zookeeper & Kafka Server

Please refer the steps for Running Zookeeper and Running Kafka Server: Spring Boot + Spring Kafka with Zookeeper + JSON Serialization | Deserialization + Example

Build the Project [fuse-kafka-apache-camel-integration]

To build this project use

mvn install

Run the Project [fuse-kafka-apache-camel-integration]

To run the project you can execute the following Maven goal

mvn camel:run
[         Blueprint Extender: 1] BlueprintContainerImpl         INFO  Bundle fuse-kafka-apache-camel-integration/1.0.0.SNAPSHOT is waiting for namespace handlers [http://camel.apache.org/schema/blueprint]
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) is starting
[         Blueprint Extender: 3] ManagedManagementStrategy      INFO  JMX is enabled
[         Blueprint Extender: 3] DefaultRuntimeEndpointRegistry INFO  Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[         Blueprint Extender: 3] KafkaConsumer                  INFO  Starting Kafka consumer
[         Blueprint Extender: 3] ConsumerConfig                 INFO  ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = test-consumer-group
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.0
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.0
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1024
        send.buffer.bytes = 131072
        auto.offset.reset = latest

[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.ticket.renew.window.factor = 0.0 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.ticket.renew.jitter = 0.0 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.keystore.type = JKS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.trustmanager.algorithm = PKIX was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration sasl.kerberos.min.time.before.relogin = 60000 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.protocol = TLS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.truststore.type = JKS was supplied but isn't a known config.
[         Blueprint Extender: 3] ConsumerConfig                 WARN  The configuration ssl.keymanager.algorithm = SunX509 was supplied but isn't a known config.
[         Blueprint Extender: 3] AppInfoParser                  INFO  Kafka version : 0.9.0.1
[         Blueprint Extender: 3] AppInfoParser                  INFO  Kafka commitId : 23c69d62a0cabf06
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Route: kafka_consumer started and consuming from: Endpoint[kafka://localhost:9092?groupId=test-consumer-group&kerberosRenewJitter=0&kerberosRenewWindowFactor=0&topic=test]
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Total 1 routes, of which 1 are started.
[         Blueprint Extender: 3] BlueprintCamelContext          INFO  Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) started in 1.587 seconds

Deploy the Project [fuse-kafka-apache-camel-integration] in OSGi

You can run the following command from its Fuse shell:

osgi:install -s mvn:com.the.basic.tech.info.kafka/kafka-camel-blueprint/1.0.0-SNAPSHOT

Start Kafka Producer for sending messages to the topic if not running

Execute the below command and type the message to be published to Kafka topic [test]

kafka-console-producer.bat –broker-list localhost:9092 –topic test
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.

C:\Users\Software\kafka\bin\windows>kafka-console-producer.bat -broker-list localhost:9092 -topic test
>The Basic Tech Info Team Welcomed You!!!
>

Application Logs

Message will be received by topic consumer as below

Application Logs

Similarly, we can create one more route for the publisher and can deploy it as a bundle in the OSGI container.

Download Source Code (Attached)

Happy learning, have a nice day 🙂