Routing
在上一节我们建立了一个简单的日志系统,已经能够传播日志信息给接收者了。
在这一节我们将给它增加一个特性-订阅部分消息。比如说,我们能够从控制台打印的所有日志信息中将至关重要的错误信息指向日志文件(保存在硬盘里)。
Bindings
在前面的例子中我们已经创造了bindings,你可以这样调用它
channel.queueBind(queueName, EXCHANGE_NAME, "");
一个bindings就是queue和exchange之间的一种映射关系(多对多关系),将queue绑定到exchange上,也可以这样理解: queue对来自此exchange的消息感兴趣(传递消息)。
Bindings创建的时候也可以同时附带一个额外的名为 routingKey 的参数,为了避免与 basic_publish 参数混淆,我们把他叫做 binding key .我们可以这样创建一个带有binding key的 bindings。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
这意味着一个 binding key 的值取决于于exchange的类型,像前面提到的 fanout 类型的 exchange, 就会忽略它的值。
Direct exchange
在前几节中我们的日志系统传播所有的信息给所有的消费者,我们想基于它实现根据信息的严重程度过滤信息的功能,比如说我们想要一个程序只会将error级别的日志写入硬盘,warning和info级别的日志就不浪费硬盘空间了。
我们前面使用的是 fanout 类型的exchange,它并没有给我们更多的灵活性 – 它只会无意识的传播所有它接收到的信息
这里我们将使用一个 direct 类型的exchange 替换它, direct exchange的路由算法是非常简单的,只有当目标queue的 binding key 和 消息的routing key完全相等时才会进行路由。
为了详细说明它,考虑下面的流程:
在这个流程里面,我们可以看到一个名为x的 direct exchange绑定了两个queue,第一个queue通过名为orange 的 binding key 绑定,第二个队列有两个 bindings, 一个叫 black,另一个叫green.
在上述流程中一个routing key为 orange 的消息将被路由到Q1这个queue上面,routing key为black或者green的消息将被路由到Q2这个queue上。其他的消息将会被丢弃。
Multiple bindings
其实通过相同的binding key绑定多个queue也是完全可以的,在我们的示例中我们可以将x与Q1也通过black binding key绑定在一起,此时这个direct exchange将和fanout exchange传播信息的行为一样,都会将信息传播给相匹配的queue。所以routing key为black的消息将被传递到Q1和Q2两个queue里面.
发送日志
下面我们将为我们的日志系统使用这个模型。用direct exchange 替换掉fanout exchange来发送消息,我们将把日志的严重程度(severity)作为一个routing key.然后接收程序就可以根据消息的严重程度(severity)来接收。首先,让我们来看看发送日志部分。
像往常一样,首先我们需要创建一个exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
发送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化事情我们假定’severity’ 就是’info’,’warning’或者’error’,即日志消息的严重程度有info,warning或者error三种级别。
订阅
接收消息部分除了创建binding时附带上severity之外和前面章节都相同:
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
看看效果图:
发送消息代码类EmitLogDirect.java:
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
}
接收消息代码类ReceiveLogsDirect.java:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
将java代码编译成class文件(看第一节对编译和类路径选择的建议),运行的时候为了方便我们用 $CP 环境变量(windows 上是 %CP%)代替类路径.
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果你仅仅想将‘warning’和‘error’(不包括’info’)级别的消息保存进一个日志文件,只需打开一个控制台输入:
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果你想输出所有级别的日志信息在你的屏幕上,只需打开终端这样输入:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C
发送一个error级别的日志消息:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(全部的示例代码EmitLogDirect.java和ReceiveLogsDirect.java)
去第五节查看如何基于模式去监听消息
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/99537.html