RabbitMq简介
1.1消息队列中间件简介
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ
以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景
1.2什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
2.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
3.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
4.多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
5.多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
6.管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
7.跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
8.插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
安装准备工具
1.下载Eralng,下面链接已提供otp_win64_20.2.exe
链接: https://pan.baidu.com/s/1lmvCMPVAV1Ba9UogCdQpZg
提取码:x9m7
2.下载rabbitmq,下面链接已提供rabbitmq-server-3.7.4.exe
链接: https://pan.baidu.com/s/1CPfhg5X1e7UitpgMWIcAEg
提取码:h4r3
安装步骤(图文)
1、安装erlang并配置环境变量
1.1 双击otp_win64_20.2.exe,点击next
1.2 选择安装目录
1.3 配置环境变量
新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
ERLANG_HOME 路径:E:/Program Files/erl9.2
双击系统变量path,点击“新建”,将%ERLANG_HOME%/bin加入到path中。
%ERLANG_HOME%/bin
1.4 验证erlang是否安装成功
win+R键,输入cmd,再输入erl,看到erlang版本号就说明erlang安装成功了。
2、安装RabbitMQ
2.1 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
2.2 安装RabbitMQ-Plugins
打开命令行cd,输入RabbitMQ的sbin目录。
我的目录是:
E:/Program Files/RabbitMQ Server/rabbitmq_server-3.7.4/sbin
然后输入以下命令进行安装
rabbitmq-plugins enable rabbitmq_management
2.3 验证rabbitmq是否安装成功
输入 以下命令
rabbitmqctl status
如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常
2.4 打开浏览器,地址栏输入mq访问地址,即可看到管理界面的登陆页
http://127.0.0.1:15672 【注意页面端是:15672,服务端是:5672】
2.5 输入用户名和密码,都为guest 进入主界面:
最上侧的导航依次是:概览、连接、信道、交换器、队列、用户管理
安装过程中遇到的问题
代码示例在框架中测试:
安装php-amqplib
php-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。我们使用composer
来安装。
composer require php-amqplib/php-amqplib
示例说明
生产者(Producer)和消费者(Consumer)是消息队列的基本概念,生产者是指生产消息的一方,也是消息发送方,消费者就是消费消息的一方,也是消息接收方,队列就是存储消息的一个缓存区。
本实例将由生产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:皮鞋生产打包打包车间,不断有成品鞋进入传送带(消息队列)等待操作工人(消费者)将皮鞋打包。
因为等待打包的鞋子特别多,我们需要安排多个打包工人在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保工人最后打包好的皮鞋数量一双不少,不能因为打包工人操作慢或者个人原因暂时离开生产线,导致最终打包数不一致。
消息发送
生产者将消息发送给队列,至于谁来消费(处理)这些消息,生产者不管。
消息队列(MQ),用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
消息到达队列中后,如果没有一个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在生产者和消费者中都要将queue_declare
第3个参数设置为true,表示让消息队列持久化。
$channel->queue_declare($queue, false, true, false, false);
此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在生产者端设置:'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
。
现在我们建立生产者文件sender.php,我们假设服务端已经安装好RabbitMQ,并且开放好对应端口。
<?php
/**
* @Author: Helloweba
* @sender.php
* @消息生产者-分发任务
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib/Connection/AMQPStreamConnection;
use PhpAmqpLib/Message/AMQPMessage;
$queue = 'worker';
//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
'192.168.0.100',
56720,
'helloweba', //user
'helloweba', //password
'test' //vhost
);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表示让消息队列持久化
for ($i = 0; $i < 100; $i++) {
$arr = [
'id' => 'message_' . $i,
'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
'content' => 'helloweba-' . time()
];
$data = json_encode($arr);
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////设置rabbitmq重启后也不会丢失队列,或者设置为'delivery_mode' => 2
$channel->basic_publish($msg, '', $queue);
echo 'Send message: ' . $data . PHP_EOL;
}
$channel->close();
$connection->close();
?>
上述代码中,我们模拟了生产者向队列中发送了100条订单消息。
消息接收
消费者是指完成消息的接收和处理的客户端程序,消费者就如同生产线上的操作工人,他们按照操作规程从传送带上取出产品后有序的完成后续工作任务。
实际项目中,如果消费者处理消息能力不够时,就要开启多个消费者来消费队列中的消息。默认情况下,RabbitMQ将会把队列中的消息平均分配给每个消费者。如果消费者要对分配到的消息任务处理时间很长(耗时任务),那么处理消息任务的时候就有可能会遇到意外。
比如某个消费者断电了,或者出故障了,那它正在处理的消息会怎么办?这里就是RabbitMQ的消息确认机制,为了保证数据不丢失,RabbitMQ会将未处理完的消息分配给下一个消费者处理。
此外RabbitMQ还可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,因为消费者无法同时处理多个消息任务。
换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。
$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
我们现在建立消费者文件receiver.php,代码如下:
<?php
/**
* @Author: Helloweba
* @receiver.php
* @消息消费者-接收端
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib/Connection/AMQPStreamConnection;
$queue = 'worker';
//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;
$callback = function($msg){
echo " Received message:", $msg->body, PHP_EOL;
sleep(1); //模拟耗时执行
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4个参数值为false表示启用消息确认
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
模拟测试
现在我们运行多个消费者终端,可以打开多个ssh客户端,client1和client2运行:
php receive.php
然后再开个终端,运行生产者:
php sender.php
由于消费者是阻塞运行的,他们会一直等待队列中的消息,当有消息就会去取出来处理。我们可以模拟将其中某个客户端中断,即断开某个消费者。
然后再看消息是不是被其他消费者接收处理了。同样我们可以模拟将客户端全部重启,看看队列中的消息是否没有丢失。
当client1中断连接RabbitMQ后,再次运行连接RabbitMQ,在client2中看到的消息处理情况,注意看图中的消息id。
client1
client2:
原创文章,作者:1402239773,如若转载,请注明出处:https://blog.ytso.com/274725.html