rabbitmq的简单介绍二详解编程语言

上一篇博客我们没有介绍完rabbitmq,今天我们接着上一篇的博客继续介绍rabbitmq

这边的博客的内容如下

1、组播,对指定的队列设置关键词,通过关键词来控制消息的分发

2、更加细致的组播

先来介绍组播

其实组播和广播只有3个区别

a、组播的exchange的类型是“direct”

b、组播的生产端不需要绑定queue,只需要申明exchange,然后在发布消息的时指定route_key、exchange、message即可

c、消费者者需要绑定队列到exchange上,且也需要指定route_key,只有绑定到和生产者端相同的exchange的队列【队列名称可以不一样】,且生产端的route_key一致就可以收到消息

先看生产端的核心代码

import pika 
 
test_connectio = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 
 
test_channel = test_connectio.channel() 
 
 
test_channel.exchange_declare(exchange="logs", 
                              exchange_type="direct") 
 
#在队列中什么一个exchange,类型为“direct” 
 
 
test_channel.queue_declare(queue="zhang1",durable=True) 
test_channel.queue_declare(queue="zhang2",durable=True) 
test_channel.queue_declare(queue="zhang3",durable=True) 
 
test_channel.basic_publish(exchange="logs", 
                           routing_key="error", 
                           body="this is 组播") 
 
 
#发布消息,将消息发往exchange名称为“logs”中,且指定的routing_key为error的队列中 
 
test_channel.close() 

 

在看消费者的核心代码

import pika 
 
test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 
 
test_channel = test_connection.channel() 
 
 
test_channel.exchange_declare(exchange="logs", 
                              exchange_type="direct") 
 
#在管道中申明一个exhcange 
 
 
test_channel.queue_declare(queue="zhang2",durable=True) 
#在管道中申明一个队列 
 
 
 
test_channel.queue_bind(exchange="logs", 
                        queue="zhang2", 
                        routing_key="info") 
#将队列绑定到exchange上,并指明我这个消费者的routing_key,只有和生产端相同 
#routing_key的消费者才可以收到生产者的消息,这里可以绑定多个routing_key 
 
 
def callback(ch,method,properties,body): 
    print("server:",body) 
 
 
 
test_channel.basic_consume(callback, 
                      queue="zhang2", 
                      no_ack=True) 
 
test_channel.start_consuming() 

 

二、这里我们看下更细致的消息控制

更细致的消息控制是什么意思呢?比如我们实现这样一个场景;消费者可以接受*.error的日志,还可以接受apache.info的日志

更细致的消息控制和组播有2个不同

1、更细致的消息控制用到的exchange的类型为topic

2、routing_key的用法用法不一样

下面我们看下生产者的核心代码

 import pika 
import sys 
 
test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 
 
test_channel = test_connection.channel() 
 
test_channel.exchange_declare(exchange="topic_logs", 
                              exchange_type="topic") 
 
route_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info" 
 
message = "".join(sys.argv[2:]) or "hello world" 
 
test_channel.basic_publish(exchange="topic_logs", 
                           routing_key=route_key, 
                           body=message) 
 
print("[x] send %r %r" %(route_key,message)) 
 
test_connection.close() 

  

在看消费者的核心代码

import pika 
import sys 
test_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 
 
test_channel = test_connection.channel() 
 
test_channel.exchange_declare(exchange="topic_logs", 
                              exchange_type="topic") 
 
result = test_channel.queue_declare(exclusive=True) 
 
queue_name = result.method.queue 
binding_keys = sys.argv[1:] 
 
if not binding_keys: 
    sys.stderr.write("Usage: %s [binding_key]..../n" %sys.argv[0]) 
    sys.exit(0) 
 
for binding_key in binding_keys: 
    test_channel.queue_bind(exchange="topic_logs", 
                            queue=queue_name, 
                            routing_key=binding_key) 
 
print("waiting for message") 
 
def callback(ch,method,properties,body): 
    print("[x] %s" %(body)) 
 
test_channel.basic_consume(callback, 
                           queue=queue_name, 
                           no_ack=True) 
 
test_channel.start_consuming() 

我们可以做如下测试

在生产者端执行下面的命令,将消息发往mysql.error,消息的内容是xxxxxxxx

python rabbitmq_send.py mysql.error xxxxxxxxx 

在消费者端执行下面的命令,该消费者接受所有的*.info的消息,和mysql.info的消息

python rabbitmq_receive.py *.info mysql.error 

  

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20862.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论