发送消息时,我们除了可以向指定的Topic发送消息,还可以向指定的Queue发消息。有以下4种方法
DefaultMQProducer producer1 = new DefaultMQProducer();
//向固定的一个queue中写消息
producer1.send(msg, new MessageQueueSelector() {
/**
* Queue选择器,我们除了可以指定向哪个topic发消息,还可以直接指定向topic里的哪个queue发消息
*
* @param mqs 当前topic里的所有queue
* @param msg 发送的那条消息
* @param arg 对应send方法里的arg,也就是下面2
* @return
*/
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//获取queueId是2的queue
MessageQueue messageQueue = mqs.get(2);
return messageQueue;
}
},2, 2000);
//使用哈希的方式选择一个Queue发消息
producer1.send(msg, new SelectMessageQueueByHash(), 2);
//使用随机数的方式选择一个Queue发消息
producer1.send(msg, new SelectMessageQueueByRandom(), 2);
//根据机房选择Queue,默认无实现,需要自己写实现
producer1.send(msg, new SelectMessageQueueByMachineRoom(){
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
},2);
Consumer在消费消息时可以并行消费,也可以串行消费
//最大开启消费线程数
consumer.setConsumeThreadMax(5);
//最小开启消费线程数
consumer.setConsumeThreadMin(2);
//这实际上是并发消费(开多线程),一个queue用多个线程消费(可保证消费顺序),当有多个queue时,只能保证各queue里的顺序,queue的顺序无法保证
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//这是单线程消费,一个queue用一个线程消费
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20582.html