Spring JMS Listener Example

In this post, I will show you how to receive messages using a Spring JMS Listener.

First, I’ll explain the different options available.

Then, I’ll build a detailed example.

Sound good? Let’s dive right in…

If you want to learn more about Spring JMS – head on over to the Spring JMS tutorials page.

1. What is a Spring JMS Listener?

To receive JMS messages asynchronously, Spring lets you create message-driven POJOs (MDPs).

A message listener container is used to receive messages from a JMS broker. The container is a wrapper of sorts that calls a simple POJO listener class when a message arrives.

A classic MDP has one restriction: it must implement the javax.jms.MessageListener interface. Methods annotated with @JmsListener, covered in section 4, do not need to.

Note that you can also synchronously receive JMS messages using the JmsTemplate.

Let’s show how the above concepts work.

We start from a previous Spring JMS Example using ActiveMQ.

We adapt it so that an order message is sent to an order queue. A JMS listener will pick up the message and send a status message to two different status queues. On each queue, a different message listener container will read the status.

2. General Project Overview

We will use the following tools/frameworks:

  • Spring JMS 5.1
  • Spring Boot 2.1
  • ActiveMQ 5.15
  • Maven 3.6

Our project has the following directory structure:

[Image: Maven project directory structure]

3. Configure a Spring JMS Listener Container

A message listener container handles all the complexity of receiving JMS messages.

It is responsible for all threading of message reception and dispatches into the listener for processing. The container is the intermediary between an MDP and a messaging provider. It pulls the messages off a queue/topic and feeds them to your message listener.

There are two standard JMS message listener containers packaged with Spring:

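  1. The DefaultMessageListenerContainer (DMLC): uses a pull approach. It sits in an infinite loop and keeps calling receive on the JMS consumer from its own threads.
  2. The SimpleMessageListenerContainer (SMLC): uses a push approach. It registers the listener with the JMS provider, which invokes it when a message arrives.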
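To make the pull versus push difference concrete, here is a minimal plain-Java sketch. It is a toy model and not Spring or JMS code: PullVersusPushSketch, ToyProvider and the maxAttempts parameter are names I made up for illustration, and a BlockingQueue stands in for the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model only: none of these types are Spring or JMS classes.
final class PullVersusPushSketch {

  // Pull style (the DMLC idea): the caller's own thread repeatedly asks for
  // the next message and waits up to a timeout on every attempt.
  static List<String> pull(BlockingQueue<String> queue, int expected, int maxAttempts) {
    List<String> received = new ArrayList<>();
    for (int attempt = 0; attempt < maxAttempts && received.size() < expected; attempt++) {
      try {
        String message = queue.poll(100, TimeUnit.MILLISECONDS);
        if (message != null) {
          received.add(message);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt flag and stop polling
        break;
      }
    }
    return received;
  }

  // Push style (the SMLC idea): the listener is registered once and the
  // provider invokes it on its own thread whenever a message is published.
  static final class ToyProvider {
    private Consumer<String> listener = message -> { };

    void setListener(Consumer<String> listener) {
      this.listener = listener;
    }

    void publish(String message) {
      listener.accept(message); // runs on the publishing (provider) thread
    }
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("Accepted");
    System.out.println("pulled: " + pull(queue, 1, 10));

    List<String> pushed = new ArrayList<>();
    ToyProvider provider = new ToyProvider();
    provider.setListener(pushed::add);
    provider.publish("Accepted");
    System.out.println("pushed: " + pushed);
  }
}
```

In the pull variant the receiving side owns the thread, which is why the DMLC can manage its own threading. In the push variant the listener runs on whatever thread the provider uses to deliver the message.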

The DefaultMessageListenerContainer is a good default choice in many environments.

It is the only listener container that does not impose thread management on the JMS provider (as it does not use or block JMS provider threads). The DMLC is also able to recover gracefully from JMS provider failures, such as connection loss. And it is the only variant that supports external transaction managers, in particular for XA transactions.

Alternatively, consider using SimpleMessageListenerContainer, but only for native JMS usage without XA, and only if your JMS provider gracefully handles thread management and connection recovery.

Let’s create both types of listener containers in the ReceiverConfig class.

Create a DefaultJmsListenerContainerFactory and a SimpleJmsListenerContainerFactory. Note that they both require a ConnectionFactory.

We call setConcurrency() on the DMLC factory to set the “lower-upper” limits. The value "3-10" used below means at least 3 and at most 10 concurrent consumers. The resulting listener container always holds on to the minimum number of consumers and gradually scales up to the maximum number as the load increases.
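As a rough illustration of how such a “lower-upper” string breaks down, here is a small plain-Java sketch. ConcurrencyRangeSketch is a hypothetical helper of mine, not a Spring class. It assumes the convention described in the Spring Javadoc, where a single number such as "10" is read as the upper limit with a lower limit of 1.

```java
// Hypothetical helper, not part of Spring: it only illustrates how a
// "lower-upper" concurrency string splits into its two limits.
final class ConcurrencyRangeSketch {

  final int min;
  final int max;

  private ConcurrencyRangeSketch(int min, int max) {
    this.min = min;
    this.max = max;
  }

  static ConcurrencyRangeSketch parse(String concurrency) {
    int separator = concurrency.indexOf('-');
    if (separator != -1) {
      // "3-10" -> lower limit 3, upper limit 10
      int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
      int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
      return new ConcurrencyRangeSketch(lower, upper);
    }
    // Assumption: a single number is the upper limit and the lower limit is 1.
    return new ConcurrencyRangeSketch(1, Integer.parseInt(concurrency.trim()));
  }

  public static void main(String[] args) {
    ConcurrencyRangeSketch range = parse("3-10");
    System.out.println("min=" + range.min + ", max=" + range.max);
  }
}
```

On a DefaultMessageListenerContainer the same limits can also be set as two separate numbers with setConcurrentConsumers() and setMaxConcurrentConsumers().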

We then use the factories to create a DefaultMessageListenerContainer and a SimpleMessageListenerContainer.

The SimpleJmsListenerEndpoint defines the destination and the MessageListener that is invoked to process an incoming message. For testing purposes, we reuse the StatusMessageListener for both containers but give each instance a different identifier.

package com.codenotfound.jms;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

@Configuration
@EnableJms
public class ReceiverConfig {

  @Value("${activemq.broker-url}")
  private String brokerUrl;

  @Value("${destination.status1}")
  private String status1Destination;

  @Value("${destination.status2}")
  private String status2Destination;

  @Bean
  public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory =
        new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(brokerUrl);

    return activeMQConnectionFactory;
  }

  @Bean
  public DefaultJmsListenerContainerFactory orderDefaultJmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
        new DefaultJmsListenerContainerFactory();
    factory
        .setConnectionFactory(receiverActiveMQConnectionFactory());
    factory.setConcurrency("3-10");

    return factory;
  }

  @Bean
  public SimpleJmsListenerContainerFactory orderSimpleJmsListenerContainerFactory() {
    SimpleJmsListenerContainerFactory factory =
        new SimpleJmsListenerContainerFactory();
    factory
        .setConnectionFactory(receiverActiveMQConnectionFactory());

    return factory;
  }

  @Bean
  public DefaultMessageListenerContainer orderMessageListenerContainer() {
    SimpleJmsListenerEndpoint endpoint =
        new SimpleJmsListenerEndpoint();
    endpoint.setMessageListener(new StatusMessageListener("DMLC"));
    endpoint.setDestination(status1Destination);

    return orderDefaultJmsListenerContainerFactory()
        .createListenerContainer(endpoint);
  }

  @Bean
  public SimpleMessageListenerContainer statusMessageListenerContainer() {
    SimpleJmsListenerEndpoint endpoint =
        new SimpleJmsListenerEndpoint();
    endpoint.setMessageListener(new StatusMessageListener("SMLC"));
    endpoint.setDestination(status2Destination);

    return orderSimpleJmsListenerContainerFactory()
        .createListenerContainer(endpoint);
  }
}

4. Create a Spring JMS Listener

To create a JMS listener you need to implement the MessageListener interface. It has an onMessage() method that is triggered for each message that is received.

The StatusMessageListener below tries to cast the received message to a TextMessage. If successful, it logs the content and counts down a CountDownLatch that we will use for testing purposes.

As we set a StatusMessageListener instance on both the DMLC and the SMLC, we give each instance an id. This allows us to log which container received a message.

package com.codenotfound.jms;

import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusMessageListener implements MessageListener {

  // Log under this class's own name rather than Sender
  private static final Logger LOGGER =
      LoggerFactory.getLogger(StatusMessageListener.class);

  private String id;

  private CountDownLatch latch = new CountDownLatch(1);

  public StatusMessageListener(String id) {
    super();
    this.id = id;
  }

  @Override
  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      try {
        String text = ((TextMessage) message).getText();
        LOGGER.info("id='{}' received status='{}'", id, text);
        latch.countDown();
      } catch (JMSException e) {
        LOGGER.error("unable to read message payload", e);
      }
    } else {
      LOGGER.error("received unsupported message type");
    }
  }

  public CountDownLatch getLatch() {
    return latch;
  }
}

There is an even easier way to create a Spring JMS listener.

Simply annotate a method of a Spring bean with @JmsListener. This causes a listener container to be created on the specified destination using a JmsListenerContainerFactory.

If the containerFactory attribute is not set, a default container factory is assumed to be available with a bean name of jmsListenerContainerFactory, unless an explicit default has been provided through configuration.

Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the @EnableJms annotation.

So for this example we create an OrderReceiver class that contains a receiveOrder() method annotated with @JmsListener. The method receives a simple order and logs it. We then use an auto-wired JmsTemplate to send a status message to the status1 and status2 destinations.

Note that @EnableJms was specified on the earlier defined ReceiverConfig class.

package com.codenotfound.jms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderReceiver {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(OrderReceiver.class);

  @Value("${destination.status1}")
  private String status1Destination;

  @Value("${destination.status2}")
  private String status2Destination;

  @Autowired
  JmsTemplate jmsTemplate;

  @JmsListener(destination = "${destination.order}",
      containerFactory = "orderDefaultJmsListenerContainerFactory")
  public void receiveOrder(String order) {
    LOGGER.info("received order='{}'", order);
    jmsTemplate.convertAndSend(status1Destination, "Accepted");
    jmsTemplate.convertAndSend(status2Destination, "Accepted");
  }
}

5. Testing the JMS Listener

We change the existing test case to check if our different message listeners work.

We send an order message to the order destination. Then we check that the CountDownLatch reached zero in both the DMLC and SMLC message listeners.

package com.codenotfound;

import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import com.codenotfound.jms.Sender;
import com.codenotfound.jms.StatusMessageListener;

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class SpringJmsApplicationTest {

  @ClassRule
  public static EmbeddedActiveMQBroker broker =
      new EmbeddedActiveMQBroker();

  @Autowired
  private Sender sender;

  @Autowired
  private DefaultMessageListenerContainer dmlc;

  @Autowired
  private SimpleMessageListenerContainer smlc;

  @Test
  public void testReceive() throws Exception {
    sender.send("order-002");

    StatusMessageListener status1MessageListener =
        (StatusMessageListener) dmlc.getMessageListener();
    status1MessageListener.getLatch().await(10000,
        TimeUnit.MILLISECONDS);
    assertThat(status1MessageListener.getLatch().getCount())
        .isEqualTo(0);

    StatusMessageListener status2MessageListener =
        (StatusMessageListener) smlc.getMessageListener();
    status2MessageListener.getLatch().await(10000,
        TimeUnit.MILLISECONDS);
    assertThat(status2MessageListener.getLatch().getCount())
        .isEqualTo(0);
  }
}
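The latch-based wait used in the test can be looked at in isolation. The plain-Java sketch below is not part of the project: LatchAwaitSketch and its deliver flag are made up for illustration, and a plain thread stands in for a listener container thread. It shows that CountDownLatch.await() with a timeout already returns whether the count reached zero, so a test could assert on that return value directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone demo, not part of the example project.
final class LatchAwaitSketch {

  // Waits on a latch with count 1. When deliver is true, a worker thread
  // (standing in for a listener container thread) counts the latch down.
  static boolean deliveredWithin(long timeoutMillis, boolean deliver) {
    CountDownLatch latch = new CountDownLatch(1);
    if (deliver) {
      new Thread(latch::countDown).start();
    }
    try {
      // true if the count reached zero, false if the timeout elapsed first
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("message delivered: " + deliveredWithin(1000, true));
    System.out.println("nothing delivered: " + deliveredWithin(50, false));
  }
}
```

Checking the count afterwards, as the test above does, gives the same answer. Asserting on the boolean is simply a bit shorter.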

Open a command prompt in the project root directory and launch the test case.

mvn test

In the output logs, we can see that the order message is received by the annotated listener. The status messages arrive in the DMLC and SMLC as shown below.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.5.RELEASE)

2019-05-30 14:01:04.096  INFO 11360 --- [           main] c.codenotfound.SpringJmsApplicationTest  : Starting SpringJmsApplicationTest on DESKTOP-2RB3C1U with PID 11360 (started by Codenotfound in C:/Users/Codenotfound/repos/spring-jms/spring-jms-listener)
2019-05-30 14:01:04.098  INFO 11360 --- [           main] c.codenotfound.SpringJmsApplicationTest  : No active profile set, falling back to default profiles: default
2019-05-30 14:01:05.298  INFO 11360 --- [           main] c.codenotfound.SpringJmsApplicationTest  : Started SpringJmsApplicationTest in 1.501 seconds (JVM running for 3.267)
2019-05-30 14:01:05.715  INFO 11360 --- [           main] com.codenotfound.jms.Sender              : sending message='order-002'
2019-05-30 14:01:05.747  INFO 11360 --- [enerContainer-3] com.codenotfound.jms.OrderReceiver       : received order='order-002'
2019-05-30 14:01:05.754  INFO 11360 --- [enerContainer-1] com.codenotfound.jms.Sender              : id='DMLC' received status='Accepted'
2019-05-30 14:01:05.774  INFO 11360 --- [ Session Task-1] com.codenotfound.jms.Sender              : id='SMLC' received status='Accepted'
2019-05-30 14:01:06.791  INFO 11360 --- [           main] o.a.a.junit.EmbeddedActiveMQBroker       : Stopping Embedded ActiveMQ Broker: embedded-broker
2019-05-30 14:01:06.795  INFO 11360 --- [           main] o.a.activemq.broker.TransportConnector   : Connector vm://embedded-broker stopped
2019-05-30 14:01:06.795  INFO 11360 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.9 (embedded-broker, ID:DESKTOP-2RB3C1U-59326-1559217663431-0:1) is shutting down
2019-05-30 14:01:06.812  INFO 11360 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.9 (embedded-broker, ID:DESKTOP-2RB3C1U-59326-1559217663431-0:1) uptime 3.666 seconds
2019-05-30 14:01:06.812  INFO 11360 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.9 (embedded-broker, ID:DESKTOP-2RB3C1U-59326-1559217663431-0:1) is shutdown
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.394 s - in com.codenotfound.SpringJmsApplicationTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  9.908 s
[INFO] Finished at: 2019-05-30T14:01:07+02:00
[INFO] ------------------------------------------------------------------------

If you would like to run the above code sample you can get the full source code here.

In this guide, we took a look at the different JMS listeners that the Spring framework offers.

I hope you found the example useful.

If so, leave a comment below.

Thanks!

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/271038.html
