RocketMQ中broker消息存储之如何实现拉取消息

这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

在consumer拉取消息时,broker首先会根据待拉取的topic+queueId得到对应的ConsumeQueue,再根据消费offset从ConsumeQueue相应的偏移位置中获取该消息在commitlog里真实的offset/msgsize/tagscode信息,最后再从commitlog查出消息体。

消息拉取在broker存储层的调用入口为DefaultMessageStore.getMessage方法。核心逻辑如下:

    // DefaultMessageStore.java
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        // ...
        // 1. 定位ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // 2. 从ConsumeQueue中读取消费偏移offset处的内容
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;
                        int i = 0;
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 单个请求最大拉取数据量
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();  // commitlog offset 8bytes
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes
                            // ...
                            // 3. 通过tagscode快速过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                continue;
                            }
                            // 4. 从commitlog获取消息体
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }
                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }
                            // 5. 通过消息体过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }
                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            // 6.添加到返回结果
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }
                        // ...
                    } finally {
                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // ...
        return getResult;
    }

ConsumeQueue中存储的是固定长度(每个消息20字节)的内容,因此访问比较简单:

    // ConsumeQueue.java    
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消费者offset * 固定20字节长度
        if (offset >= this.getMinLogicOffset()) {
            // 定位到所属的MappedFile
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 从MappedFile中读取实际的数据
                return result;
            }
        }
        return null;
    }

通过ConsumeQueue获取消息在commitlog中的偏移量以及消息大小之后,获取消息体的方法如下

    // CommitLog.java
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // 定位消息所在的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size); // 从MappedFile中获取消息体
        }
        return null;
    }

消息拉取整体流程如下

RocketMQ中broker消息存储之如何实现拉取消息

感谢各位的阅读!关于“RocketMQ中broker消息存储之如何实现拉取消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/222860.html

(0)
上一篇 2022年1月6日
下一篇 2022年1月6日

相关推荐

发表回复

登录后才能评论