上图中大致包含了这么几种场景:
- 生产者产生消息发送给RocketMQ
- RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失
- 消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束
这三种场景都可能会产生消息的丢失,如下图所示:
- 场景1中生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失
- 场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失
①RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失
②如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失 - 消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,但是RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了
那么如何保证消息的零丢失呢?
- 场景1中保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为
①首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑
②half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路
③如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息
④如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据
其中还有一些RocketMQ长时间没有收到生产者是要commit/rollback操作的响应,回调生产者接口的细节
在使用了RocketMQ事务将生产者的消息成功发送给RocketMQ,就可以保证在这个阶段消息不会丢失 - 在场景2中要保证消息不丢失,首先需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障。
- 在场景3中,消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
//对消息进行处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
最后
编程基础的初级开发者,计算机科学专业的学生,以及平时没怎么利用过数据结构与算法的开发人员希望复习这些概念为下次技术面试做准备。或者想学习一些计算机科学的基本概念,以优化代码,提高编程技能。这份笔记都是可以作为参考的。
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/163102.html