mq 使用Spring发送,消费topic和queue消息详解编程语言

简介

实战一 , 实战二介绍了ActiveMQ的基本概念和配置方式.

本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.

  

如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道.

  1. TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.
  2. QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息

Spring整合JMS

就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.

  1. ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
  2. Destination. 有topic和queue两种方式.
  3. JmsTemplate. spring提供的jms模板.
  4. MessageConverter. 消息转换器.
  5. MessageProducer. 消息生产者.
  6. MessageConsumer. 消息消费者.
  7. MessageListener. 消息监听器
  8. MessageListenerContainer. 消息监听容器

下面以实例的方式介绍上面8个部分.

1. ConnectionFactory

Xml代码 

  1. <amq:connectionFactory id=“jmsConnectionFactory” brokerURL=“vm://localhost” /> 
[xml] 
view plain
 copy

  1. <amq:connectionFactory id=“jmsConnectionFactory” brokerURL=“vm://localhost” />  

brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.

Xml代码 

  1. <!–  embedded ActiveMQ Broker –> 
  2.     <amq:broker useJmx=“false” persistent=“true”> 
  3.         <amq:persistenceAdapter> 
  4.             <amq:amqPersistenceAdapter directory=“d:/amq”/> 
  5.         </amq:persistenceAdapter> 
  6.         <amq:transportConnectors> 
  7.             <amq:transportConnector uri=“tcp://localhost:61616” /> 
  8.                        <amq:transportConnector uri=“vm://localhost:0” /> 
  9.         </amq:transportConnectors> 
  10.     </amq:broker> 
[xml] 
view plain
 copy

  1. <!–  embedded ActiveMQ Broker –>  
  2. <amq:broker useJmx=“false” persistent=“true”>  
  3. <amq:persistenceAdapter>  
  4. <amq:amqPersistenceAdapter directory=“d:/amq”/>  
  5. </amq:persistenceAdapter>  
  6. <amq:transportConnectors>  
  7. <amq:transportConnector uri=“tcp://localhost:61616” />  
  8. <amq:transportConnector uri=“vm://localhost:0” />  
  9. </amq:transportConnectors>  
  10. </amq:broker>  

2. Destination

在实例中我们使用了两种destination

Xml代码 

  1. <!–  ActiveMQ destinations  –> 
  2. <!–  使用topic方式–> 
  3. <amq:topic name=“TOPIC” physicalName=“JMS-TEST-TOPIC” /> 
  4. <!–  使用Queue方式–> 
  5. <amq:queue name=“QUEUE” physicalName=“JMS-TEST-QUEUE” /> 
[xml] 
view plain
 copy

  1. <!–  ActiveMQ destinations  –>  
  2. <!–  使用topic方式–>  
  3. <amq:topic name=“TOPIC” physicalName=“JMS-TEST-TOPIC” />  
  4. <!–  使用Queue方式–>  
  5. <amq:queue name=“QUEUE” physicalName=“JMS-TEST-QUEUE” />  

3. JmsTemplate

Xml代码 

  1. <!–  Spring JmsTemplate config –> 
  2.     <bean id=“jmsTemplate” class=“org.springframework.jms.core.JmsTemplate”> 
  3.         <property name=“connectionFactory”> 
  4.             <!–  lets wrap in a pool to avoid creating a connection per send –> 
  5.             <bean class=“org.springframework.jms.connection.SingleConnectionFactory”> 
  6.                 <property name=“targetConnectionFactory” ref=“jmsConnectionFactory” /> 
  7.             </bean> 
  8.         </property> 
  9.         <!– custom MessageConverter –> 
  10.         <property name=“messageConverter” ref=“defaultMessageConverter” /> 
  11.     </bean> 
[xml] 
view plain
 copy

  1. <!–  Spring JmsTemplate config –>  
  2. <bean id=“jmsTemplate” class=“org.springframework.jms.core.JmsTemplate”>  
  3. <property name=“connectionFactory”>  
  4. <!–  lets wrap in a pool to avoid creating a connection per send –>  
  5. <bean class=“org.springframework.jms.connection.SingleConnectionFactory”>  
  6. <property name=“targetConnectionFactory” ref=“jmsConnectionFactory” />  
  7. </bean>  
  8. </property>  
  9. <!– custom MessageConverter –>  
  10. <property name=“messageConverter” ref=“defaultMessageConverter” />  
  11. </bean>  

  4. MessageConverter

   MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.

Xml代码 

  1. <bean id=“defaultMessageConverter” class=“com.andyao.activemq.DefaultMessageConverter” /> 
[xml] 
view plain
 copy

  1. <bean id=“defaultMessageConverter” class=“com.andyao.activemq.DefaultMessageConverter” />  

  5. MessageProducer

   实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.

Xml代码 

  1. <!– POJO which send Message uses  Spring JmsTemplate –> 
  2.     <bean id=“topicMessageProducer” class=“com.andyao.activemq.TopicMessageProducer”> 
  3.         <property name=“template” ref=“jmsTemplate” /> 
  4.         <property name=“destination” ref=“TOPIC” /> 
  5.     </bean> 
  6.     <bean id=“queueMessageProducer” class=“com.andyao.activemq.QueuMessageProducer”> 
  7.         <property name=“template” ref=“jmsTemplate” /> 
  8.         <property name=“destination” ref=“QUEUE” /> 
  9.     </bean> 
[xml] 
view plain
 copy

  1. <!– POJO which send Message uses  Spring JmsTemplate –>  
  2. <bean id=“topicMessageProducer” class=“com.andyao.activemq.TopicMessageProducer”>  
  3. <property name=“template” ref=“jmsTemplate” />  
  4. <property name=“destination” ref=“TOPIC” />  
  5. </bean>  
  6. <bean id=“queueMessageProducer” class=“com.andyao.activemq.QueuMessageProducer”>  
  7. <property name=“template” ref=“jmsTemplate” />  
  8. <property name=“destination” ref=“QUEUE” />  
  9. </bean>  

6. MessageConsumer

TOPIC通道有两个消息消费者, QUEUE有一个消息消费者

Xml代码 

  1. <!–  Message Driven POJO (MDP) –> 
  2.     <!– consumer1 for topic a –> 
  3.     <bean id=“topicConsumerA” class=“com.andyao.activemq.TopicConsumerA” /> 
  4.     <!– consumer2 for topic a –> 
  5.     <bean id=“topicConsumerB” class=“com.andyao.activemq.TopicConsumerB” /> 
  6.     <!– consumer for queue –> 
  7.     <bean id=“queueConsumer” class=“com.andyao.activemq.QueueConsumer” /> 
[xml] 
view plain
 copy

  1. <!–  Message Driven POJO (MDP) –>  
  2. <!– consumer1 for topic a –>  
  3. <bean id=“topicConsumerA” class=“com.andyao.activemq.TopicConsumerA” />  
  4. <!– consumer2 for topic a –>  
  5. <bean id=“topicConsumerB” class=“com.andyao.activemq.TopicConsumerB” />  
  6. <!– consumer for queue –>  
  7. <bean id=“queueConsumer” class=“com.andyao.activemq.QueueConsumer” />  

  7. MessageListener

每一个消息消费者都对应一个MessageListener

Xml代码 

  1. <bean id=“topicListenerA” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”> 
  2.         <constructor-arg ref=“topicConsumerA” /> 
  3.         <!–  may be other method –> 
  4.         <property name=“defaultListenerMethod” value=“receive” /> 
  5.         <!– custom MessageConverter define –> 
  6.         <property name=“messageConverter” ref=“defaultMessageConverter” /> 
  7.     </bean> 
  8.  
  9.     <bean id=“topicListenerB” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”> 
  10.         <constructor-arg ref=“topicConsumerB” /> 
  11.         <!–  may be other method –> 
  12.         <property name=“defaultListenerMethod” value=“receive” /> 
  13.         <!– custom MessageConverter define –> 
  14.         <property name=“messageConverter” ref=“defaultMessageConverter” /> 
  15.     </bean> 
  16.  
  17.     <bean id=“queueListener” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”> 
  18.         <constructor-arg ref=“queueConsumer” /> 
  19.         <!–  may be other method –> 
  20.         <property name=“defaultListenerMethod” value=“receive” /> 
  21.         <!– custom MessageConverter define –> 
  22.         <property name=“messageConverter” ref=“defaultMessageConverter” /> 
  23.     </bean> 
[xml] 
view plain
 copy

  1. <bean id=“topicListenerA” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”>  
  2. <constructor-arg ref=“topicConsumerA” />  
  3. <!–  may be other method –>  
  4. <property name=“defaultListenerMethod” value=“receive” />  
  5. <!– custom MessageConverter define –>  
  6. <property name=“messageConverter” ref=“defaultMessageConverter” />  
  7. </bean>  
  8. <bean id=“topicListenerB” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”>  
  9. <constructor-arg ref=“topicConsumerB” />  
  10. <!–  may be other method –>  
  11. <property name=“defaultListenerMethod” value=“receive” />  
  12. <!– custom MessageConverter define –>  
  13. <property name=“messageConverter” ref=“defaultMessageConverter” />  
  14. </bean>  
  15. <bean id=“queueListener” class=“org.springframework.jms.listener.adapter.MessageListenerAdapter”>  
  16. <constructor-arg ref=“queueConsumer” />  
  17. <!–  may be other method –>  
  18. <property name=“defaultListenerMethod” value=“receive” />  
  19. <!– custom MessageConverter define –>  
  20. <property name=“messageConverter” ref=“defaultMessageConverter” />  
  21. </bean>  

8. MessageListenerContainer

有几个MessageListener既有几个MessageListenerContainer

Xml代码 

  1. <bean id=“topicListenerContainerA” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”> 
  2.         <property name=“connectionFactory” ref=“jmsConnectionFactory” /> 
  3.         <property name=“destination” ref=“TOPIC” /> 
  4.         <property name=“messageListener” ref=“topicListenerA” /> 
  5.     </bean> 
  6.  
  7.     <bean id=“topicListenerContainerB” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”> 
  8.         <property name=“connectionFactory” ref=“jmsConnectionFactory” /> 
  9.         <property name=“destination” ref=“TOPIC” /> 
  10.         <property name=“messageListener” ref=“topicListenerB” /> 
  11.     </bean> 
  12.       
  13.     <bean id=“queueListenerContainer” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”> 
  14.         <property name=“connectionFactory” ref=“jmsConnectionFactory” /> 
  15.         <property name=“destination” ref=“QUEUE” /> 
  16.         <property name=“messageListener” ref=“queueListener” /> 
  17.     </bean> 
[xml] 
view plain
 copy

  1. <bean id=“topicListenerContainerA” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”>  
  2. <property name=“connectionFactory” ref=“jmsConnectionFactory” />  
  3. <property name=“destination” ref=“TOPIC” />  
  4. <property name=“messageListener” ref=“topicListenerA” />  
  5. </bean>  
  6. <bean id=“topicListenerContainerB” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”>  
  7. <property name=“connectionFactory” ref=“jmsConnectionFactory” />  
  8. <property name=“destination” ref=“TOPIC” />  
  9. <property name=“messageListener” ref=“topicListenerB” />  
  10. </bean>  
  11. <bean id=“queueListenerContainer” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”>  
  12. <property name=“connectionFactory” ref=“jmsConnectionFactory” />  
  13. <property name=“destination” ref=“QUEUE” />  
  14. <property name=“messageListener” ref=“queueListener” />  
  15. </bean>  

  Summary

写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:

  1. 可以有一个或者多个消息生产者向同一个destination发送消息.
  2. queue类型的只能有一个消息消费者.
  3. topic类型的可以有多个消息消费者.
  4. 每个消费者对应一个MessageListener和一个MessageListenerContainer.

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/13697.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论