In this post, we will learn how to integrate Apache Camel with Apache Kafka using the OSGi Blueprint DSL, with a working example.
Java 8
jboss-fuse-6.3.0.redhat-187
Apache Kafka Server 2.13
Zookeeper
Apache Camel (2.17.0.redhat-630187)
OSGi Blueprint DSL
First, create a Fuse Integration Project [fuse-kafka-apache-camel-integration] with an empty Blueprint.
Note: Add the following Maven dependency to pom.xml for Kafka integration. No version is specified because the imported JBoss Fuse BOM manages it.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>com.the.basic.tech.info.kafka</groupId>
<artifactId>kafka-camel-blueprint</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Kafka - Camel Blueprint Quickstart</name>
<description>Kafka - Camel Blueprint Example</description>
<properties>
<camel.version>2.17.0.redhat-630187</camel.version>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.maven-bundle-plugin>3.2.0</version.maven-bundle-plugin>
<jboss.fuse.bom.version>6.3.0.redhat-187</jboss.fuse.bom.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.jboss.fuse.bom</groupId>
<artifactId>jboss-fuse-parent</artifactId>
<version>${jboss.fuse.bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-blueprint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-blueprint</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>org.apache.felix.fileinstall</artifactId>
<groupId>org.apache.felix</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>fuse-public-repository</id>
<name>FuseSource Community Release Repository</name>
<url>https://repo.fusesource.com/nexus/content/groups/public</url>
</repository>
<repository>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>red-hat-ga-repository</id>
<name>Red Hat GA Repository</name>
<url>https://maven.repository.redhat.com/ga</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>fuse-public-repository</id>
<name>FuseSource Community Release Repository</name>
<url>https://repo.fusesource.com/nexus/content/groups/public</url>
</pluginRepository>
<pluginRepository>
<releases>
<enabled>true</enabled>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>red-hat-ga-repository</id>
<name>Red Hat GA Repository</name>
<url>https://maven.repository.redhat.com/ga</url>
</pluginRepository>
</pluginRepositories>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>${version.maven-bundle-plugin}</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>fuse-kafka-apache-camel-integration</Bundle-SymbolicName>
<Bundle-Name>Kafka Camel Blueprint Example [fuse-kafka-apache-camel-integration]</Bundle-Name>
</instructions>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.1</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>${camel.version}</version>
<configuration>
<useBlueprint>true</useBlueprint>
</configuration>
</plugin>
</plugins>
</build>
</project>
Copy the following Camel context [Blueprint] file to src/main/resources/OSGI-INF/blueprint/camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint-2.18.3.xsd">
<camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
<route id="kafka_consumer">
<from id="_from1" uri="kafka:localhost:9092?topic=test&amp;groupId=test-consumer-group&amp;kerberosRenewWindowFactor=0&amp;kerberosRenewJitter=0"/>
<log id="_log1" message="The message received from Kafka Server: ${body}"/>
</route>
</camelContext>
</blueprint>
Note: The Camel route above consumes messages from the Kafka broker topic [test] and logs each message body.
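The broker address, topic, and consumer group in the route above are hard-coded in the endpoint URI. As a minimal sketch, assuming you want to change them without rebuilding the bundle, the same route could read them from a Blueprint property placeholder. The persistent-id and the property names (kafka.brokers, kafka.topic, kafka.groupId) are hypothetical names chosen for illustration; the default values are the ones used in this post.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0">
  <!-- Hypothetical persistent-id: values can be overridden through OSGi Config Admin -->
  <cm:property-placeholder persistent-id="com.the.basic.tech.info.kafka">
    <cm:default-properties>
      <!-- Defaults match the hard-coded values of the original route -->
      <cm:property name="kafka.brokers" value="localhost:9092"/>
      <cm:property name="kafka.topic" value="test"/>
      <cm:property name="kafka.groupId" value="test-consumer-group"/>
    </cm:default-properties>
  </cm:property-placeholder>
  <camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="kafka_consumer">
      <!-- Camel resolves the {{...}} placeholders before it creates the endpoint -->
      <from id="_from1" uri="kafka:{{kafka.brokers}}?topic={{kafka.topic}}&amp;groupId={{kafka.groupId}}&amp;kerberosRenewWindowFactor=0&amp;kerberosRenewJitter=0"/>
      <log id="_log1" message="The message received from Kafka Server: ${body}"/>
    </route>
  </camelContext>
</blueprint>
```

With no external configuration present, the defaults apply and the route behaves like the hard-coded version.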
For the steps to run Zookeeper and the Kafka server, please refer to: Spring Boot + Spring Kafka with Zookeeper + JSON Serialization | Deserialization + Example
To build this project, run:
mvn install
To run the project, execute the following Maven goal:
mvn camel:run
[ Blueprint Extender: 1] BlueprintContainerImpl INFO Bundle fuse-kafka-apache-camel-integration/1.0.0.SNAPSHOT is waiting for namespace handlers [http://camel.apache.org/schema/blueprint]
[ Blueprint Extender: 3] BlueprintCamelContext INFO Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) is starting
[ Blueprint Extender: 3] ManagedManagementStrategy INFO JMX is enabled
[ Blueprint Extender: 3] DefaultRuntimeEndpointRegistry INFO Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[ Blueprint Extender: 3] BlueprintCamelContext INFO AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[ Blueprint Extender: 3] BlueprintCamelContext INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[ Blueprint Extender: 3] KafkaConsumer INFO Starting Kafka consumer
[ Blueprint Extender: 3] ConsumerConfig INFO ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = test-consumer-group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.0
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.0
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration sasl.kerberos.ticket.renew.window.factor = 0.0 was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration sasl.kerberos.ticket.renew.jitter = 0.0 was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.keystore.type = JKS was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.trustmanager.algorithm = PKIX was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration sasl.kerberos.min.time.before.relogin = 60000 was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.protocol = TLS was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.truststore.type = JKS was supplied but isn't a known config.
[ Blueprint Extender: 3] ConsumerConfig WARN The configuration ssl.keymanager.algorithm = SunX509 was supplied but isn't a known config.
[ Blueprint Extender: 3] AppInfoParser INFO Kafka version : 0.9.0.1
[ Blueprint Extender: 3] AppInfoParser INFO Kafka commitId : 23c69d62a0cabf06
[ Blueprint Extender: 3] BlueprintCamelContext INFO Route: kafka_consumer started and consuming from: Endpoint[kafka://localhost:9092?groupId=test-consumer-group&kerberosRenewJitter=0&kerberosRenewWindowFactor=0&topic=test]
[ Blueprint Extender: 3] BlueprintCamelContext INFO Total 1 routes, of which 1 are started.
[ Blueprint Extender: 3] BlueprintCamelContext INFO Apache Camel 2.17.0.redhat-630187 (CamelContext: _context1) started in 1.587 seconds
To deploy the bundle in JBoss Fuse, run the following command from the Fuse shell:
osgi:install -s mvn:com.the.basic.tech.info.kafka/kafka-camel-blueprint/1.0.0-SNAPSHOT
Run the following command and type the message to be published to the Kafka topic [test]:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Microsoft Windows [Version 10.0.17763.1935]
(c) 2018 Microsoft Corporation. All rights reserved.
C:\Users\Software\kafka\bin\windows>kafka-console-producer.bat -broker-list localhost:9092 -topic test
>The Basic Tech Info Team Welcomed You!!!
>
The topic consumer receives the message, and the kafka_consumer route logs it.
Similarly, we can create one more route for the publisher and deploy it as a bundle in the OSGi container.
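A publisher route could look roughly like the sketch below. It is not part of the original project: the route id, the timer name, the 5-second period, and the message text are assumptions made for illustration. The Kafka endpoint reuses the broker, topic, and Kerberos options from the consumer URI above, and relies on the component's default string serializers.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
  <camelContext id="_context2" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="kafka_producer">
      <!-- Hypothetical trigger: fire once every 5 seconds -->
      <from id="_from2" uri="timer:kafkaProducerTimer?period=5000"/>
      <!-- Hypothetical payload: a fixed text message -->
      <setBody id="_setBody1">
        <constant>Hello from the Camel producer route</constant>
      </setBody>
      <!-- Publish the body to the same topic [test] that the consumer route reads -->
      <to id="_to1" uri="kafka:localhost:9092?topic=test&amp;kerberosRenewWindowFactor=0&amp;kerberosRenewJitter=0"/>
      <log id="_log2" message="The message published to Kafka Server: ${body}"/>
    </route>
  </camelContext>
</blueprint>
```

If both bundles run against the same broker, each message sent by this route should show up in the log of the kafka_consumer route.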
Happy learning, have a nice day 🙂