RabbitMQ 使用带有 prefetchCount = 1 设置的 basicQos 方法:在消费者处理完并确认前一条消息之前,不会向该消费者发送新消息,而是将消息分配给其他空闲的消费者。
package com.tszr.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Demo producer: publishes 20 text messages to {@code queue1} through the
 * default exchange of a local RabbitMQ broker, then releases the channel and
 * the connection.
 */
public class Productor {
    /**
     * Connects to the broker at 127.0.0.1 with the guest account, publishes the
     * messages and reports the outcome on standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Configure the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection conn = null;
        Channel ch = null;
        try {
            // 2. Open the connection and a channel on it
            conn = connectionFactory.newConnection();
            ch = conn.createChannel();

            // 3. Publish 20 messages; the empty exchange name selects the default
            //    exchange, so the routing key "queue1" is used as the queue name
            int sequence = 0;
            while (sequence < 20) {
                byte[] payload = ("晴天: " + sequence).getBytes(StandardCharsets.UTF_8);
                ch.basicPublish("", "queue1", null, payload);
                sequence++;
            }
            System.out.println("消息发送成功!");
        } catch (IOException | TimeoutException failure) {
            failure.printStackTrace();
            System.out.println("消息发送异常");
        } finally {
            // Channel first, then the connection that owns it
            releaseChannel(ch);
            releaseConnection(conn);
        }
    }

    /** Closes the channel if it exists and is still open; a close failure is only printed. */
    private static void releaseChannel(Channel ch) {
        if (ch == null || !ch.isOpen()) {
            return;
        }
        try {
            ch.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }

    /** Closes the connection if it exists and is still open; a close failure is only printed. */
    private static void releaseConnection(Connection conn) {
        if (conn == null || !conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
package com.tszr.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Demo consumer for {@code queue1} that simulates a fast worker (1 second per
 * message) using manual acknowledgements and a prefetch limit of 16.
 */
public class Worker1 {
    /**
     * Connects to the broker at 127.0.0.1 with the guest account, registers a
     * consumer on {@code queue1} and keeps consuming until a byte is read from
     * standard input, after which the channel and connection are closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection and a channel on it
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Effectively-final alias so the anonymous callback can use the channel
            Channel finalChannel = channel;
            // Qos: limit the number of unacknowledged messages held by this consumer
            finalChannel.basicQos(16);
            // autoAck = false: every delivery must be acknowledged explicitly below,
            // otherwise the prefetch window fills up and no further messages arrive
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                    System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), StandardCharsets.UTF_8));
                    try {
                        // Simulated processing time
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Preserve the interrupt status for the calling thread
                        Thread.currentThread().interrupt();
                        // Processing did not finish: hand the message back to the queue
                        finalChannel.basicNack(deliveryTag, false, true);
                        return;
                    }
                    // Confirm this single delivery so the broker can send the next message
                    finalChannel.basicAck(deliveryTag, false);
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    // Nothing to clean up when the consumer is cancelled
                }
            });
            System.out.println("Worker1 开始接收消息");
            // Block the main thread so the consumer keeps running
            System.in.read();
        } catch (IOException |
                TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Close the channel
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Close the connection
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tszr.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Demo consumer for {@code queue1} that simulates a slow worker (5 seconds per
 * message) using manual acknowledgements and a prefetch limit of 5.
 */
public class Worker2 {
    /**
     * Connects to the broker at 127.0.0.1 with the guest account, registers a
     * consumer on {@code queue1} and keeps consuming until a byte is read from
     * standard input, after which the channel and connection are closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection and a channel on it
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Effectively-final alias so the anonymous callback can use the channel
            Channel finalChannel = channel;
            // Qos: limit the number of unacknowledged messages held by this consumer
            finalChannel.basicQos(5);
            // autoAck = false: every delivery must be acknowledged explicitly below,
            // otherwise the prefetch window fills up and no further messages arrive
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                    System.out.println("Worker2" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    try {
                        // Simulated processing time
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Preserve the interrupt status for the calling thread
                        Thread.currentThread().interrupt();
                        // Processing did not finish: hand the message back to the queue
                        finalChannel.basicNack(deliveryTag, false, true);
                        return;
                    }
                    // Confirm this single delivery so the broker can send the next message
                    finalChannel.basicAck(deliveryTag, false);
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    // Nothing to clean up when the consumer is cancelled
                }
            });
            System.out.println("Worker2 开始接收消息");
            // Block the main thread so the consumer keeps running
            System.in.read();
        } catch (IOException |
                TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Close the channel
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Close the connection
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


相较于轮询分发模式,这里添加了 Qos 机制:basicQos 的参数(prefetchCount)表示消费者在未确认之前最多可以从队列中获取几条消息,设置为 1 即每次只获取一条(上面的示例代码中 Worker1、Worker2 分别设置为 16 和 5)。
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/270911.html