Spring Boot JMS + ActiveMQ + Publish-Subscribe (Pub-Sub) Pattern + Example

June 8, 2021   2,229 Views

ActiveMQ spring boot, ActiveMQ spring, spring boot configure ActiveMQ, spring boot ActiveMQ

In this post, we will learn Asynchronous messaging with the help of the Spring Boot JMS Pub-Sub model. This post will focus on implementing JMS with Spring Boot , which doesn’t take long at all to setup.

JMS and message queues, in general, bring some certain advantages over using RESTful services such as:

What is Loose coupling?

The services do not interact directly and only know where the message queue is, where one service sends messages(Publisher) and the other(Subscribers) receives them. Subscribers/Publishers can be added/removed dynamically.

What is Asynchronous messaging?

As the processing time of the message cannot be guaranteed, the client that sent the message can carry on asynchronously to the completion of the transaction. Due to this, the queue should be used to write data (POST if you’re thinking in a RESTful mindset).

What is Redundancy?

A message must confirm that it has completed its transaction and that it can now be removed from the queue, but if the transaction fails it can be reprocessed. The messages can also be stored in a database allowing them to continue later on even if the server stops.

Why ActiveMQ-TOPIC?

ActiveMQ provides the Publish-Subscribe pattern (pub-sub) for building Jms message distributed systems.

How ActiveMQ-TOPIC works?

When you publish a message, all active subscribers will receive a copy of the message.

spring boot jms topic subscriber example - spring boot activemq topic example - activemq pub sub

ActiveMQ Broker

You may download ActiveMQ Broker from: https://activemq.apache.org/activemq-5013000-release

ActiveMQ Broker - spring boot activemq topic example - spring boot embedded activemq

Once ActiveMQ is started successfully, open the ActiveMQ Admin URL http://localhost:8161/admin/

You can access Subscribers from its tab

spring boot activemq topic example - activemq topic subscriber example

Spring Boot JMS Implementation

Spring Boot Producer: SpringActiveMqTopicProducer

In this application, we will publish a message on a topic.

Project Structure

The project structure would be as below.

Project Structure

Maven File (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example.activemq</groupId>
	<artifactId>SpringActiveMqTopicProducer</artifactId>
	<version>0.0.1</version>
	<packaging>jar</packaging>

	<name>SpringActiveMqTopicProducer</name>
	<description>SpringActiveMqTopicProducer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.4</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
		
		<dependency>
		         <groupId>com.fasterxml.jackson.core</groupId>
		         <artifactId>jackson-databind</artifactId>
		</dependency>
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20210307</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Configuration (application.properties)

jsa.activemq.broker.url=tcp://localhost:61616
jsa.activemq.borker.username=admin
jsa.activemq.borker.password=admin
jsa.activemq.topic=jsa-topic
spring.jms.pub-sub-domain=true

Application Runner: SpringActiveMqTopicProducerApplication

run method has been overridden in this java class as below for publishing messages to the topic ‘jsa-topic’

package com.the.basic.tech.info.activemq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.the.basic.tech.info.activemq.jms.JmsPublisher;
import com.the.basic.tech.info.activemq.models.Company;
import com.the.basic.tech.info.activemq.models.Product;

@SpringBootApplication
public class SpringActiveMqTopicProducerApplication implements CommandLineRunner {

	@Autowired
	JmsPublisher publisher;

	public static void main(String[] args) {
		SpringApplication.run(SpringActiveMqTopicProducerApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		/*
		 * Apple company & products
		 */
		// initial company and products
		Product iphone7 = new Product("Iphone 7");
		Product iPadPro = new Product("IPadPro");

		List<Product> appleProducts = new ArrayList<Product>(Arrays.asList(iphone7, iPadPro));

		Company apple = new Company("Apple", appleProducts);

		// send message to ActiveMQ
		publisher.send(apple);

		/*
		 * Samsung company and products
		 */
		Product galaxyS8 = new Product("Galaxy S8");
		Product gearS3 = new Product("Gear S3");

		List<Product> samsungProducts = new ArrayList<Product>(Arrays.asList(galaxyS8, gearS3));

		Company samsung = new Company("Samsung", samsungProducts);

		/*
		 * send message to ActiveMQ
		 */
		publisher.send(samsung);
	}
}

ConnectionFactoryConfig

package com.the.basic.tech.info.activemq.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
@EnableJms
public class ConnectionFactoryConfig {
	@Value("${jsa.activemq.broker.url}")
	String brokerUrl;
	
	@Value("${jsa.activemq.borker.username}")
	String userName;
	
	@Value("${jsa.activemq.borker.password}")
	String password;

	/*
	 * Initial ConnectionFactory
	 */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    
	@Bean // Serialize message content to json using TextMessage
	public MessageConverter jacksonJmsMessageConverter() {
	    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
	    converter.setTargetType(MessageType.TEXT);
	    converter.setTypeIdPropertyName("_type");
	    return converter;
	}
    
    /*
     * Used for sending Messages.
     */
	@Bean
	public JmsTemplate jmsTemplate(){
	    JmsTemplate template = new JmsTemplate();
	    template.setConnectionFactory(connectionFactory());
	    template.setMessageConverter(jacksonJmsMessageConverter());
	    template.setPubSubDomain(true);
	    return template;
	}
}

Model Layer

Company.java

package com.the.basic.tech.info.activemq.models;

import java.util.List;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
 
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Company.class)
public class Company {
    private String name;
 
    private List<Product> products;
	
    public Company(){
    }
    
    public Company(String name, List<Product> products){
    	this.name = name;
    	this.products = products;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setProducts(List<Product> products){
    	this.products = products;
    }
    
    public List<Product> getProducts(){
    	return this.products;
    }
 
	/**
	 * 
	 * Show Detail View
	 */
	public String toString(){
		JSONObject jsonInfo = new JSONObject();
		
		try {
			jsonInfo.put("name", this.name);
 
			JSONArray productArray = new JSONArray();
			if (this.products != null) {
				this.products.forEach(product -> {
					JSONObject subJson = new JSONObject();
					try {
						subJson.put("name", product.getName());
					} catch (JSONException e) {}
					
					productArray.put(subJson);
				});
			}
			jsonInfo.put("products", productArray);
		} catch (JSONException e1) {}
		return jsonInfo.toString();
	}
 
}

Product.java

package com.the.basic.tech.info.activemq.models;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class)
public class Product {
    private String name;
    
    private Company company;
	
    public Product(){
    }
    
    public Product(String name){
    	this.name = name;
    }
    
    public Product(String name, Company company){
    	this.name = name;
    	this.company = company;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setCompany(Company company){
    	this.company = company;
    }
    
    public Company getCompany(){
    	return this.company;
    }
}

JMS Publisher (JmsPublisher.java)

For publishing messages to the ActiveMQ topic

package com.the.basic.tech.info.activemq.jms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.the.basic.tech.info.activemq.models.Company;

@Component
public class JmsPublisher {
	private static final Logger logger = LoggerFactory.getLogger(JmsPublisher.class);
	@Autowired
	JmsTemplate jmsTemplate;
	
	@Value("${jsa.activemq.topic}")
	String topic;
	
	public void send(Company apple){
		jmsTemplate.convertAndSend(topic, apple);
		logger.info("Message : {} published to topic: {} successfully.", apple.toString(), topic);
		
	}
}

Spring Boot Consumer: SpringActiveMqTopicConsumer

In this application, we will consume a message from the topic.

Project Structure

The project structure would be like as below.

Maven File (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example.activemq</groupId>
	<artifactId>SpringActiveMqTopicConsumer</artifactId>
	<version>0.0.1</version>
	<packaging>jar</packaging>

	<name>SpringActiveMqTopicConsumer</name>
	<description>SpringActiveMqTopicConsumer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.4</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>

		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20210307</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Configuration (application.properties)

jsa.activemq.broker.url=tcp://localhost:61616
jsa.activemq.borker.username=admin
jsa.activemq.borker.password=admin
jsa.activemq.topic=jsa-topic
spring.jms.pub-sub-domain=true

Application Runner: SpringActiveMqTopicConsumerApplication

package com.the.basic.tech.info.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringActiveMqTopicConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringActiveMqTopicConsumerApplication.class, args);
	}
}

ConnectionFactoryConfig

package com.the.basic.tech.info.activemq.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
public class ConnectionFactoryConfig {
	@Value("${jsa.activemq.broker.url}")
	String brokerUrl;
	
	@Value("${jsa.activemq.borker.username}")
	String userName;
	
	@Value("${jsa.activemq.borker.password}")
	String password;

	/*
	 * Initial ConnectionFactory
	 */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    
	@Bean // Serialize message content to json using TextMessage
	public MessageConverter jacksonJmsMessageConverter() {
	    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
	    converter.setTargetType(MessageType.TEXT);
	    converter.setTypeIdPropertyName("_type");
	    return converter;
	}
    
	@Bean
	public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
	                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
	    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	    factory.setPubSubDomain(true);
	    factory.setMessageConverter(jacksonJmsMessageConverter());
	    configurer.configure(factory, connectionFactory);
	    return factory;
	}
}

Model Layer

Company.java

package com.the.basic.tech.info.activemq.models;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Company.class)
public class Company {
	private String name;
	private List<Product> products;

	public Company() {
	}

	public Company(String name, List<Product> products) {
		this.name = name;
		this.products = products;
	}

	// name
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	// products
	public void setProducts(List<Product> products) {
		this.products = products;
	}

	public List<Product> getProducts() {
		return this.products;
	}

	@Override
	public String toString() {
		return "Company [name=" + name + ", products=" + products + "]";
	}

}

Product.java

package com.the.basic.tech.info.activemq.models;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class)
public class Product {
    private String name;    
    @Override
	public String toString() {
		return "Product [name=" + name + ", company=" + company + "]";
	}

	private Company company;
	
    public Product(){
    }
    
    public Product(String name){
    	this.name = name;
    }
    
    public Product(String name, Company company){
    	this.name = name;
    	this.company = company;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setCompany(Company company){
    	this.company = company;
    }
    
    public Company getCompany(){
    	return this.company;
    }
}

JMS Subcriber(JmsSubcriber.java)

For consuming messages from ActiveMQ topic

package com.the.basic.tech.info.activemq.jms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import com.the.basic.tech.info.activemq.models.Company;


@Component
public class JmsSubcriber {
	private static final Logger logger = LoggerFactory.getLogger(JmsSubcriber.class);
	
	@JmsListener(destination = "${jsa.activemq.topic}")
	public void receive(Company msg){
		logger.info("Recieved Message:  {}", msg.toString());
	}
}

Spring Boot JMS: Application Logs

JmsPublisher Logs:

2021-06-08 20:52:36.628  INFO 20636 --- [           main] a.SpringActiveMqTopicProducerApplication : No active profile set, falling back to default profiles: default
2021-06-08 20:52:38.079  INFO 20636 --- [           main] a.SpringActiveMqTopicProducerApplication : Started SpringActiveMqTopicProducerApplication in 2.611 seconds (JVM running for 3.531)
2021-06-08 20:52:38.671  INFO 20636 --- [           main] c.t.b.t.info.activemq.jms.JmsPublisher   : Message : {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]} published to topic: jsa-topic successfully.
2021-06-08 20:52:38.703  INFO 20636 --- [           main] c.t.b.t.info.activemq.jms.JmsPublisher   : Message : {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]} published to topic: jsa-topic successfully.

Jms Subscriber Logs:

2021-06-08 20:54:13.700  INFO 2864 --- [           main] a.SpringActiveMqTopicConsumerApplication : No active profile set, falling back to default profiles: default
2021-06-08 20:54:15.137  INFO 2864 --- [           main] a.SpringActiveMqTopicConsumerApplication : Started SpringActiveMqTopicConsumerApplication in 2.208 seconds (JVM running for 2.93)
2021-06-08 20:54:33.900  INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber   : Recieved Message:  Company [name=Apple, products=[Product [name=Iphone 7, company=null], Product [name=IPadPro, company=null]]]
2021-06-08 20:54:33.916  INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber   : Recieved Message:  Company [name=Samsung, products=[Product [name=Galaxy S8, company=null], Product [name=Gear S3, company=null]]]

Download Source Code (Attached)

Have a great day ahead 🙂