这篇文章主要为大家展示了“rabbitMq中消息可靠性的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“rabbitMq中消息可靠性的示例分析”这篇文章吧。
正文: 消息虽然有异步、高并发的优点,但具有一定延时、数据弱一致性,甚至消息发送/消费失败等缺点。所以,我们可以从生产者端,服务器和消费者端来保证消息可靠性,如果这三端都能保证,消息可靠性也就OK了。
生产者端:
-
异常捕获机制 客户端代码中的异常捕获,如果在业务逻辑执行过程中抛出异常,则重发或者回滚。 但是,没有异常不代表一定成功,所以此方案是最大努力保证。
注:也可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试
-
AMQP/RabbitMQ的事务机制 开启mq的事务模式,如果事务能成功提交,肯定发送成功了。 如下图:
但是,此方式性能开销很大,一般不用。
-
发送端确认机制(publish confirm,后文简称pc) 所谓发送端消息确认,就是保证消息发送成功,如果不成功要能知道哪些消息不成功,并做出相应的处理。
pc原理: 在创建channel后,把channel设置为confirm模式,通过此信道发送的消息都会生成一个唯一id(msg_id,从1开始)。消息成功发送到队列后,会给发送端返回ack确认消息,失败则返回Nack消息。
特别说明: 所谓返回ack和nack,其实就是两个回调函数。 回调函数中包含msg_id,还有两个参数: Long deliveryTag 和Boolean multiple。 如果multiple为true,代表msg_id <= deliveryTag 的消息都被确认了(Nack代表都没确认); 如果multiple为false,代表只有msg_id=deliverTag的这条消息被确认了(Nack代表没被确认)。
下面,分三种情况来说一下pc模式的应用(前两种做了解不推荐,引出第三种)。
confirm模式1:
以上这种方式是同步阻塞,发送一条消息等到结果确认后,才发送下一条,性能很差。
confirm模式2: 在模式1的基础上,采用批量发送。
此模式和模式1一样也是同步,通过异常判断,只不过是一批等待一次,效率好一些。 不过最大的问题是,如果失败了,我们只知道这一批有问题,但不知道是哪一些,无法针对性处理。
confirm模式3: 还记得我们前面说ack和nack回调函数,但是前两种模式都没有显示的调用,只是通过异常来判断。。我们可以通过重写和显示调用,来处理成功或失败消息的逻辑,这样,生产者发送消息后不需要等待结果,可继续投递下一条。
如上图(也是需要先把channel声明为确认模式,图中没有截出来),定义重写了两个ConfirmCallback,在addConfirmListener中根据参数来区分,谁是ack,谁是nack。 当消息成功或失败后会执行相关的回调,可以在其中处理逻辑,如每次发送消息后,会把消息也放到ConcurrentNavigableMap中,当执行ack回调后,,清空map中已被确认的消息(ConcurrentNavigableMap的用法,自己去补哈)。 服务器端: 消息持久化。持久化是保障mq可靠的基础,有几个方面: Exchange: 定义时设置durable 参数为true Queue:定义时设置durable 参数为true 消息: 投递模式设置为2(BasicProperties 中的 deliveryMode 属性)
除此之外,如有需要,broker端还需要做高可用的集群架构。 这里不做描述,后面会专门拿一篇来讲如何做。 消费者端: 消费者端要保证的就是,消息一定消费成功,如果不成功,消息不能丢失。 可以从三方面来做: 1、采用NONE模式,即不需要ack确认 。 但是一定要在代码中把消息存起来,并作异常捕获,确认成功后再删除。 2、采用AUTO(自动Ack)模式。 如果有异常会把消息重新放回队列,重新消费直到成功或者过期。 但是,没异常也不代表消费一定会成功。 3、采用MANUAL(手动Ack)模式。 手动确认是保证消息不丢失的最好途径。
二: 可靠性其他优化 单从消息层面来说,保证发送端,消费端和服务器的可靠后,基本就问题不大了。。但是从系统层面来说,还是需要再做一些优化: 如果消息生成远超过消费,比如秒杀瞬间消息量暴增,消息中间件本身的缓冲能力是有容量限制的,就可能导致broker崩溃。 限流,可以从几个方面来做: 限流: 1、rabbit可以对内存和磁盘使用量设置阈值,达到阈值后,会暂时阻止消息发布者端的连接,不再接收消息。 可在/etc/rabbitmq/rabbitmq.conf中配置: 磁盘配置: 内存配置:
2、基于credit flow 的流控机制,这个机制是针对单个connection的。 在单个队列达到最大流速,或该connection下所有队列达到总流速时,都会触发流控。 触发时可能时因为connetion,exchange,或者是queue的某一个过程处于flow状态,这些可以通过界面查看到: 3、限制channel上未被ack确认的消息数 通过在消费前,调用channel.basicQoS 方法设置该数量-prefetchCount。 一旦给消费的发送的消息达到prefetchCount未确认,就不会再向消费者推送了 (非None Ack模式下的推送模式 才有效果),这样可以缓解消费者压力。
4、另外,可以提升消费端的消费能力,如优化程序性能,增加消费节点,增加并发消费线程数等。
二:消息的可靠性分析
在使用中间件过程中,我们有上面很多的方式来保证可靠性,可也难保有消息丢失的情况,这时我们需要有消息轨迹可以追踪。
rabbitmq可以使用Firehose 功能来追踪生产者发送或者消费者消费的消息,原理是把需要追踪的消息,通过一个topic类型的交换机amq.rabbitmq.trace发送到队列中(首先要去建立两个队列,一个放发送的消息,一个放消费的消息,然后将此交换机和两个队列绑定,bingdingKey分别为:publish.{exchangename} 和 deliver. {queuename}。exchangename和queuename代表正常业务中,生产者发送消息的交换器名称 和 消费者消费消息的队列名称)。 绑定后,在正常发送或消费消息,这些消息就能路由到刚才两个追踪的队列中。
使用Firehose,需要先开启Firehose命令 (默认关闭,服务重启恢复默认状态),开启和关闭命令如下:
rabbitmqctl trace_on [-p vhost] rabbitmqctl trace_off [-p vhost]
Firehose 开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额 外的消息生成、路由和存储。此外,没有管理界面,只能获取追踪队列的消息来看,不方便。
这时候,我们可以使用rabbitmq_tracing 插件,它是Firehose的GUI版本,可以通过界面方便的操作,能把消息以text/json的格式记录到文件中,方便追踪。 用法如下:
1、开启插件: rabbitmq-plugins enable rabbitmq_tracing (禁用是disable)
2、在界面中操作(不需要创建队列,是保存在文件中) 比如要记录发送的消息:
如图,在Add a new trace可以创建一个新的trace(追踪): name: 生成trace文件的名称,自定义 format: 消息保存文件的形式 text 或 json username/password: 连接rabbit的用户名和密码 payload bytes: 消息的大小,超过这个数后不能记录到文件中。 不填即所有消息 pattern: 指定哪些 发送方/消费方的消息会记录到文件(可用通配符)。 如: publish.#:代表只要是投递的消息,都要记录。 deliver.#: 代表只要是消费的信息,都要记录。 #.amq: 代表交换机名称为amq的投递消息,或者队列名为amq的消费消息,都要记录。
新建trace后,可用在界面停止采集或删除,也可以查看里面的消息内容。 这种用法,对于一些特别重要的消息,保证可靠性还是很有必要的
以上是“rabbitMq中消息可靠性的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
原创文章,作者:506227337,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/240183.html