如何实现ActiveMq的Topic的持久订阅详解编程语言

(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可




。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。




(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。




首先,假设消费者都是普通的消费者,


————————


<1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么




,这个消息就被抛弃了。




<2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~




activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。




<3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~




activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。




<4>消费者1关掉了。




activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。




<5>消费者1又启动了。




activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。


—————————–


总结一下:


activemq只是向当前启动的消费者发送消息。


关掉的消费者,会错过很多消息,并无法再次接收这些消息。




如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。




那么,如何让消费者重新启动时,接收到错过的消息呢?




答案是持久订阅。




(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。


持久订阅,就要记录消费者的名字了。


张三说,我是张三,有馒头给我留着,我回来拿。


李四说,我是李四,有馒头给我留着,我回来拿。


activemq就记下张三,李四两个名字。




那么,分馒头时,还是一个人头给一个馒头。


分完了,一看张三没说话,说明他不在,给他留一个。


李四说话了,那就不用留了。




张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。


可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。




activemq区分消费者,是通过clientID和订户名称来区分的。


// 创建connection

connection = connectionFactory.createConnection();

connection.setClientID(“bbb”); //持久订阅需要设置这个。

connection.start();

// 创建session

Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

// 创建destination

Topic topic = session.createTopic(“userSyncTopic”); //Topic名称

//MessageConsumer consumer = session.createConsumer(topic); //普通订阅

MessageConsumer consumer = session.createDurableSubscriber(topic,”bbb”); //持久订阅





(4)还有一点,消息的生产者,发送消息时用使用持久模式


MessageProducer producer = …;


producer.setDeliveryMode(DeliveryMode.PERSISTENT);


不设置,默认就是持久的




(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。




(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。




可以访问
http://localhost:8161/admin/index.jsp


查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。




可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/13696.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论