《译文:RabbitMQ 消费者确认和发布者确认》

原文链接 译者:flystarfly

介绍

使用消息代理(如RabbitMQ)的系统按定义分发。 由于发送的协议方法(消息)不能保证到达对方或被成功处理,所以发布者和消费者都需要一个交付和处理确认的机制。 RabbitMQ支持的一些消息协议提供了这样的功能。 本指南涵盖AMQP 0-9-1中的特性,但其他协议(STOMP,MQTT等)中的思路大致相同。

从消费者(consumers)投递rabbitmq代理(broker)的处理确认被认为是AMQP 0-9-1协议中的确认; rabbitmq代理(broker)对发布者(publishers)的确认被称作发布者确认(publisher confirms)

(消费者)投递确认

当RabbitMQ向消费者发送消息时,它需要知道如何思考才能成功地发送消息。 怎么样是最佳的逻辑需要取决于系统。 因此,这主要是一个应用型决策。 在AMQP 0-9-1协议中,这个决策是由当消费者使用basic.consume注册,或者使用basic.get按需提取消息等这些方法组成的。

如果您想要更多的向示例和手把手的材料,在RabbitMQ教程#2中也包含了消费者的确认。

投递标识符:投递标签

在我们开始讨论其他主题之前,解释如何确认投递(以及确认表示它们各自的传送)是非常重要的。 当消费者(订阅)被注册时,消息将由RabbitMQ使用basic.deliver方法投递(推送)。 该方法带有一个投递标签,它唯一地标识一个信道上的投递。 投递标签因此被限定在每个信道。

投递标签是单调增长的正整数,并由客户端类库提供。 客户端类库中的确认投递方法将投递标记当做参数。

确认模式

根据使用的确认模式,RabbitMQ可以在发送(写入TCP套接字)之后立即或在收到显式(“手动”)客户端确认后立即考虑成功传送消息。 手动发送的确认可以是正面的或负面的,并使用以下方法之一:

  • basic.ack用于肯定确认
  • basic.nack用于否定确认 (注意: :这是一个AMQP 0-9-1协议的RabbitMQ扩展)
  • basic.reject同样用于否定确认,但与basic.nack相比有一个限制

肯定的确认只是指导RabbitMQ将一个消息记录为已投递。basic.reject的否定确认具有相同的效果。 两者的差别在于:肯定的确认假设一个消息已经成功处理,而对立面则表示投递没有被处理,但仍然应该被删除。

在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

同时确认多个投递

可以批量手动确认以减少网络流量。
这是通过将确认方法的 multiple “字段(见上文)设置为 true 完成的。 请注意, basic.reject 历史上没有这个字段,这就是为什么RabbitMQ引入 basic.nack 作为协议扩展。

multiple 字段设置为 true 时,RabbitMQ将确认所有未完成投递标签,直到包括确认中指定的标签。 像所有其他与确认相关的信息,这是每个信道的范围。
例如,假设在信道 Ch 上没有确认投递标签5,6,7和8,当确认帧在 delivery_tag 设置为 8 multiple设置为 true ,从5到8的所有标签将被确认。
如果 multiple 被设置为 false ,那么投递标签5,6和7仍然是未确认的。

信道预取设置(QoS)

由于消息以异步方式发送(推送)到客户端,因此在任何给定的时刻通常都有多个消息在信道中。 另外,来自客户端的手动确认本质上也是异步的。 所以有一个未确认的投递标签的滑动窗口。 开发人员往往更喜欢限制这个窗口的大小,以避免消费者端无限的缓冲区问题。 这是通过使用basic.qos方法设置“预取计数”值来完成的。 该值定义了通道上允许的最大数量的未确认交付。 一旦数字达到配置计数,RabbitMQ将停止在信道上传送更多消息,直到有一个消息被确认。

例如,假设在通道Ch上未确认的交付标签5,6,7和8以及通道Ch的预取计数设置为4,那么RabbitMQ除非至少有一个投递被确认,否则将不会在通道Ch上推送更多投递。 当一个确认帧在delivery_tag被设置为8的频道上到达时,RabbitMQ会注意到并传送一条消息。

值得重申的是,投递流程以及客户端手动确认完全是异步的。 因此,如果在已经有投递的情况下预取值发生了变化,则会出现自然的竞争状态,并且在一个信道上可能暂时超过预取计数未确认的消息。

QoS设置可以为信道道或消费者配置。
有关详细信息,请参见消费预取

应谨慎使用无限QoS的手动确认模式。 消费者在没有确认的情况下消耗大量的消息将导致其所连接的节点上的内存消耗增长。 找到一个合适的QoS值是一个反复试验的问题,因工作负载而异。 100到300范围内的值通常提供最佳的吞吐量,并且不会承受压倒性消费者的重大风险。 经常会遇到 收益递减的规律.

即使在手动确认模式下,QoS设置也不会影响使用basic.get(“pull API”)获取的消息。

消费者失败或失去连接时:自动重新排队

当使用手动确认时,任何未收到的投递(消息)将在发送投递的信道(或连接)关闭时自动重新发送。 这包括客户端的TCP连接丢失,消费者应用程序(进程)失败以及通道级别的协议例外(如下所述)。

请注意,检测不可用客户端需要一段时间。

由于这种行为,消费者必须准备好处理消息重投递,否则就要考虑到幂等性
重投递将会有一个特殊的布尔属性,由RabbitMQ设置为 true redeliver 首次投递时,它将被设置为 false 。 请注意,消费者可以收到先前投递给其他消费者的消息。

客户端错误:双重确认和未知标签

如果客户确认投递标签不止一次,RabbitMQ将会导致信道错误,比如precondition_failed-未知的投递标签 100。如果使用未知的投递标记,则会引发同一个通道异常。

发布者(publishers)确认

使用标准AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务 – 使通道事务化,发布消息,提交。 在这种情况下,事物是重量级的,并会将吞吐量降低250倍。为了解决这个问题,引入了确认机制。 它模仿协议中已经存在的消费者确认机制。

要启用确认(confirm)模式,客户端要提供confirm.select方法。 根据是否设置了no-wait,代理可以用confirm.select-ok来回应。 一旦在信道上使用confirm.select方法,就说它处于确认模式。 事务通道不能进入确认模式,一旦通道处于确认模式,就不能进行事务处理。

一旦通道处于确认模式,代理和客户端都会计数消息(从第一个confirm.select开始计数)。 然后,代理通过在相同的信道上发送一个basic.ack来处理消息,从而确认消息。 delivery-tag字段包含确认消息的序列号。 代理也可以在basic.ack中设置多个字段,以指示已经处理了包括序列号在内的所有消息。

一个JAVA中以确认模式向通道发布大量消息并等待确认的示例可以点击here的。

否定确认

在特殊情况下, rabbitmq代理(broker)无法成功处理消息,而不是basic.ack, rabbitmq代理(broker)将发送一个basic.nack。 在这种情况下, basic.nack的属性与basic.ack中对应的属性意义相同,并且对requeue 属性不予理会。在否定一条或者多条消息之后, rabbitmq代理(broker)表示无法处理这些信息,并拒绝对这些消息负责; 在这种情况下,客户端可以选择重新发布消息。

信道进入确认模式后,所有随后发布的消息将被确认或否定一次。消息的确认时间无法保证。消息不会被同时确认和否定。

basic.nack只会在负责队列的Erlang进程中发生内部错误。
当消息被重新发送时,如果可能的话,它将被放置在其队列中的原本位置。 如果不是(由于当多个消费者共享一个队列时来自其他消费者的并发递送和确认),则该消息将被重新排队到靠近队列头部的位置。

消息什么时候会被确认?

对于不可路由的消息,一旦交换机验证消息不会路由到任何队列(返回空的队列列表),代理将发出确认。 如果消息也是强制发布的,则发送 basic.return basic.ack 之前的客户端。 否定确认( basic.nack )也是如此。

对于可路由消息,当消息被所有队列接受时,发送 basic.ack 。 对于路由到持久队列的持久消息,这个意味着持久化到磁盘。 对于镜像队列,这意味着所有的镜像都已经接受了这个消息。

持久消息的延迟ACK

basic.ack 为一个持久的消息路由到一个
持久化队列将在将消息保存到磁盘之后发送。 RabbitMQ消息存储在间隔(几百毫秒)之后将消息持久保存到磁盘,以最大限度地减少fsync(2)调用的次数,或者当一个队列处于空闲状态时。
这意味着在恒定负载下, basic.ack 的延迟可以达到几百毫秒。 为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息并等待未完成的确认。 客户端库的确切API不尽相同。

发布者确认的注意事项

在大多数情况下,RabbitMQ将以发布的顺序向发布者确认消息(这适用于发布在单个通道上的消息)。 但是,发布者确认是异步发送的,并且可以确认一条消息或一组消息。 发出确认的确切时刻取决于消息的传递模式(持久性与瞬态)以及消息被路由到的队列的属性(参见上文)。 也就是说不同的消息可以被认为是准备好在不同的时间进行确认。 这意味着确认可以以与它们各自的消息相比不同的顺序到达。 在可能的情况下,应用程序不应该依赖于确认的顺序。

发布者确认并保证送达

如果代理在所述消息写入磁盘之前崩溃,代理将丢失持久性消息。 在某些条件下,这导致代理以令人惊讶的方式行事。

例如,考虑这种情况:

此时,客户端可以合理地认为该消息将被再次投递。 情况并非如此:重启导致代理丢失信息。 为了保证持久性,客户端应该使用确认模式。 如果发布者的信道处于确认模式,则发布者不会收到已丢失消息的确认(因为消息还没有写入磁盘)。

最大投递标签

投递标签是一个64位长的值,因此其最大值是9223372036854775807.由于投递标签的范围是每个信道的,所以发布者或消费者在实践中不太可能运行这个值。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/99007.html

(0)
上一篇 2021年8月21日 01:22
下一篇 2021年8月21日

相关推荐

发表回复

登录后才能评论