RocketMQ——消息选择器详解编程语言

发送消息时,我们除了可以向指定的Topic发送消息,还可以向指定的Queue发消息。有以下4种方法

 DefaultMQProducer producer1 = new DefaultMQProducer(); 
 
        //向固定的一个queue中写消息 
        producer1.send(msg, new MessageQueueSelector() {
    
            /** 
             * Queue选择器,我们除了可以指定向哪个topic发消息,还可以直接指定向topic里的哪个queue发消息 
             * 
             * @param mqs 当前topic里的所有queue 
             * @param msg 发送的那条消息 
             * @param arg 对应send方法里的arg,也就是下面2 
             * @return 
             */ 
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    
                //获取queueId是2的queue 
                MessageQueue messageQueue = mqs.get(2); 
                return messageQueue; 
            } 
        },2, 2000); 
 
        //使用哈希的方式选择一个Queue发消息 
        producer1.send(msg, new SelectMessageQueueByHash(), 2); 
 
        //使用随机数的方式选择一个Queue发消息 
        producer1.send(msg, new SelectMessageQueueByRandom(), 2); 
 
        //根据机房选择Queue,默认无实现,需要自己写实现 
        producer1.send(msg, new SelectMessageQueueByMachineRoom(){
    
            @Override 
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    
                return null; 
            } 
        },2); 
 

Consumer在消费消息时可以并行消费,也可以串行消费

  //最大开启消费线程数 
        consumer.setConsumeThreadMax(5); 
        //最小开启消费线程数 
        consumer.setConsumeThreadMin(2); 
        //这实际上是并发消费(开多线程),一个queue用多个线程消费(可保证消费顺序),当有多个queue时,只能保证各queue里的顺序,queue的顺序无法保证 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                for(MessageExt msg : list){
    
                    System.out.println(new String(msg.getBody())); 
                } 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
            } 
        }); 
 
        //这是单线程消费,一个queue用一个线程消费 
        consumer.registerMessageListener(new MessageListenerOrderly() {
    
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
    
                for(MessageExt msg : list){
    
                    System.out.println(new String(msg.getBody())); 
                } 
                return ConsumeOrderlyStatus.SUCCESS; 
            } 
        }); 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20582.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论