RocketMQ——IDEA开发RocketMQ程序简单入门详解编程语言

建立maven项目,导入下面依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> 
        <dependency> 
            <groupId>org.apache.rocketmq</groupId> 
            <artifactId>rocketmq-client</artifactId> 
            <version>4.6.1</version> 
        </dependency> 

保证已在服务器启动nameserver和broker,强烈推荐读者使用上文提到的网页版UI观察消息队列状态,下面展示三种发送生产消费的方法:

import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
 
/** 
 * 消息发送者Producer 
 * 
 * @author Song X. 
 * @date 2020/03/02 
 */ 
public class Producer_Sync {
    
    public static void main(String[] args) throws Exception {
    
        //创建生产者组 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
        //设置nameserver地址,如1.1.1.1:9876 
        producer.setNamesrvAddr("enter the addr of nameserver"); 
        //启动producer 
        producer.start(); 
 
        //topic消息将要发往的地址 
        //body是真正的消息体 
        Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes()); 
 
        //同步阻塞式发消息,也就是发送完消息,必须等broker返回一个发送成功的消息,才会做其他事 
        //这种方式消息发送的安全性可以得到保证,但是同步阻塞必定慢 
        SendResult sendResult = producer.send(msg); 
        System.out.println(sendResult); 
         
        //关闭消费者 
        producer.shutdown(); 
    } 
} 
 
import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendCallback; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
import org.apache.rocketmq.remoting.common.RemotingHelper; 
/** 
* 消息发送者Producer 
* 
* @author Song X. 
* @date 2020/03/02 
*/ 
public class Producer_Asyc {
 
public static void main(String[] args) throws Exception {
 
//创建生产者组 
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
//设置nameserver地址 
producer.setNamesrvAddr("enter the addr of nameserver"); 
//启动producer 
producer.start(); 
//设置异步发送失败时的重设时间 
producer.setRetryTimesWhenSendAsyncFailed(1); 
//异步非阻塞式发消息,也就是发送完消息,不等broker返回一个发送成功的消息,就做其他事了 
//那么如何获得broker的确认信息,采用监听的形式 
//这种方式消息发送的安全性无法得到保证,但是速度快 
for(int i = 0; i < 100; i++) {
 
final int index = i; 
StringBuffer body = new StringBuffer("Rocket MQ 第"); 
body.append(i+"条消息"); 
Message msg = new Message("myTopic001", body.toString().getBytes( 
RemotingHelper.DEFAULT_CHARSET 
)); 
producer.send(msg, new SendCallback() {
 
@Override 
public void onSuccess(SendResult sendResult) {
 
System.out.println("第 " + index + " 消息发送成功"); 
} 
@Override 
public void onException(Throwable throwable) {
 
System.out.println("第 " + index + " 消息发送失败"); 
throwable.printStackTrace(); 
} 
}); 
} 
//关闭消费者,发送异步消息时,切勿使用这句话,因为无法确定上面的回调方法什么时候执行 
//producer.shutdown(); 
} 
} 
import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.client.producer.SendResult; 
import org.apache.rocketmq.common.message.Message; 
/** 
* 消息发送者Producer 
* 
* @author Song X. 
* @date 2020/03/02 
*/ 
public class Producer_OneWay {
 
public static void main(String[] args) throws Exception {
 
//创建生产者组 
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01"); 
//设置nameserver地址 
producer.setNamesrvAddr("192.168.255.131:9876"); 
//启动producer 
producer.start(); 
//topic消息将要发往的地址 
//body是真正的消息体 
Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes()); 
//单向发送消息,不关心有没有发送成功。速度最快 
producer.sendOneway(msg); 
//关闭消费者 
producer.shutdown(); 
} 
} 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt; 
import java.util.List; 
/** 
* 消息消费者Consumer 
* 
* @author Song X. 
* @date 2020/03/02 
*/ 
public class Consumer {
 
public static void main(String[] args) throws MQClientException {
 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup01"); 
//设置nameserver地址 
consumer.setNamesrvAddr("enter the addr of nameserver"); 
//每个consumer只能关注一个topic 
//topic 关注的topic名 
//subExpression 根据Tag过滤 表示我需要过滤掉当前topic里的什么信息, *代表不过滤 
String topic = "myTopic001"; 
String subExpression = "*"; 
consumer.subscribe(topic, subExpression); 
//使用监听的方式接受信息,防止消费者本身一直在这阻塞等着消息 
consumer.registerMessageListener(new MessageListenerConcurrently() {
 
//list就是收到的所有消息 
@Override 
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 
for(MessageExt msg : list){
 
System.out.println(new String(msg.getBody())); 
} 
//给broker返回消费成功的信息,broker就会将这条消息状态改为success,这样这条消息之后就不会被其他consumer消费 
//相当于给broker返回了一个ack信息 
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
} 
}); 
//启动消费者 
consumer.start(); 
System.out.println("consumer start...."); 
} 
} 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/20586.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论