Mule 4 + MuleSoft + Publish and Consume Messages Within Flows + Example

  • In this article, we discuss how to configure the Publish consume operation of the Anypoint Connector for JMS (JMS Connector) to publish and consume messages across two flows, using a correlation ID to ensure that each incoming message is a reply to a specific outgoing message.
JMS Connector Examples

Prerequisites (Setup MuleSoft (Mule 4), Maven 3.5 and JDK 1.8 on Windows OS)

We need Anypoint Studio, a JDK, and Maven to run the application below on a Mule 4 runtime or on CloudHub.

You may go through my previous post to set up MuleSoft along with Maven and the JDK on your local machine.

Mule 4 + MuleSoft as Microservices Architecture + HTTP Connector + Example

Create Mule project in Mule Studio

Now we will see how to create a new project in Mule Studio (Anypoint Studio).

Step 1. In Anypoint Studio, go to File -> New -> Mule Project
Step 2. Enter the Project Name: mule-synchronous-jms-example. The Runtime is selected by default. Tick Use Maven; the artifactId is picked up automatically from the Project Name, the Group Id is taken from the Default groupId for new projects, and the version is also a default value.
Step 3. Click Next and verify the JDK; in most cases, select Use default JRE (currently ‘jdk1.8.0_x’).
Step 4. Click Next, then click Finish.

Maven File : pom.xml

We need to add the following ActiveMQ broker and client Maven artifacts to pom.xml for the JMS Connector example.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.mycompany</groupId>
	<artifactId>mule-synchronous-jms-example</artifactId>
	<version>1.0.0-SNAPSHOT</version>
	<packaging>mule-application</packaging>

	<name>mule-synchronous-jms-example</name>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<app.runtime>4.2.2</app.runtime>
		<mule.maven.plugin.version>3.3.5</mule.maven.plugin.version>
	</properties>

	<build>
		<plugins>
			<plugin>
				<groupId>org.mule.tools.maven</groupId>
				<artifactId>mule-maven-plugin</artifactId>
				<version>${mule.maven.plugin.version}</version>
				<extensions>true</extensions>
				<configuration>
					<sharedLibraries>
						<sharedLibrary>
							<groupId>org.apache.activemq</groupId>
							<artifactId>activemq-broker</artifactId>
						</sharedLibrary>
					</sharedLibraries>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>org.mule.connectors</groupId>
			<artifactId>mule-http-connector</artifactId>
			<version>1.5.11</version>
			<classifier>mule-plugin</classifier>
		</dependency>
		<dependency>
			<groupId>org.mule.connectors</groupId>
			<artifactId>mule-sockets-connector</artifactId>
			<version>1.1.5</version>
			<classifier>mule-plugin</classifier>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.14.4</version>
			<type>jar</type>
		</dependency>
		<dependency>
			<groupId>org.mule.connectors</groupId>
			<artifactId>mule-jms-connector</artifactId>
			<version>1.6.3</version>
			<classifier>mule-plugin</classifier>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-broker</artifactId>
			<version>5.14.4</version>
		</dependency>
	</dependencies>

	<repositories>
		<repository>
			<id>anypoint-exchange-v2</id>
			<name>Anypoint Exchange</name>
			<url>https://maven.anypoint.mulesoft.com/api/v2/maven</url>
			<layout>default</layout>
		</repository>
		<repository>
			<id>mulesoft-releases</id>
			<name>MuleSoft Releases Repository</name>
			<url>https://repository.mulesoft.org/releases/</url>
			<layout>default</layout>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<id>mulesoft-releases</id>
			<name>mulesoft release repository</name>
			<layout>default</layout>
			<url>https://repository.mulesoft.org/releases/</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</pluginRepository>
	</pluginRepositories>

</project>

Configure an ActiveMQ Connection

In the following example, we will configure an ActiveMQ Connection:

  1. In Studio, navigate to the Global Elements tab.
  2. Click Create.
  3. In the filter box type jms and select JMS Config.
  4. Click OK.
  5. In the Connection field select ActiveMQ Connection.
  6. In the Factory configuration field, select Edit Inline.
  7. Set the Broker url field value to the address of the broker to connect to, for example, tcp://localhost:61616.
  8. Click OK.
Configure ActiveMQ Connection
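
For reference, the Studio steps above roughly correspond to the following XML in the Configuration XML view. This is a minimal sketch: the config name JMS_Config is an assumed placeholder, and the broker URL is the example value from step 7.

```xml
<!-- Sketch only: "JMS_Config" is a placeholder name for the global element -->
<jms:config name="JMS_Config" doc:name="JMS Config">
	<jms:active-mq-connection>
		<!-- Address of the ActiveMQ broker to connect to -->
		<jms:factory-configuration brokerUrl="tcp://localhost:61616" />
	</jms:active-mq-connection>
</jms:config>
```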

Note: The following Maven dependencies need to be added to pom.xml:

<dependency>
 <groupId>org.apache.activemq</groupId>
 <artifactId>activemq-client</artifactId>
 <version>5.14.4</version>
 <type>jar</type>
</dependency>
		
<dependency>
 <groupId>org.apache.activemq</groupId>
 <artifactId>activemq-broker</artifactId>
 <version>5.14.4</version>
</dependency>

Configure the In-Memory Broker

Also, to configure the in-memory broker, we need to add both the ActiveMQ KahaDB and ActiveMQ Broker libraries. The default broker URL is shown in the configuration below:

<jms:config name="JMS_Config" doc:name="JMS Config">
 <jms:active-mq-connection >
   <jms:factory-configuration brokerUrl="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
 </jms:active-mq-connection>
</jms:config>
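
The pom.xml shown earlier declares activemq-broker but no KahaDB artifact. As a sketch, assuming the KahaDB library refers to the activemq-kahadb-store artifact and that its version should match the 5.14.4 used elsewhere in this article, the dependency could be declared like this:

```xml
<!-- Assumption: KahaDB is provided by activemq-kahadb-store;
     the version is chosen to match the other ActiveMQ artifacts in this article -->
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-kahadb-store</artifactId>
	<version>5.14.4</version>
</dependency>
```

Depending on your setup, this artifact may also need a sharedLibrary entry in the mule-maven-plugin configuration, similar to the one for activemq-broker.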

Configure Message Redelivery

We need to configure the properties below in Global Element Properties to enable message redelivery.

Configure Message Redelivery
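
As a rough XML equivalent of the redelivery settings, the sketch below assumes the factory-configuration attributes maxRedelivery, initialRedeliveryDelay and redeliveryDelay. The values are purely illustrative, so check the attribute names and defaults against the JMS Connector version you use.

```xml
<jms:config name="JMS_Config" doc:name="JMS Config">
	<jms:active-mq-connection>
		<!-- Illustrative values: retry up to 5 times, first retry after 1s, later retries after 2s -->
		<jms:factory-configuration
			brokerUrl="tcp://localhost:61616"
			maxRedelivery="5"
			initialRedeliveryDelay="1000"
			redeliveryDelay="2000" />
	</jms:active-mq-connection>
</jms:config>
```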

Configure SSL Connections

JMS Connector version 1.3.0 and later lets us configure ActiveMQ connections with SSL to establish secure, encrypted connections with the ActiveMQ broker. We need to configure the properties below.

<!--  HTTP Requester Configuration -->
<http:request-config name="HTTP_Request_configuration">
  <http:request-connection tlsContext="TLS_Context" />
</http:request-config>

<!--  JMS Configuration -->
<jms:config name="JMS_Config">
 <jms:active-mq-connection tlsContext="TLS_Context"/>
</jms:config>

<!--  Reusable TLS Context -->
<tls:context name="TLS_Context">
  <tls:trust-store
    path="client.ts"
    password="password" />
  <tls:key-store
    path="client.ks"
    password="password"
    keyPassword="password"
    alias="client" />
</tls:context>
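
The JMS configuration above references the TLS context but does not show a broker URL. A variant of it, as a minimal sketch, assuming the broker exposes an SSL transport at ssl://localhost:61617 (a hypothetical address; use whatever your broker is actually configured with):

```xml
<!-- Sketch only: the ssl:// address and port are assumptions, not values from this article -->
<jms:config name="JMS_Config">
	<jms:active-mq-connection tlsContext="TLS_Context">
		<jms:factory-configuration brokerUrl="ssl://localhost:61617" />
	</jms:active-mq-connection>
</jms:config>
```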

Publish and Consume Messages Within Flows

In the following Mule flows, we publish and consume messages using a correlation ID to ensure that each incoming message is a reply to a specific outgoing message.

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection host="0.0.0.0" port="8088" />
	</http:listener-config>
	<jms:config name="JMS_Config_ActiveMQ">
		<jms:active-mq-connection username="admin" password="admin">
			<jms:factory-configuration brokerUrl="tcp://localhost:61616" />
		</jms:active-mq-connection>
	</jms:config>
	<jms:config name="JMS_Config_ActiveMQ_2">
		<jms:active-mq-connection username="admin" password="admin">
			<jms:factory-configuration brokerUrl="tcp://localhost:61616" />
		</jms:active-mq-connection>
	</jms:config>

	<flow name="jms-publish-consume-attributes_flow">
		<http:listener config-ref="HTTP_Listener_config" path="/publish-consume" />
		<jms:publish-consume destination="Queue1" config-ref="JMS_Config_ActiveMQ">
			<jms:message correlationId="0123456">
				<jms:reply-to destination="Queue2" />
			</jms:message>
			<jms:consume-configuration ackMode="IMMEDIATE" />
		</jms:publish-consume>
		<set-payload value="Message Consumed and Published to Queue2 with correlationId = 0123456 Successfully." doc:name="Set Payload" />
	</flow>
	<flow name="jms:publish-attributes_flow">
		<jms:listener doc:name="On New Message" config-ref="JMS_Config_ActiveMQ_2" destination="Queue1" />
		<jms:publish doc:name="Publish" config-ref="JMS_Config_ActiveMQ_2" destination="Queue2">
			<jms:message correlationId="#[attributes.headers.correlationId]" />
		</jms:publish>
	</flow>
</mule>
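
The example above hard-codes the correlation ID, which keeps the demo easy to follow but means every request shares the same ID. As a hedged variation, the flow below generates a fresh ID per request with the DataWeave uuid() function and bounds how long the operation waits for a reply. The flow name, the listener path, and the maximumWait and maximumWaitUnit values are assumptions for illustration; verify them against your JMS Connector version.

```xml
<!-- Sketch only: hypothetical flow name and path; goes inside the same <mule> element -->
<flow name="jms-publish-consume-dynamic-id_flow">
	<http:listener config-ref="HTTP_Listener_config" path="/publish-consume-dynamic" />
	<jms:publish-consume destination="Queue1" config-ref="JMS_Config_ActiveMQ">
		<!-- A new correlation ID is generated for each incoming request -->
		<jms:message correlationId="#[uuid()]">
			<jms:reply-to destination="Queue2" />
		</jms:message>
		<!-- Wait at most 5 seconds for the reply on Queue2 (illustrative value) -->
		<jms:consume-configuration ackMode="IMMEDIATE" maximumWait="5000" maximumWaitUnit="MILLISECONDS" />
	</jms:publish-consume>
</flow>
```

The second flow needs no change for this variant, since it already copies the correlation ID from the incoming message headers.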

Download and Run the ActiveMQ Broker

  • Extract the downloaded ActiveMQ zip to a physical drive. Then open a command prompt, navigate to the bin folder of the extracted apache-activemq-5.16.3 directory, and run the command activemq start
Running the Active MQ broker

Active MQ Broker Login Page (http://127.0.0.1:8161/admin)

Username: admin
Password: admin

Active MQ broker login page

Active MQ Broker Home Page (http://127.0.0.1:8161/admin/queues.jsp)

ActiveMQ Broker Queues

Running the application

Now right-click the mule-synchronous-jms-example.xml file or the Mule project and click Run As -> Mule Application. When the application runs, you will see output like the following in the Console.

Running Mule4 Application
**********************************************************************
INFO  2021-09-12 22:19:21,277 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
**********************************************************************
* Started domain 'default'                                           *
**********************************************************************
INFO  2021-09-12 22:19:21,280 [WrapperListener_start_runner] org.mule.runtime.module.deployment.internal.ArtifactArchiveInstaller: Exploding a Mule artifact archive: 'D:\Software\AnypointStudio\plugins\org.mule.tooling.server.4.2.2.ee_7.3.5.202001031809\mule\apps\mule-synchronous-jms-example.jar'
INFO  2021-09-12 22:19:22,135 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ New app 'mule-synchronous-jms-example'                                       +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
INFO  2021-09-12 22:19:22,136 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Initializing app 'mule-synchronous-jms-example'                              +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Starting app 'mule-synchronous-jms-example'                                  +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
INFO  2021-09-12 22:19:30,382 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.internal.util.queue.QueueXaResourceManager: Starting ResourceManager
INFO  2021-09-12 22:19:30,383 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.internal.util.queue.QueueXaResourceManager: Started ResourceManager
INFO  2021-09-12 22:19:30,392 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.privileged.lifecycle.AbstractLifecycleManager: Starting Bean: org.mule.runtime.module.extension.internal.runtime.config.ConfigurationProviderToolingAdapter-HTTP_Listener_config
INFO  2021-09-12 22:19:30,416 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.privileged.lifecycle.AbstractLifecycleManager: Starting Bean: org.mule.runtime.module.extension.internal.runtime.config.ConfigurationProviderToolingAdapter-JMS_Config_ActiveMQ
INFO  2021-09-12 22:19:30,796 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.privileged.lifecycle.AbstractLifecycleManager: Starting Bean: org.mule.runtime.module.extension.internal.runtime.config.ConfigurationProviderToolingAdapter-JMS_Config_ActiveMQ_2
INFO  2021-09-12 22:19:30,807 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.internal.construct.FlowConstructLifecycleManager: Starting flow: jms-publish-consume-attributes_flow
INFO  2021-09-12 22:19:31,099 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.internal.construct.FlowConstructLifecycleManager: Starting flow: jms:publish-attributes_flow
INFO  2021-09-12 22:19:31,108 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.privileged.lifecycle.AbstractLifecycleManager: Starting Bean: listener
INFO  2021-09-12 22:19:31,110 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.privileged.lifecycle.AbstractLifecycleManager: Starting Bean: listener
INFO  2021-09-12 22:19:31,113 [WrapperListener_start_runner] [event: ] org.mule.runtime.core.internal.logging.LogUtil: 
**********************************************************************
* Application: mule-synchronous-jms-example                          *
* OS encoding: UTF-8, Mule encoding: UTF-8                           *
*                                                                    *
**********************************************************************
INFO  2021-09-12 22:19:31,114 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
**********************************************************************
* Started app 'mule-synchronous-jms-example'                         *
* Application plugins:                                               *
*  - JMS : 1.6.3                                                     *
*  - Sockets : 1.1.5                                                 *
*  - HTTP : 1.5.11                                                   *
* Application libraries:                                             *
*  - activemq-client-5.14.4.jar                                      *
*  - geronimo-j2ee-management_1.1_spec-1.0.1.jar                     *
*  - hawtbuf-1.11.jar                                                *
*  - slf4j-api-1.7.13.jar                                            *
*  - geronimo-jms_1.1_spec-1.1.1.jar                                 *
*  - activemq-openwire-legacy-5.14.4.jar                             *
*  - activemq-broker-5.14.4.jar                                      *
**********************************************************************
INFO  2021-09-12 22:19:31,202 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Mule is up and kicking (every 5000ms)                                        +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
INFO  2021-09-12 22:19:31,223 [WrapperListener_start_runner] org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@169459f{HTTP/1.1,[http/1.1]}{0.0.0.0:64528}
INFO  2021-09-12 22:19:31,226 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil: 
**********************************************************************
*              - - + DOMAIN + - -               * - - + STATUS + - - *
**********************************************************************
* default                                       * DEPLOYED           *
**********************************************************************

*******************************************************************************************************
*            - - + APPLICATION + - -            *       - - + DOMAIN + - -       * - - + STATUS + - - *
*******************************************************************************************************
* mule-synchronous-jms-example                  * default                        * DEPLOYED           *
*******************************************************************************************************

Active MQ Broker Home Page (http://127.0.0.1:8161/admin/queues.jsp)

Once the Mule application's consumer has connected to the ActiveMQ broker successfully, we can see Queue1 under the Queues tab on the ActiveMQ broker.

Test the application (http://localhost:8088/publish-consume)

Open http://localhost:8088/publish-consume in a browser to trigger the Mule flow.

JMS Connector publish-consume flow

Application Logs

INFO  2021-09-12 22:33:00,536 [http.listener.01 SelectorRunner] org.mule.service.http.impl.service.util.DefaultRequestMatcherRegistry: No listener found for request: (GET)/favicon.ico
INFO  2021-09-12 22:33:00,545 [http.listener.01 SelectorRunner] org.mule.service.http.impl.service.util.DefaultRequestMatcherRegistry: Available listeners are: [([*])/publish-consume/]

Active MQ Broker Home Page (http://127.0.0.1:8161/admin/queues.jsp)

On the ActiveMQ Broker Home Page, under the Queues tab, we can see Queue2 along with the same Correlation ID = 0123456 (set in the publish-consume operation and reused across the flows). E.g.

<flow name="jms-publish-consume-attributes_flow">
		<http:listener config-ref="HTTP_Listener_config" path="/publish-consume" />
		<jms:publish-consume destination="Queue1" config-ref="JMS_Config_ActiveMQ">
			<jms:message correlationId="0123456">
				<jms:reply-to destination="Queue2" />
			</jms:message>
			<jms:consume-configuration ackMode="IMMEDIATE" />
		</jms:publish-consume>
		<set-payload value="Message Consumed and Published to Queue2 with correlationId = 0123456 Successfully." doc:name="Set Payload" />
</flow>
ActiveMQ publish-consume operation with correlation ID

Thanks for reading. Keep learning 🙂