52.源代码解读-RocketMQ消息写入机制

一. 前言

RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程

二. 代码流程

在Broker启动的时候会拉起相关服务
流程如下:

52.源代码解读-RocketMQ消息写入机制

流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000

三. 代码流程

由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。

Broker启动流程

以发送消息为例

1. Broker启动注册发送消息处理器

Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

2. 消息处理器处理发送者发送过来的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand proce***equest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
                ...
        switch (request.getCode()) {
            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        }
    }
}       

继续看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        ...
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}               

调用MessageStore.putMessage(msgInner)

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/194921.html

(0)
上一篇 2021年11月16日 08:59
下一篇 2021年11月16日 09:01

相关推荐

发表回复

登录后才能评论