这篇文章主要介绍RocketMQ中push consumer启动之触发消息拉取的示例代码,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
一、consumer端数据存储结构
push consumer启动方法DefaultMQPushConsumerImpl.start最后一步会触发MQClientInstance.rebalanceImmediately,该调用最终会进入到RebalanceImpl.doRebalance中,它会根据topic当前的实际consumer数量(从nameserver获取)通过负载均衡原则来决定自己所要订阅的message queue。然后在本地创建对应的消息缓存队列(ProcessQueue),并触发消息拉取操作。
RebalanceImpl是整个consumer的核心,它即保存本消费者订阅的topic信息,又缓存了topic中的message数据。RebalanceImpl相关的几个核心类如下:
-
MessageQueue代表的是远端broker上一个topic下的某个message queue
-
ProcessQueue是对远端message queue的一个本地缓存,拉取下来的消息都存在一个TreeMap中,其中key是commitlog中的offset
-
RebalanceImpl中保存了三种关系:message queue和process queue的映射关系;topic和message queue的映射关系;topic的订阅关系
二、订阅端的负载均衡策略
doRebalance方法会调用rebalanceByTopic来决定本消费者具体要订阅一个topic下的哪些message queue,以达到负载均衡的效果。
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { // 广播模式,订阅所有message queue Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { // 集群模式,获取该topic下所有的message queue + 该topic所有的consumer Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { // 通过负载均衡策略计算出当前消费者所需订阅的message queue子集 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 更新Process Queue缓存列表 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
三、创建本地缓存并启动消息拉取
rebalanceByTopic中通过负载均衡策略计算出当前消费者对于一个topic实际订阅的message queue子集之后,就会在updateProcessQueueTableInBalance方法中创建ProcessQueue,并启动消息拉取。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { // 本地Process Queue存在,但不再订阅,则废弃改process queue pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { // process queue过期,也废弃,等待新建 switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { // 新订阅,本地不存在对应的process queue,则新建 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); // 初始化首次拉取请求 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 批量触发首次拉取请求 this.dispatchPullRequest(pullRequestList); return changed; }
四、整体流程
消息拉取的初始化过程如下图:
以上是“RocketMQ中push consumer启动之触发消息拉取的示例代码”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/222855.html