简介
消息在生产者、Broker、消费者 都可能丢失。
生产者丢失消息
情景1:消息太大
描述
消息大小超过Broker的message.max.bytes的值。此时Broker会直接返回错误;
解决方案
指定消息的大小
Producer的max.requests.size表示生产者发送的单个消息的最大值,也可以指单个请求中所有消息的总和大小。此值必须小于Broker的message.max.bytes。
情景2:消息未到达Broker
描述
生产者发出消息后,网络突然中断,导致消息没发到Broker。或者Broker挂了,没收到。
解决方案
1.异步发送、提供回调方法
结论:
使用带回调的API,在发送失败时进行处理(比如存储到其他介质中在后边进行补偿)
详述:
生产中,我们会用Kafka生产者的异步发送,有如下两个API:
producer.send(msg) 不带回调方法
producer.send(msg,callback) 带回调方法
2.消息重试
- 设置重试次数:retries //经验值:3。默认值:
- 设置重试间隔:retry.backoff.ms //经验值:20000。默认值:1000
- 设置此项,使其过段时间后再重试,这时网络可能已经好了。重试若间隔太近,短时间网络还没好,会浪费了重试次数。
- 设置此项,会导致消息顺序改变。保证顺序不变的方法:配置max.in.flight.requests.per.connection=1 (作用:限制客户端在单个连接上能够发送的未响应请求的个数。设为1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。)
- 设置重连间隔:reconnect.backoff.ms //经验值:20000。默认值:50
3.响应个数
结论
设置多少个Broker副本写入成功才返回成功响应给生产者。 acks //经验值:all。默认值:1
详述
acks这个参数有三个值:0,1,-1(all)。
acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。
- 0:生产者只要把消息发送出去以后就认为消息发送成功了。
可能会导致数据丢失:网络挂了、broker保存消息失败- 1:生产者把消息发送到服务端,服务端的leader replica 副本写成功以后,就返回生产者消息发送成功了。
可能会导致丢数据:有可能刚好数据写入到leader replica,然后返回处理成功的响应给生产者,假如这个时候leader replica在的服务器出问题了,follower replica还没来得及同步数据,这时会丢数据。- all:所有replica 都写入成功以后,才会返回成功响应给生产者。
假设有该分区的三个replica(一个leader replica,两个follower replica),那么acks=-1就意味着消息要写入到leader replica,并且两个follower replica从leader replica上同步数据成功,服务端才会给生产者发送消息发送成功的响应。 想要保证数据不丢,那么acks的值设置为-1,并且还需要保证有1个以上的副本
4.发送延迟(对应:场景2)
结论
设置linger.ms。 //经验值:50。默认值:0
详述
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,从而一定程度上减少碰到网络问题的可能。
Broker丢失消息
情景1:leader挂掉,follower未同步
描述
假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
说明:Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
解决方案
1.leader竞选资格(对应情景1)
结论
设置unclean.leader.election.enable //经验值:false。默认值:false
详解
这个参数控制leader replica出问题了以后follower replica竞选leader replica资格。
设置为false,意思就是如果follower replica如果落后leader replica太多就不能参与竞选。
2.副本的个数(对应情景1)
结论
设置replication.factor为大于1的数 //经验值:3。默认值:1
详解
这个参数设置的是partition副本的个数,如果我们要想保证数据不丢,这个副本数需要设置成大于1。
3.最小的副本数
结论
设置min.insync.replicas为大于1的数,且要小于replication.factor //默认值:1
详解
- min.insync.replicas大于1
这参数要跟生产者里的acks参数配合使用,当生产者acks=all时,服务端所有副本都写入成功,才会给生产者返回成功的响应。
min.insync.replicas就是控制消息至少要被写入到多少个副本才算是“已提交”。假设min.insync.replicas=1,则表示可只有一个副本,这副本就是leader replica,这时即使acks设为all,但其实消息只发送到leader replica,以后就返回成功的响应了。- min.insync.replicas小于replication.factor
为了保证整个 Kafka 服务的高可用,需确保 replication.factor > min.insync.replicas 。为什么呢?假如两者相等,只要有一个副本挂掉,整个分区就无法正常工作,这明显违反高可用!一般推荐设置成 replication.factor = min.insync.replicas + 1。
情景2:Kafka在刷盘前挂了
注意
概率比较小,本项可以不进行优化。
描述
Kafka的Broker数据持久化时,会先存储到页缓存(Page cache)中,按时间或其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache时,如果系统挂掉,数据会丢失。
为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法:按照一定的消息量和时间间隔进行刷盘。
Kafka没有提供同步刷盘的方式。同步刷盘在RocketMQ中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应。
解决方案
1.消息个数
结论
设置log.flush.interval.messages //默认值:9223372036854775807
详解
当消息个数达到设置的值时,会把消息写到磁盘。
2.时间间隔
结论
设置log.flush.interval.ms //默认值:log.flush.scheduler.interval.ms(60000(1分钟))
详解
当内存中消息的时间达到设置的值时,会把消息写到磁盘。
消费者丢失消息
情景1:提交偏移量后消息处理失败
描述
先提交偏移量,后处理消息,当消息处理失败时,该消息丢失。
先处理消息,后提交偏移量,当提交偏移量失败时,该消息会重复。消息重复只需处理幂等即可,消息重复在下边一节会介绍。
解决方案
手动提交offset
结论
设置enable.auto.commit //经验值:false(手动提交)。默认值:true
详解
手动提交方法有如下几种:
- 异步提交
consumer.commitAsync()、consumer.commitAsync(new OffsetCommitCallBack())- 同步提交
consumer.commitSync()同步提交会一直重试直到成功,需要Broker的响应,而异步提交能够提高吞吐量。
消息零丢失的配置
如果上述所有情况都做到了,即可以做到消息零丢失。
总览很多博客,做到上边的配置后,在生产中已经可以消息零丢失了。
下边这些情况也会导致修饰消息,但可以忽略它
Kafka的数据首先是写到操作系统缓存的,假如我们用了上面的配置方案,数据写入成功了,还没落到磁盘,但是集群停电了,这个时候也是会丢数据的。如果配置其立马写入磁盘,则会降低吞吐量,一般不会这么配置。
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/150980.html