建立maven项目,导入下面依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.1</version>
</dependency>
保证已在服务器启动nameserver和broker,强烈推荐读者使用上文提到的网页版UI观察消息队列状态,下面展示三种发送生产消费的方法:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* 消息发送者Producer
*
* @author Song X.
* @date 2020/03/02
*/
public class Producer_Sync {
public static void main(String[] args) throws Exception {
//创建生产者组
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01");
//设置nameserver地址,如1.1.1.1:9876
producer.setNamesrvAddr("enter the addr of nameserver");
//启动producer
producer.start();
//topic消息将要发往的地址
//body是真正的消息体
Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes());
//同步阻塞式发消息,也就是发送完消息,必须等broker返回一个发送成功的消息,才会做其他事
//这种方式消息发送的安全性可以得到保证,但是同步阻塞必定慢
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
//关闭消费者
producer.shutdown();
}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 消息发送者Producer
*
* @author Song X.
* @date 2020/03/02
*/
public class Producer_Asyc {
public static void main(String[] args) throws Exception {
//创建生产者组
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01");
//设置nameserver地址
producer.setNamesrvAddr("enter the addr of nameserver");
//启动producer
producer.start();
//设置异步发送失败时的重设时间
producer.setRetryTimesWhenSendAsyncFailed(1);
//异步非阻塞式发消息,也就是发送完消息,不等broker返回一个发送成功的消息,就做其他事了
//那么如何获得broker的确认信息,采用监听的形式
//这种方式消息发送的安全性无法得到保证,但是速度快
for(int i = 0; i < 100; i++) {
final int index = i;
StringBuffer body = new StringBuffer("Rocket MQ 第");
body.append(i+"条消息");
Message msg = new Message("myTopic001", body.toString().getBytes(
RemotingHelper.DEFAULT_CHARSET
));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("第 " + index + " 消息发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("第 " + index + " 消息发送失败");
throwable.printStackTrace();
}
});
}
//关闭消费者,发送异步消息时,切勿使用这句话,因为无法确定上面的回调方法什么时候执行
//producer.shutdown();
}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* 消息发送者Producer
*
* @author Song X.
* @date 2020/03/02
*/
public class Producer_OneWay {
public static void main(String[] args) throws Exception {
//创建生产者组
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup01");
//设置nameserver地址
producer.setNamesrvAddr("192.168.255.131:9876");
//启动producer
producer.start();
//topic消息将要发往的地址
//body是真正的消息体
Message msg = new Message("myTopic001", "RocketMQ 第一条消息".getBytes());
//单向发送消息,不关心有没有发送成功。速度最快
producer.sendOneway(msg);
//关闭消费者
producer.shutdown();
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 消息消费者Consumer
*
* @author Song X.
* @date 2020/03/02
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup01");
//设置nameserver地址
consumer.setNamesrvAddr("enter the addr of nameserver");
//每个consumer只能关注一个topic
//topic 关注的topic名
//subExpression 根据Tag过滤 表示我需要过滤掉当前topic里的什么信息, *代表不过滤
String topic = "myTopic001";
String subExpression = "*";
consumer.subscribe(topic, subExpression);
//使用监听的方式接受信息,防止消费者本身一直在这阻塞等着消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//list就是收到的所有消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
//给broker返回消费成功的信息,broker就会将这条消息状态改为success,这样这条消息之后就不会被其他consumer消费
//相当于给broker返回了一个ack信息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("consumer start....");
}
}
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/20586.html