Red Hat JBoss Fuse ESB + Apache Camel + Aggregate EIP + MySQL + OSGI Blueprint

In this blog we will implement Apache Camel Enterprise Integration Patterns Aggregate. It concatenates number of messages together into a single message.

In this example we will insert multiple records in MySQL database in batch using Aggregation Strategy.
An AggregationStrategy is used to combine old exchange (lookup by its correlation id) and new exchange into a single exchange. We can use multiple options in AggregationStrategy for customizing it as per our requirements. In this working example we will use following options.

completionInterval – Time in milliseconds by which the aggregator will complete all current aggregated exchanges. We can’t use this option with completionTimeout, only one of them can be used at a time.
completionSize
– Number of messages aggregated exchanges before the aggregation is complete. E.g. completionSize=5. It means 5 messages/exchanges will be aggregated in to single message before the insert operation in DB.

Note: It’s basically the first which triggers that wins – for example if completionSize triggers first with 5 messages then aggregator will combine the exchanges and completionInterval (30 seconds) option will be skipped. I mean it will not wait for 30 seconds for combining the other exchanges.

Application Architecture Overview

  • In this working example we are integrating Apache Camel with MySQL database.
  • We have insert and select two operations along with unmashalling and marshalling operations.
  • For inserting records in batch/bulk in MySQL database, here we are going to implement AggregationStrategy with completionSize and completionInterval options.

An AggregationStrategy is used to combine old exchange (lookup by its correlation id) and new exchange into a single exchange.

Apache Camel Enterprise Integration Patterns Aggregator, Aggregation, AggregationStrategy Architecture

Technology Stack

Java 8
jboss-fuse-6.3.0.redhat-187
Apache Camel (2.17.0.redhat-630187)
OSGI Blueprint DSL
MySQL 8.0
MySQL Connector 8.0.22
Apache ActiveMQ
Camel-Sql
Camel-JaxB
Camel-Jackson
Jasypt
XPath
Enterprise Integration Patterns - Aggregate, Aggregator, AggregationStrategy

Project Structure

High level project structure would be looked like as below.

Basic Configurations and Files

The basic architecture has been covered in my previous blog with source code. Reference Link: Red Hat JBoss Fuse ESB + Apache Camel + MySQL + OSGI Blueprint DSL

Only, Enterprise Integration Patterns (Aggregate) added in this Blog.

Implement CustomAggregationStrategy

We need to implement AggregationStrategy interface for customizing the aggregate() method as below.

package com.the.basic.tech.info.processor;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

public class CustomAggregationStrategy implements AggregationStrategy {

	private static Logger logger = LoggerFactory.getLogger(CustomAggregationStrategy.class);

	@SuppressWarnings("unchecked")
	@Override
	public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

		logger.info("Inside aggregate method.");
		Map<String, Object> newHeader = newExchange.getIn().getHeaders();
		ArrayList<Object> list = null;

		if (oldExchange == null) {
			list = new ArrayList<Object>();
			list.add(newHeader);
			newExchange.getIn().setBody(list);
			logger.debug("headers map for oldExchange is null - {}", list);
			return newExchange;
		} else {
			list = oldExchange.getIn().getBody(ArrayList.class);
			list.add(newHeader);
			logger.debug("headers map for else oldExchange - {}", list);
			return oldExchange;
		}

	}

}

Define AggregationStrategy bean

Following bean we need to add in camel-beans.xml file. So, it can be referenced in our camel-context.xml file under route.

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:camel="http://camel.apache.org/schema/blueprint" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" xmlns:enc="http://karaf.apache.org/xmlns/jasypt/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd        http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
    <cm:property-placeholder id="camel.blueprint.mysql.bundle" persistent-id="camel.blueprint.mysql.bundle" update-strategy="reload"/>
    <cm:property-placeholder id="camel.blueprint.mysql.bundle.enc" persistent-id="camel.blueprint.mysql.bundle.enc" placeholder-prefix="$[" placeholder-suffix="]" update-strategy="reload"/>
    <enc:property-placeholder>
        <enc:encryptor class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor ">
            <property name="config">
                <bean class="org.jasypt.encryption.pbe.config.EnvironmentStringPBEConfig">
                    <property name="algorithm" value="PBEWITHHMACSHA512ANDAES_256"/>
                    <property name="passwordEnvName" value="ENCRYPTION_PASSWORD"/>
                </bean>
            </property>
        </enc:encryptor>
    </enc:property-placeholder>
    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		<property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
		<property name="url" value="${db.url}" />
		<property name="username" value="${db.username}" />
		<property name="password" value="$[db.password]" />
	</bean>

	<bean id="sqlComponent" class="org.apache.camel.component.sql.SqlComponent">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="activemq-connection-factory">
        <property name="brokerURL" value="${broker.url}"/>
        <property name="userName" value="${broker.username}"/>
        <property name="password" value="$[broker.password]"/>
    </bean>
    <bean class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" id="pooledConnectionFactory" init-method="start">
        <property name="maxConnections" value="1"/>
        <property name="blockIfSessionPoolIsFull" value="true"/>
        <property name="createConnectionOnStartup" value="true"/>
        <property name="connectionFactory" ref="activemq-connection-factory"/>
    </bean>
    <bean class="org.apache.camel.component.jms.JmsConfiguration" id="jmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
        <property name="maxConcurrentConsumers" value="60"/>
    </bean>
    <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="activemq">
        <property name="configuration" ref="jmsConfiguration"/>
    </bean>
    <bean id= "aggregationStrategy" class="com.the.basic.tech.info.processor.CustomAggregationStrategy"></bean>
</blueprint>

Camel Route : _aggregateRecordsRoute

We need to add camel route in camel-context file for aggregating the exchanges into single exchange on the basis of completionInterval and completionSize options.

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:prop="http://camel.apache.org/schema/placeholder"
	xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd                            http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
	<camelContext id="fuse-apache-camel-mysql-integration-context" xmlns="http://camel.apache.org/schema/blueprint">
		<dataFormats>
			<jaxb contextPath="com.the.basic.tech.info.jaxb.beans" id="jaxData" />
			<json enableJaxbAnnotationModule="true" id="jack" library="Jackson" prettyPrint="true" unmarshalTypeName="com.the.basic.tech.info.beans.pojo.Employees" />
		</dataFormats>
		<onException>
			<exception>java.lang.Exception</exception>
			<handled>
				<constant>true</constant>
			</handled>
			<log message="Exception while processing Message. Preparing Error response. \n ${exception.stacktrace}" />
		</onException>
		<route id="_inputQueue" streamCache="true">
			<from id="listenToIncomingMessage" uri="activemq:com.the.basic.tech.info.inputQueue" />
			<log id="logToIncomingMessage" message="Input Message Body :: ${body}" />
			<setHeader headerName="Correlationid" id="_setHeaderCorrelationid">
				<simple>${header.JMSTimestamp}</simple>
			</setHeader>
			<setHeader headerName="EmpId" id="_setHeaderEmpId">
				<xpath resultType="String">/employees/employee/id</xpath>
			</setHeader>
			<setHeader headerName="EmpName" id="_setHeaderEmpName">
				<xpath resultType="String">/employees/employee/name</xpath>
			</setHeader>
			<setHeader headerName="XmlPayload" id="_setHeaderXmlPayload">
				<simple>${body}</simple>
			</setHeader>
			<unmarshal id="_unmarshal1" ref="jaxData" />
			<to id="toXmlToJson" uri="direct:xmlToJSON" />
			<to id="aggregateRecordsRoute" uri="seda:aggregateRecords" />
			<to id="selectDbRecordsRoute" uri="direct:selectDbRecords" />
		</route>
		<route id="_xmlToJSON">
			<from id="_from2" uri="direct:xmlToJSON" />
			<marshal id="_marshal1" ref="jack" />
			<convertBodyTo id="_convertBodyTo1" type="String" />
			<setHeader headerName="JsonPayload" id="_setHeaderJsonPayload">
				<simple>${body}</simple>
			</setHeader>
			<log id="_logXmlToJson" message="Xml to JSON is :: \n${body}" />
		</route>
		<route id="_aggregateRecordsRoute">
			<from id="_fromAggregateRecords" uri="seda:aggregateRecords" />
			<log id="_logAggregateRecords" message="Aggregation Records route called, completionInterval-{{completionInterval}}, completionSize-{{completionSize}} " />
			<aggregate id="_aggregate1" prop:completionInterval="{{completionInterval}}" prop:completionSize="{{completionSize}}" strategyRef="aggregationStrategy">
				<correlationExpression>
					<constant>true</constant>
				</correlationExpression>
				<log id="_logAggregateRecordsAfter" message="After Aaggregation Strategy....!!!" />
				<to id="_toInsertDbRecordsRoute" uri="seda:insertDbRecords" />
			</aggregate>
		</route>
		<route id="_insertDbRecordsRoute">
			<from id="_fromInsertDbRecords" uri="seda:insertDbRecords" />
			<toD id="insertMessageIntoDB" uri="sqlComponent:{{sql.insertEmployee}}?batch=true" />
			<log id="_logMessage" message="Record(s) has been inserted successfully." />
		</route>
		<route id="_selectDbRecordsRoute">
			<from id="_fromSelectDbRecords" uri="direct:selectDbRecords" />
			<toD id="selectMessageFromDB" uri="sqlComponent:{{sql.getAllEmployees}}" />
			<log id="_logSelectRecords" message="Record(s) have been fetched successfully. Records :: \n${body}" />
		</route>

	</camelContext>
</blueprint>

completionInterval and completionSize config parameters

We can make completionInterval and completionSize parameters to config driven by adding them into camel.blueprint.mysql.bundle.cfg file

Note: Please ensure that these updated file must be present into the /etc/ directory for Red Hat JBoss Fuse ESB.

#ActiveMQ Broker

broker.url=tcp://localhost:61616
broker.username=admin

#Database
db.url=jdbc:mysql://localhost:3306/bootdb?useUnicode=true;characterEncoding=UTF-8;zeroDateTimeBehavior=convertToNull;serverTimezone=GMT
db.username=root

#Database Query
#CREATE TABLE employees (empId VARCHAR(10) NOT NULL, empName VARCHAR(100) NOT NULL, xmlPayload VARCHAR(1000), jsonPayload VARCHAR(1000), created timestamp, correlationid VARCHAR(200));
sql.insertEmployee=INSERT INTO employees(EmpId, EmpName, XmlPayload, JsonPayload, Created, Correlationid) VALUES (:#EmpId, :#EmpName, :#XmlPayload, :#JsonPayload, CURRENT_TIMESTAMP, :#Correlationid)
sql.getAllEmployees=select * from employees

#30 seconds completionInterval
completionInterval=30000

#5 messages completionSize
completionSize=5

Note: There is no any change in Maven File pom.xml and other Marshalling and Unmarshalling java classes and camel routes.

Application Testing & Console Logs

We can monitor application/bundle logs in fuse.log (D:/Software/jboss-fuse-6.3.0.redhat-187/data/log/fuse.log)

Build the bundle

Following command to be run for build our fuse bundle.

D:\localworkspace\fuse-apache-camel-mysql-integration>mvn clean install
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Camel Blueprint MySQL Quickstart 1.0.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ camel-blueprint-mysql ---
[INFO] Deleting D:\localworkspace\fuse-apache-camel-mysql-integration\target
[INFO]
[INFO] --- maven-resources-plugin:3.0.1:resources (default-resources) @ camel-blueprint-mysql ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 8 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ camel-blueprint-mysql ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 5 source files to D:\localworkspace\fuse-apache-camel-mysql-integration\target\classes
[INFO]
[INFO] --- maven-resources-plugin:3.0.1:testResources (default-testResources) @ camel-blueprint-mysql ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory D:\localworkspace\fuse-apache-camel-mysql-integration\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:testCompile (default-testCompile) @ camel-blueprint-mysql ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.19.1:test (default-test) @ camel-blueprint-mysql ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-bundle-plugin:3.2.0:bundle (default-bundle) @ camel-blueprint-mysql ---
[INFO]
[INFO] --- maven-install-plugin:2.4:install (default-install) @ camel-blueprint-mysql ---
[INFO] Installing D:\localworkspace\fuse-apache-camel-mysql-integration\target\camel-blueprint-mysql-1.0.0-SNAPSHOT.jar to C:\Users\.m2\repository\com\the\basic\tech\info\camel-blueprint-mysql\1.0.0-SNAPSHOT\camel-blueprint-mysql-1.0.0-SNAPSHOT.jar
[INFO] Installing D:\localworkspace\fuse-apache-camel-mysql-integration\pom.xml to C:\Users\.m2\repository\com\the\basic\tech\info\camel-blueprint-mysql\1.0.0-SNAPSHOT\camel-blueprint-mysql-1.0.0-SNAPSHOT.pom
[INFO]
[INFO] --- maven-bundle-plugin:3.2.0:install (default-install) @ camel-blueprint-mysql ---
[INFO] Installing com/the/basic/tech/info/camel-blueprint-mysql/1.0.0-SNAPSHOT/camel-blueprint-mysql-1.0.0-SNAPSHOT.jar
[INFO] Writing OBR metadata
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.086 s
[INFO] Finished at: 2021-07-26T16:57:17+05:30
[INFO] Final Memory: 33M/251M
[INFO] ------------------------------------------------------------------------

D:\localworkspace\fuse-apache-camel-mysql-integration>

Update the bundle

We need to update the bundle for reflecting the changes in apache karaf.

Just list the bundles using list command on karaf command prompt: JBossFuse:karaf@root> list

You will see your bundle ID under list. E.g.

[ 315] [Active     ] [Created     ] [       ] [   80] Camel Blueprint MySQL Example [fuse-apache-camel-mysql-integration] (1.0.0.SNAPSHOT)

You can update bundle now, executing below command.

JBossFuse:karaf@root> update 315

If it is updated successfully, we can see updated routes information in fuse logs.

Publish inputMessage.xml payload in com.the.basic.tech.info.inputQueue

For the testing, how does Aggregation Strategy (Aggregator) works we need to publish N-Number of messages. E.g. I have published 5 messages in bulk in following format.

<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>
Red Hat JBoss Fuse admin console
Red Hat JBoss Fuse admin console ActiveMQ

fuse/bundle logs (/jboss-fuse-6.3.0.redhat-187/data/log/fuse.log)

2021-07-26 14:05:01,819 | INFO  | l Console Thread | DefaultRuntimeEndpointRegistry   | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2021-07-26 14:05:01,878 | INFO  | l Console Thread | SedaEndpoint                     | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Endpoint Endpoint[seda://aggregateRecords] is using shared queue: seda://aggregateRecords with size: 2147483647
2021-07-26 14:05:01,931 | INFO  | l Console Thread | SedaEndpoint                     | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Endpoint Endpoint[seda://insertDbRecords] is using shared queue: seda://insertDbRecords with size: 2147483647
2021-07-26 14:05:01,949 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
2021-07-26 14:05:01,951 | INFO  | l Console Thread | DefaultStreamCachingStrategy     | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | StreamCaching in use with spool directory: D:\Software\jboss-fuse-6.3.0.redhat-187\bin\..\data\tmp\camel\camel-tmp-d5023927-367d-4e13-a589-0ba352f3e9a9 and rules: [Spool > 128K body size]
2021-07-26 14:05:02,009 | INFO  | l Console Thread | AggregateProcessor               | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Defaulting to MemoryAggregationRepository
2021-07-26 14:05:02,011 | INFO  | l Console Thread | AggregateProcessor               | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Using CompletionInterval to run every 30000 millis.
2021-07-26 14:05:02,072 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Route: _inputQueue started and consuming from: Endpoint[activemq://com.the.basic.tech.info.inputQueue]
2021-07-26 14:05:02,077 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Route: _xmlToJSON started and consuming from: Endpoint[direct://xmlToJSON]
2021-07-26 14:05:02,087 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Route: _aggregateRecordsRoute started and consuming from: Endpoint[seda://aggregateRecords]
2021-07-26 14:05:02,099 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Route: _insertDbRecordsRoute started and consuming from: Endpoint[seda://insertDbRecords]
2021-07-26 14:05:02,100 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Route: _selectDbRecordsRoute started and consuming from: Endpoint[direct://selectDbRecords]
2021-07-26 14:05:02,100 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Total 5 routes, of which 5 are started.
2021-07-26 14:05:02,100 | INFO  | l Console Thread | BlueprintCamelContext            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Apache Camel 2.17.0.redhat-630187 (CamelContext: fuse-apache-camel-mysql-integration-context) started in 0.302 seconds
2021-07-26 14:05:22,509 | INFO  | qtp50593884-77   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq started
2021-07-26 14:05:22,532 | INFO  | info.inputQueue] | _inputQueue                      | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Input Message Body :: <?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10012</id>
      <name>Mark</name>
   </employee>
</employees>




2021-07-26 14:05:22,539 | INFO  | qtp50593884-77   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq stopped
2021-07-26 14:05:22,563 | INFO  | info.inputQueue] | _xmlToJSON                       | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Xml to JSON is ::
{
  "employee" : {
    "id" : 10012,
    "name" : "Mark"
  }
}
2021-07-26 14:05:22,568 | INFO  | aggregateRecords | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Aggregation Records route called, completionInterval-30000, completionSize-5
2021-07-26 14:05:22,577 | INFO  | aggregateRecords | CustomAggregationStrategy        | 315 - fuse-apache-camel-mysql-integration - 1.0.0.SNAPSHOT | Inside aggregate method.
2021-07-26 14:05:22,667 | INFO  | info.inputQueue] | _selectDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) have been fetched successfully. Records ::
[{empId=10001, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>, jsonPayload={
  "employee" : {
    "id" : 10001,
    "name" : "Mark"
  }
}, created=2021-07-24 23:33:45.0, correlationid=1627149825593}]
2021-07-26 14:05:32,016 | INFO  | teTimeoutChecker | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | After Aaggregation Strategy....!!!
2021-07-26 14:05:32,092 | INFO  | /insertDbRecords | _insertDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) has been inserted successfully.
2021-07-26 14:05:33,071 | INFO  | qtp50593884-77   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq started
2021-07-26 14:05:33,105 | INFO  | info.inputQueue] | _inputQueue                      | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Input Message Body :: <?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10013</id>
      <name>Mark</name>
   </employee>
</employees>
2021-07-26 14:05:33,112 | INFO  | qtp50593884-77   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq stopped
2021-07-26 14:05:33,130 | INFO  | info.inputQueue] | _xmlToJSON                       | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Xml to JSON is ::
{
  "employee" : {
    "id" : 10013,
    "name" : "Mark"
  }
}
2021-07-26 14:05:33,138 | INFO  | aggregateRecords | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Aggregation Records route called, completionInterval-30000, completionSize-5
2021-07-26 14:05:33,138 | INFO  | aggregateRecords | CustomAggregationStrategy        | 315 - fuse-apache-camel-mysql-integration - 1.0.0.SNAPSHOT | Inside aggregate method.
2021-07-26 14:05:33,144 | INFO  | info.inputQueue] | _selectDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) have been fetched successfully. Records ::
[{empId=10001, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>, jsonPayload={
  "employee" : {
    "id" : 10001,
    "name" : "Mark"
  }
}, created=2021-07-24 23:33:45.0, correlationid=1627149825593}, {empId=10012, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10012</id>
      <name>Mark</name>
   </employee>
</employees>



, jsonPayload={
  "employee" : {
    "id" : 10012,
    "name" : "Mark"
  }
}, created=2021-07-26 14:05:32.0, correlationid=1627288522528}]
2021-07-26 14:05:40,601 | INFO  | qtp50593884-79   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq started
2021-07-26 14:05:40,625 | INFO  | info.inputQueue] | _inputQueue                      | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Input Message Body :: <?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10014</id>
      <name>Mark</name>
   </employee>
</employees>
2021-07-26 14:05:40,629 | INFO  | qtp50593884-79   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq stopped
2021-07-26 14:05:40,642 | INFO  | info.inputQueue] | _xmlToJSON                       | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Xml to JSON is ::
{
  "employee" : {
    "id" : 10014,
    "name" : "Mark"
  }
}
2021-07-26 14:05:40,648 | INFO  | aggregateRecords | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Aggregation Records route called, completionInterval-30000, completionSize-5
2021-07-26 14:05:40,648 | INFO  | aggregateRecords | CustomAggregationStrategy        | 315 - fuse-apache-camel-mysql-integration - 1.0.0.SNAPSHOT | Inside aggregate method.
2021-07-26 14:05:40,654 | INFO  | info.inputQueue] | _selectDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) have been fetched successfully. Records ::
[{empId=10001, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>, jsonPayload={
  "employee" : {
    "id" : 10001,
    "name" : "Mark"
  }
}, created=2021-07-24 23:33:45.0, correlationid=1627149825593}, {empId=10012, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10012</id>
      <name>Mark</name>
   </employee>
</employees>



, jsonPayload={
  "employee" : {
    "id" : 10012,
    "name" : "Mark"
  }
}, created=2021-07-26 14:05:32.0, correlationid=1627288522528}]
2021-07-26 14:05:48,387 | INFO  | qtp50593884-82   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq started
2021-07-26 14:05:48,413 | INFO  | info.inputQueue] | _inputQueue                      | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Input Message Body :: <?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10015</id>
      <name>Mark</name>
   </employee>
</employees>
2021-07-26 14:05:48,423 | INFO  | qtp50593884-82   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq stopped
2021-07-26 14:05:48,427 | INFO  | info.inputQueue] | _xmlToJSON                       | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Xml to JSON is ::
{
  "employee" : {
    "id" : 10015,
    "name" : "Mark"
  }
}
2021-07-26 14:05:48,432 | INFO  | info.inputQueue] | _selectDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) have been fetched successfully. Records ::
[{empId=10001, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>, jsonPayload={
  "employee" : {
    "id" : 10001,
    "name" : "Mark"
  }
}, created=2021-07-24 23:33:45.0, correlationid=1627149825593}, {empId=10012, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10012</id>
      <name>Mark</name>
   </employee>
</employees>



, jsonPayload={
  "employee" : {
    "id" : 10012,
    "name" : "Mark"
  }
}, created=2021-07-26 14:05:32.0, correlationid=1627288522528}]
2021-07-26 14:05:48,434 | INFO  | aggregateRecords | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Aggregation Records route called, completionInterval-30000, completionSize-5
2021-07-26 14:05:48,434 | INFO  | aggregateRecords | CustomAggregationStrategy        | 315 - fuse-apache-camel-mysql-integration - 1.0.0.SNAPSHOT | Inside aggregate method.
2021-07-26 14:05:56,151 | INFO  | qtp50593884-82   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq started
2021-07-26 14:05:56,192 | INFO  | info.inputQueue] | _inputQueue                      | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Input Message Body :: <?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10016</id>
      <name>Mark</name>
   </employee>
</employees>
2021-07-26 14:05:56,194 | INFO  | qtp50593884-82   | TransportConnector               | 219 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630187 | Connector vm://amq stopped
2021-07-26 14:05:56,198 | INFO  | info.inputQueue] | _xmlToJSON                       | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Xml to JSON is ::
{
  "employee" : {
    "id" : 10016,
    "name" : "Mark"
  }
}
2021-07-26 14:05:56,201 | INFO  | aggregateRecords | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Aggregation Records route called, completionInterval-30000, completionSize-5
2021-07-26 14:05:56,203 | INFO  | aggregateRecords | CustomAggregationStrategy        | 315 - fuse-apache-camel-mysql-integration - 1.0.0.SNAPSHOT | Inside aggregate method.
2021-07-26 14:05:56,204 | INFO  | info.inputQueue] | _selectDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) have been fetched successfully. Records ::
[{empId=10001, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10001</id>
      <name>Mark</name>
   </employee>
</employees>, jsonPayload={
  "employee" : {
    "id" : 10001,
    "name" : "Mark"
  }
}, created=2021-07-24 23:33:45.0, correlationid=1627149825593}, {empId=10012, empName=Mark, xmlPayload=<?xml version="1.0" encoding="UTF-8"?>
<employees>
   <employee>
      <id>10012</id>
      <name>Mark</name>
   </employee>
</employees>



, jsonPayload={
  "employee" : {
    "id" : 10012,
    "name" : "Mark"
  }
}, created=2021-07-26 14:05:32.0, correlationid=1627288522528}]
2021-07-26 14:06:02,015 | INFO  | teTimeoutChecker | _aggregateRecordsRoute           | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | After Aaggregation Strategy....!!!
2021-07-26 14:06:02,050 | INFO  | /insertDbRecords | _insertDbRecordsRoute            | 232 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Record(s) has been inserted successfully.

Conclusion : Aggregation Strategy

Here in logs we can see that inputMessage.xml requests are marshal and unmarshal successfully. Then aggregation strategy has been excuted and exchanges are aggregated in to single exchange before insertion in MySQL database.

Aggregation Strategy

Download the Source Code (Attached)

Have a great day. Keep smiling 🙂