Making IBM MQ talk to Kafka (IBM Event Streams)
IBM MQ is a messaging platform that has been around for more than 25 years. Over the years it has become a de facto standard for asynchronous, reliable communication between applications, particularly in the financial industry.
Apache Kafka is an open-source stream-processing platform originally developed at LinkedIn. It provides a high-throughput, low-latency 'messaging' platform. In recent years Kafka has been gaining popularity as organizations modernize their applications and choose it for asynchronous communication between microservices. IBM Event Streams is IBM's implementation of Apache Kafka.
In this article, you will explore an approach to making these two important messaging platforms talk to one another. The article assumes basic familiarity with IBM MQ and IBM Event Streams.
I have set up the following software as prerequisites.
- IBM Cloud Private v3.1.1.
- GlusterFS on IBM Cloud Private (IBM Event Streams requires a highly resilient file system).
- IBM Event Streams v2018.3.1 (on IBM Cloud Private).
- IBM MQ v9.1 (on IBM Cloud Private).
We are going to configure a setup that looks like this.
Basically, we have two queues defined in the MQ queue manager and two topics defined in Event Streams. Two connector applications are instrumental in moving messages from the queues to the topics and vice versa (see the flow sketch after this list).
- The MQ Source connector (or Kafka Connect source connector for IBM MQ) connects to the queue manager, periodically reads messages from the TO.KAFKA.Q queue, and publishes them to the FROM.MQ.TOPIC topic in Kafka. 'Source' means it is a source of messages for a Kafka topic.
- The MQ Sink connector (or Kafka Connect sink connector for IBM MQ) connects to the Kafka bootstrap server and subscribes to the TO.MQ.TOPIC topic. On receiving messages from the topic, it puts them onto the FROM.KAFKA.Q queue of the queue manager. 'Sink' means it is a sink for messages from a Kafka topic.
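Putting the two halves together, the message flow in each direction is:
MQ application --> TO.KAFKA.Q --> MQ Source connector --> FROM.MQ.TOPIC --> Kafka consumer
Kafka producer --> TO.MQ.TOPIC --> MQ Sink connector --> FROM.KAFKA.Q --> MQ application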
Step 1: Configure Queue Manager
- Create two local queues: TO.KAFKA.Q, FROM.KAFKA.Q
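If you prefer the command line to the MQ Console, a minimal MQSC sketch creates the same queues (assuming the queue manager is named qmgr1, as in the connector properties later):
runmqsc qmgr1
DEFINE QLOCAL(TO.KAFKA.Q)
DEFINE QLOCAL(FROM.KAFKA.Q)
END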
Step 2: Get connection information of IBM Event Streams
- In the IBM Event Streams console, click Connect to this cluster.
- Copy the address (host and port) of the bootstrap server.
- Download the Java truststore as the file es-cert.jks.
Step 3: Configure IBM Event Streams
- Create two topics: FROM.MQ.TOPIC, TO.MQ.TOPIC (a CLI alternative is sketched after this list).
- Create API keys that grant permission to access the Event Streams cluster and topics; you will need a producer key and a consumer key for each topic.
- Verify what has been created.
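If you prefer a CLI for creating the topics, the Kafka distribution downloaded in the next section ships with kafka-topics.sh. A sketch, assuming config/my-admin.config holds the same SASL_SSL and truststore settings shown later, an API key with admin permission, and illustrative partition and replication values:
bin/kafka-topics.sh --bootstrap-server <bootstrap_server>:<bootstrap_port> --command-config config/my-admin.config --create --topic FROM.MQ.TOPIC --partitions 1 --replication-factor 3
bin/kafka-topics.sh --bootstrap-server <bootstrap_server>:<bootstrap_port> --command-config config/my-admin.config --create --topic TO.MQ.TOPIC --partitions 1 --replication-factor 3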
Kafka Source connector for IBM MQ
The following steps configure the Kafka Source connector for IBM MQ.
Step 1: Configure Kafka source connector for IBM MQ
- Download the Kafka Connect source connector (JAR and properties file) for IBM MQ from the Event Streams toolbox.
- Configure the mq-source.properties file. This file tells the Source connector how to connect to the queue manager and which Kafka topic to publish to (the note after the listing covers the remaining entries).
mq.queue.manager=qmgr1
mq.connection.name.list=<qmgr1_hostname>(<qmgr1_port>)
mq.channel.name=DEV.APP.SVRCONN
mq.queue=TO.KAFKA.Q
mq.user.name=USERNAME_TO_ACCESS_QMGR
mq.password=PASSWORD_TO_ACCESS_QMGR
topic=FROM.MQ.TOPIC
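The downloaded file also contains connector plumbing entries, which can normally stay at their defaults. For reference, they look roughly like this (an assumption based on version 1.0.1 of the connector; check your own file):
name=mq-source
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasks.max=1
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder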
Step 2: Configure the Java worker (source) application in standalone mode
- Download the Kafka binary distribution (kafka_2.11-2.2.0.tgz) from Apache Kafka (http://kafka.apache.org/downloads); it contains the runtime needed to run the connector as a standalone application.
- Untar the file into ~/kafka_2.11-2.2.0.
- Copy config/connect-standalone.properties to config/connect-standalone-source.properties.
- Configure config/connect-standalone-source.properties.
bootstrap.servers=<bootstrap_server>:<bootstrap_port>
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/REPLACE_ME/es-cert.jks
ssl.truststore.password=REPLACE_ME
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_FROM.MQ.TOPIC_PRODUCER";
producer.security.protocol=SASL_SSL
producer.ssl.protocol=TLSv1.2
producer.ssl.endpoint.identification.algorithm=
producer.ssl.truststore.location=/REPLACE_ME/es-cert.jks
producer.ssl.truststore.password=REPLACE_ME
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_FROM.MQ.TOPIC_PRODUCER";
The following information is required:
- bootstrap.servers: the hostname and port of the Event Streams bootstrap server.
- ssl.truststore.location: the path to the Java truststore (es-cert.jks) you downloaded earlier.
- ssl.truststore.password: the password of the truststore.
- sasl.jaas.config: the password is the API key for the FROM.MQ.TOPIC producer.
- producer.ssl.truststore.location: the path to the same truststore.
- producer.ssl.truststore.password: the password of the truststore.
- producer.sasl.jaas.config: the password is the API key for the FROM.MQ.TOPIC producer.
Step 3: Run the Java worker (source) application in standalone mode
- Make sure the following files are in place:
~/kafka_2.11-2.2.0/libs/kafka-connect-mq-source-1.0.1-jar-with-dependencies.jar
~/kafka_2.11-2.2.0/config/mq-source.properties
- Go to the folder ~/kafka_2.11-2.2.0 and run the Java worker application.
CLASSPATH=libs/kafka-connect-mq-source-1.0.1-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone-source.properties config/mq-source.properties
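Once the worker is up, you can check that the connector is running through the Kafka Connect REST interface. A sketch, assuming the standalone worker listens on its default port 8083 and the connector is named mq-source in mq-source.properties:
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mq-source/status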
Step 4: Configure sample consumer standalone application
- Create the file config/my-consumer.config with the following properties (it is passed to the console consumer in Step 5).
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/REPLACE_ME/es-cert.jks
ssl.truststore.password=PASSWORD_FOR_JKS_TRUSTSTORE
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_FROM.MQ.TOPIC_CONSUMER";
- ssl.truststore.location: the path to the Java truststore (es-cert.jks) you downloaded earlier.
- ssl.truststore.password: the password of the truststore.
- sasl.jaas.config: the password is the API key for the FROM.MQ.TOPIC consumer.
Step 5: Run sample consumer application
- Go to the folder ~/kafka_2.11-2.2.0 and run the consumer application. It displays messages on the console as they are received.
bin/kafka-console-consumer.sh --bootstrap-server <bootstrap_server_host>:<bootstrap_server_port> --topic FROM.MQ.TOPIC --consumer.config config/my-consumer.config --from-beginning
Try putting an MQ message onto TO.KAFKA.Q from the MQ Console and check whether it is received by the consumer application, or watch for it in the Event Streams monitoring console.
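If you would rather test from a shell than from the MQ Console, the amqsput sample can put a test message; a sketch, assuming the MQ samples are installed in the usual location and you have shell access to the queue manager (for example via kubectl exec into the MQ container):
/opt/mqm/samp/bin/amqsput TO.KAFKA.Q qmgr1
Type a test message, press Enter, and finish with a blank line; the message should then appear on the consumer's console.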
Kafka Sink connector for IBM MQ
Step 1: Configure Kafka Sink connector for IBM MQ
- Download the Kafka Connect sink connector (JAR and properties file) for IBM MQ from the Event Streams toolbox.
- Configure the mq-sink.properties file. This file tells the Sink connector how to connect to the queue manager and which Kafka topic to subscribe to (the note after the listing covers the remaining entries).
mq.queue.manager=qmgr1
mq.connection.name.list=<qmgr1_hostname>(<qmgr1_port>)
mq.channel.name=DEV.APP.SVRCONN
mq.queue=FROM.KAFKA.Q
mq.user.name=USERNAME_TO_ACCESS_QMGR
mq.password=PASSWORD_TO_ACCESS_QMGR
topics=TO.MQ.TOPIC
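Note that sink connectors use the plural topics property. As with the source connector, the downloaded file also contains plumbing entries that can normally stay at their defaults; roughly (again an assumption based on version 1.0.1; check your own file):
name=mq-sink
connector.class=com.ibm.eventstreams.connect.mqsink.MQSinkConnector
tasks.max=1
mq.message.builder=com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder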
Step 2: Configure the Java worker (sink) application in standalone mode
- Download the Kafka binary distribution (kafka_2.11-2.2.0.tgz) from Apache Kafka (http://kafka.apache.org/downloads), if you haven't already.
- Untar the file into ~/kafka_2.11-2.2.0, if you haven't already.
- Copy config/connect-standalone.properties to config/connect-standalone-sink.properties.
- Configure config/connect-standalone-sink.properties.
bootstrap.servers=<bootstrap_server>:<bootstrap_port>
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/REPLACE_ME/es-cert.jks
ssl.truststore.password=REPLACE_ME
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_TO.MQ.TOPIC_CONSUMER";
consumer.security.protocol=SASL_SSL
consumer.ssl.protocol=TLSv1.2
consumer.ssl.endpoint.identification.algorithm=
consumer.ssl.truststore.location=/REPLACE_ME/es-cert.jks
consumer.ssl.truststore.password=REPLACE_ME
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_TO.MQ.TOPIC_CONSUMER";
The following information is required:
- bootstrap.servers: the hostname and port of the Event Streams bootstrap server.
- ssl.truststore.location: the path to the Java truststore (es-cert.jks) you downloaded earlier.
- ssl.truststore.password: the password of the truststore.
- sasl.jaas.config: the password is the API key for the TO.MQ.TOPIC consumer.
- consumer.ssl.truststore.location: the path to the same truststore.
- consumer.ssl.truststore.password: the password of the truststore.
- consumer.sasl.jaas.config: the password is the API key for the TO.MQ.TOPIC consumer.
Step 3: Run the Java worker (sink) application in standalone mode
- Make sure the following files are in place:
~/kafka_2.11-2.2.0/libs/kafka-connect-mq-sink-1.0.1-jar-with-dependencies.jar
~/kafka_2.11-2.2.0/config/mq-sink.properties
- Go to the folder ~/kafka_2.11-2.2.0 and run the Java worker application.
CLASSPATH=libs/kafka-connect-mq-sink-1.0.1-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone-sink.properties config/mq-sink.properties
Step 4: Configure sample producer standalone application
- Create the file config/my-producer.config with the following properties (it is passed to the console producer in Step 5).
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/REPLACE_ME/es-cert.jks
ssl.truststore.password=PASSWORD_FOR_JKS_TRUSTSTORE
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="API_KEY_FOR_TO.MQ.TOPIC_PRODUCER";
- ssl.truststore.location: the path to the Java truststore (es-cert.jks) you downloaded earlier.
- ssl.truststore.password: the password of the truststore.
- sasl.jaas.config: the password is the API key for the TO.MQ.TOPIC producer.
Step 5: Run sample producer application
- Go to the folder ~/kafka_2.11-2.2.0 and run the producer application. It lets you type messages at the console; each line you enter is published to the topic. Note that the console producer in this Kafka version takes --broker-list rather than --bootstrap-server.
bin/kafka-console-producer.sh --broker-list <bootstrap_server_host>:<bootstrap_server_port> --topic TO.MQ.TOPIC --producer.config config/my-producer.config
Try publishing a few messages by typing them into the producer application, then check in the IBM MQ Console whether they arrive on the FROM.KAFKA.Q queue.
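To verify from a shell instead of the MQ Console, the amqsget sample reads and prints messages from the queue (same assumptions as the amqsput sketch earlier):
/opt/mqm/samp/bin/amqsget FROM.KAFKA.Q qmgr1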
Thank you for reading & good luck trying!!
Disclaimer:
All opinions expressed here are very much my own and not those of IBM. All code/scripts/artifacts are provided as-is with no support unless otherwise stated.