RocketMQ——IDEA开发RocketMQ程序简单入门详解编程语言

建立maven项目,导入下面依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> 
        <dependency> 
            <groupId>org.apache.rocketmq</groupId> 
            <artifactId>rocketmq-client</artifactId> 
            <version>4.6.1</version> 
        </dependency> 

保证已在服务器启动nameserver和broker,强烈推荐读者使用上文提到的网页版UI观察消息队列状态,下面展示三种发送生产消费的方法:

import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
 
/** 
 * 消息发送者Producer 
 * 
 * @author Song X. 
 * @date 2020/03/02 
 */ 
public class Producer_Sync {
    
    public static void main(String[] args) throws Exception {
    
        //创建生产者组 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
        //设置nameserver地址,如1.1.1.1:9876 
        producer.setNamesrvAddr("enter the addr of nameserver"); 
        //启动producer 
        producer.start(); 
 
        //topic消息将要发往的地址 
        //body是真正的消息体 
        Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes()); 
 
        //同步阻塞式发消息,也就是发送完消息,必须等broker返回一个发送成功的消息,才会做其他事 
        //这种方式消息发送的安全性可以得到保证,但是同步阻塞必定慢 
        SendResult sendResult = producer.send(msg); 
        System.out.println(sendResult); 
         
        //关闭消费者 
        producer.shutdown(); 
    } 
} 
 
import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendCallback; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
import org.apache.rocketmq.remoting.common.RemotingHelper; 
 
/** 
 * 消息发送者Producer 
 * 
 * @author Song X. 
 * @date 2020/03/02 
 */ 
public class Producer_Asyc {
    
    public static void main(String[] args) throws Exception {
    
        //创建生产者组 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
        //设置nameserver地址 
        producer.setNamesrvAddr("enter the addr of nameserver"); 
        //启动producer 
        producer.start(); 
        //设置异步发送失败时的重设时间 
        producer.setRetryTimesWhenSendAsyncFailed(1); 
 
        //异步非阻塞式发消息,也就是发送完消息,不等broker返回一个发送成功的消息,就做其他事了 
        //那么如何获得broker的确认信息,采用监听的形式 
        //这种方式消息发送的安全性无法得到保证,但是速度快 
        for(int i = 0; i < 100; i++) {
    
            final int index = i; 
            StringBuffer body = new StringBuffer("Rocket MQ 第"); 
            body.append(i+"条消息"); 
 
            Message msg = new Message("myTopic001", body.toString().getBytes( 
                    RemotingHelper.DEFAULT_CHARSET 
            )); 
 
            producer.send(msg, new SendCallback() {
    
                @Override 
                public void onSuccess(SendResult sendResult) {
    
                    System.out.println("第 " + index + " 消息发送成功"); 
                } 
 
                @Override 
                public void onException(Throwable throwable) {
    
                    System.out.println("第 " + index + " 消息发送失败"); 
                    throwable.printStackTrace(); 
                } 
            }); 
        } 
 
        //关闭消费者,发送异步消息时,切勿使用这句话,因为无法确定上面的回调方法什么时候执行 
        //producer.shutdown(); 
    } 
} 
import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
 
/** 
 * 消息发送者Producer 
 * 
 * @author Song X. 
 * @date 2020/03/02 
 */ 
public class Producer_OneWay {
    
    public static void main(String[] args) throws Exception {
    
        //创建生产者组 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
        //设置nameserver地址 
        producer.setNamesrvAddr("192.168.255.131:9876"); 
        //启动producer 
        producer.start(); 
 
        //topic消息将要发往的地址 
        //body是真正的消息体 
        Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes()); 
 
        //单向发送消息,不关心有没有发送成功。速度最快 
        producer.sendOneway(msg); 
 
        //关闭消费者 
        producer.shutdown(); 
    } 
} 
 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt; 
 
import java.util.List; 
 
/** 
 * 消息消费者Consumer 
 * 
 * @author Song X. 
 * @date 2020/03/02 
 */ 
public class Consumer {
    
    public static void main(String[] args) throws MQClientException {
    
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup01"); 
 
        //设置nameserver地址 
        consumer.setNamesrvAddr("enter the addr of nameserver"); 
 
        //每个consumer只能关注一个topic 
        //topic 关注的topic名 
        //subExpression 根据Tag过滤 表示我需要过滤掉当前topic里的什么信息, *代表不过滤 
        String topic = "myTopic001"; 
        String subExpression = "*"; 
        consumer.subscribe(topic, subExpression); 
 
        //使用监听的方式接受信息,防止消费者本身一直在这阻塞等着消息 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
    
            //list就是收到的所有消息 
            @Override 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                for(MessageExt msg : list){
    
                    System.out.println(new String(msg.getBody())); 
                } 
 
                //给broker返回消费成功的信息,broker就会将这条消息状态改为success,这样这条消息之后就不会被其他consumer消费 
                //相当于给broker返回了一个ack信息 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
            } 
        }); 
 
        //启动消费者 
        consumer.start(); 
        System.out.println("consumer start...."); 
    } 
} 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/20586.html

(0)
上一篇 2021年7月19日 23:34
下一篇 2021年7月19日 23:34

相关推荐

发表回复

登录后才能评论