Topics
在上一个教程中,我们改进了我们的日志系统而不是使用只能进行广播的fanout交换类型,我们使用direct类型,能够选择性地接收日志。 虽然使用direct交换类型改进了我们的系统,但它仍然有限制 – 它不能基于多条件进行路由选择。 在我们的日志记录系统中,我们可能不仅要根据日志级别订阅日志,还可以基于日志源进行订阅。您可能会从syslog unix工具中了解过这个概念,该工具可以根据日志级别(info/warn/crit..)和设备(auth / cron / kern …)来路由日志。
这将给我们很大的灵活性 – 我们可能只想要监听来自“cron”的重要错误,但是想监听来自”kern“的所有日志。 要在我们的日志系统中实现这一点,我们需要了解一个更复杂的交换类型-Topic(主题)交换。
主题交换
发送到主题交换区的消息不能是任意的routing_key – 它必须是由点分隔的单词列表。这些单词可以是任何东西,但通常它们指定与消息相关联的一些功能。几个有效的路由密钥示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密钥中可以有任意多的单词,最多可达255个字节。
绑定键也必须是相同的形式。主题交换背后的逻辑类似于直接交换 – 使用特定路由密钥发送的消息将被传递到与绑定密钥匹配的所有绑定队列。但是,绑定密钥有两个重要的特殊情况:
- *(star)可以替代一个字。
- #(hash)可以替换零个或多个单词。
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由密钥发送。路由密钥中的第一个单词将描述速度,第二个描述颜色和第三个代表物种: “<speed>.<colour>.<species>“。 我们创建了三个绑定:Q1绑定键“* .orange.*”, Q2绑定“*.*.rabbit”和Q3绑定 “lazy.#“。
这些绑定可以总结为:
- Q1对所有的橙色动物感兴趣。
- Q2想听听有关兔子的一切,以及有关懒惰动物的一切。
将路由密钥设置为“quick.orange.rabbit”的消息将传递给两个队列。消息“lazy.orange.elephant”也会去他们两个。另一方面,“quick.orange.fox”只会转到第一个队列,而“lazy.brown.fox”只能到第二个。 “lazy.pink.rabbit”只会传递到第二个队列,即使它匹配两个绑定。 “quick.brown.fox”不匹配任何绑定,所以它将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,如 “orange” 或 “quick.orange.male.rabbit“,会发生什么?那么这些消息将不会匹配任何绑定,并将丢弃。 另一方面,”lazy.orange.male.rabbit“即使它有四个单词,但他将匹配最后的绑定,并将被传递到第二个队列。
主题交换(Topic exchange)
主题交换是强大的,可以像其他交换类型一样行事。
当队列与“#”(hash)绑定键绑定时,
它将接收所有消息,而不管路由密钥是什么,叫如fanout交换一样。
当特殊字符“*”(start)和“#”(hash)在绑定中不被使用时,主题交换将表现得像一个direct交换。
把它们组合在一起
我们将在我们的日志系统中使用主题交换。我们将从一个假设开始,日志的路由密钥将有两个单词组成: “<facility>.<severity>“。
该代码与上一个教程几乎相同。
EmitLogTopic.java的代码:
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... }
ReceiveLogsTopic.java:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
编译运行,像在Tutorial 1中一样,把classpath加进来 – 在Windows系统, 使用%CP%.
编译:
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志:
java -cp $CP ReceiveLogsTopic "#"
从设备 “kern“接收日志:
java -cp $CP ReceiveLogsTopic "kern.*"
或者只想接收级别为的”critical” 日志:
java -cp $CP ReceiveLogsTopic "*.critical"
可以创建多个绑定:
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
生产路由密钥为”kern.critical” 的日志:
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
开心的玩这些程序吧,注意,程序没有对于路由和绑定密钥有任何限制,你可以添加超过两个路由密钥。
(源代码 EmitLogTopic.java and ReceiveLogsTopic.java)
接下来,在教程6( tutorial 6)中找出如何做一个包含往返信息的远程过程调用。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/99534.html