其他网址

平台搭建—Kafka使用—Kafka重复消费和丢失数据_diggerTT的博客 

简介

消息在生产者、Broker、消费者 都可能丢失。

生产者丢失消息

情景1:消息太大

描述

消息大小超过Broker的message.max.bytes的值。此时Broker会直接返回错误;

解决方案 

指定消息的大小

Producer的max.requests.size表示生产者发送的单个消息的最大值,也可以指单个请求中所有消息的总和大小。此值必须小于Broker的message.max.bytes。

情景2:消息未到达Broker

描述

生产者发出消息后,网络突然中断,导致消息没发到Broker。或者Broker挂了,没收到。

解决方案

1.异步发送、提供回调方法

结论

使用带回调的API,在发送失败时进行处理(比如存储到其他介质中在后边进行补偿)

详述:

生产中,我们会用Kafka生产者的异步发送,有如下两个API:

producer.send(msg) 不带回调方法
producer.send(msg,callback) 带回调方法

2.消息重试

  • 设置重试次数:retries            //经验值:3。默认值:
  • 设置重试间隔:retry.backoff.ms    //经验值:20000。默认值:1000
    • 设置此项,使其过段时间后再重试,这时网络可能已经好了。重试若间隔太近,短时间网络还没好,会浪费了重试次数。
    • 设置此项,会导致消息顺序改变。保证顺序不变的方法:配置max.in.flight.requests.per.connection=1  (作用:限制客户端在单个连接上能够发送的未响应请求的个数。设为1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。)
  • 设置重连间隔:reconnect.backoff.ms     //经验值:20000。默认值:50

3.响应个数

结论

设置多少个Broker副本写入成功才返回成功响应给生产者。 acks //经验值:all。默认值:1

详述

acks这个参数有三个值:0,1,-1(all)。

acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。

  • 0:生产者只要把消息发送出去以后就认为消息发送成功了。
    可能会导致数据丢失:网络挂了、broker保存消息失败
  • 1:生产者把消息发送到服务端,服务端的leader replica 副本写成功以后,就返回生产者消息发送成功了。
    可能会导致丢数据:有可能刚好数据写入到leader replica,然后返回处理成功的响应给生产者,假如这个时候leader replica在的服务器出问题了,follower replica还没来得及同步数据,这时会丢数据。
  • all:所有replica 都写入成功以后,才会返回成功响应给生产者。
    假设有该分区的三个replica(一个leader replica,两个follower replica),那么acks=-1就意味着消息要写入到leader replica,并且两个follower replica从leader replica上同步数据成功,服务端才会给生产者发送消息发送成功的响应。 想要保证数据不丢,那么acks的值设置为-1,并且还需要保证有1个以上的副本

4.发送延迟(对应:场景2) 

结论

设置linger.ms。  //经验值:50。默认值:0

详述

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,从而一定程度上减少碰到网络问题的可能。

详见:https://kafka.apache.org/documentation/

Broker丢失消息

情景1:leader挂掉,follower未同步

描述

假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

说明:Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

解决方案

1.leader竞选资格(对应情景1)

结论

设置unclean.leader.election.enable  //经验值:false。默认值:false

详解

这个参数控制leader replica出问题了以后follower replica竞选leader replica资格。

设置为false,意思就是如果follower replica如果落后leader replica太多就不能参与竞选。

2.副本的个数(对应情景1)

结论

设置replication.factor为大于1的数  //经验值:3。默认值:1

详解

        这个参数设置的是partition副本的个数,如果我们要想保证数据不丢,这个副本数需要设置成大于1。

3.最小的副本数 

结论

设置min.insync.replicas为大于1的数,且要小于replication.factor  //默认值:1

详解

  • min.insync.replicas大于1
            这参数要跟生产者里的acks参数配合使用,当生产者acks=all时,服务端所有副本都写入成功,才会给生产者返回成功的响应。
            min.insync.replicas就是控制消息至少要被写入到多少个副本才算是“已提交”。假设min.insync.replicas=1,则表示可只有一个副本,这副本就是leader replica,这时即使acks设为all,但其实消息只发送到leader replica,以后就返回成功的响应了。
  • min.insync.replicas小于replication.factor
            为了保证整个 Kafka 服务的高可用,需确保 replication.factor > min.insync.replicas 。为什么呢?假如两者相等,只要有一个副本挂掉,整个分区就无法正常工作,这明显违反高可用!一般推荐设置成 replication.factor = min.insync.replicas + 1。

情景2:Kafka在刷盘前挂了

注意

       概率比较小,本项可以不进行优化。

描述

        Kafka的Broker数据持久化时,会先存储到页缓存(Page cache)中,按时间或其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache时,如果系统挂掉,数据会丢失。

        为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法:按照一定的消息量和时间间隔进行刷盘。

        Kafka没有提供同步刷盘的方式。同步刷盘在RocketMQ中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应。

解决方案

1.消息个数

结论

设置log.flush.interval.messages   //默认值:9223372036854775807

详解

当消息个数达到设置的值时,会把消息写到磁盘。

2.时间间隔

结论

设置log.flush.interval.ms   //默认值:log.flush.scheduler.interval.ms(60000(1分钟))

详解

当内存中消息的时间达到设置的值时,会把消息写到磁盘。

消费者丢失消息

情景1:提交偏移量后消息处理失败

描述

        先提交偏移量,后处理消息,当消息处理失败时,该消息丢失。

        先处理消息,后提交偏移量,当提交偏移量失败时,该消息会重复。消息重复只需处理幂等即可,消息重复在下边一节会介绍。

解决方案

手动提交offset

结论

设置enable.auto.commit  //经验值:false(手动提交)。默认值:true

详解

手动提交方法有如下几种:

  1. 异步提交
    consumer.commitAsync()、consumer.commitAsync(new OffsetCommitCallBack())
  2. 同步提交
    consumer.commitSync()

同步提交会一直重试直到成功,需要Broker的响应,而异步提交能够提高吞吐量。

消息零丢失的配置

如果上述所有情况都做到了,即可以做到消息零丢失。

总览很多博客,做到上边的配置后,在生产中已经可以消息零丢失了。

下边这些情况也会导致修饰消息,但可以忽略它

Kafka的数据首先是写到操作系统缓存的,假如我们用了上面的配置方案,数据写入成功了,还没落到磁盘,但是集群停电了,这个时候也是会丢数据的。如果配置其立马写入磁盘,则会降低吞吐量,一般不会这么配置。