kafka源码解析之十OffsetManager详解编程语言

OffsetManager主要提供对offset的保存和读取,kafka管理topic的偏移量有2种方式:1)zookeeper,即把偏移量提交至zk上;2)kafka,即把偏移量提交至kafka内部,主要由offsets.storage参数决定,默认为zookeeper。也就是说如果配置offsets.storage= kafka,则kafka会把这种offsetcommit请求转变为一种Producer,保存至topic为“__consumer_offsets”的log里面。

查看OffsetManager类:

class OffsetManager(val config: OffsetManagerConfig, 
                    replicaManager: ReplicaManager, 
                    zkClient: ZkClient, 
                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup { 
 
  /* offsets and metadata cache */ 
//通过offsetsCache提供对GroupTopicPartition的查询 
  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] 
  //把过时的偏移量刷入磁盘,因为这些偏移量长时间没有被更新,意味着消费者可能不再消费了,也就不需要了,因此刷入到磁盘 
  scheduler.schedule(name = "offsets-cache-compactor", 
                     fun = compact, 
                     period = config.offsetsRetentionCheckIntervalMs, 
                     unit = TimeUnit.MILLISECONDS) 
…… 
}

主要完成2件事情:
           1)提供对topic偏移量的查询
           2)将偏移量消息刷入到以__consumer_offsets命名的topic的log中

10.1 offsetsCache的更新机制

那么offsetsCache是如何生成的呢?是通过producer端发送消息给leader,然后leader不断更新此偏移量。Leader更新此偏移量分3种情况:
1)当produceRequest.requiredAcks == 0时,即不需要ack,则立刻调用putOffsets更新偏移量
2)当produceRequest.requiredAcks == 1时,即需要立即返回response时,则立刻调用putOffsets更新偏移量
3)当produceRequest.requiredAcks == -1时,即只有此批消息达到最小副本数的时候,通过ProducerRequestPurgatory触发调用putOffsets更新偏移量 (ProducerRequestPurgatory之后的章节会讲)

kafka源码解析之十OffsetManager详解编程语言

10.2 compact机制

那么compact是如何工作的呢?

//去除offsetsCache过时的OffsetAndMetadata,并把偏移量刷入磁盘 private def compact() {   debug("Compacting offsets cache.")   val startMs = SystemTime.milliseconds //过滤出长时间没有被更新的offset   val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)   debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))   // delete the stale offsets from the table and generate tombstone messages to remove them from the log   val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>     val offsetsPartition = partitionFor(groupTopicAndPartition.group)     trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))     offsetsCache.remove(groupTopicAndPartition)     val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,       groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)     (offsetsPartition, new Message(bytes = null, key = commitKey))   }.groupBy{ case (partition, tombstone) => partition }   // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,   // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.   val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>     val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)     partitionOpt.map { partition =>       val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)       val messages = tombstones.map(_._2).toSeq       trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))       try { //把偏移量刷入磁盘,供kafka重启的时候读取,即loadOffsetsFromLog         partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))         tombstones.size       }       catch {         case t: Throwable =>           error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)           // ignore and continue           0       }     }   }.sum   debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs)) }

其实就是把不再有消息发送的topic的偏移量刷入到磁盘,并且leader在重启的时候可以调用loadOffsetsFromLog从磁盘加载偏移量。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11818.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论