kafka源码解析之十四TopicConfigManager详解编程语言

当Kakfka的topic创建之后,需要中途修改该topic的属性的时候,可以通过在/brokers/config_changes上传配置信息,触发TopicConfigManager内部的监听函数来更改topic的属性。

class TopicConfigManager(private val zkClient: ZkClient, 
                         private val logManager: LogManager, 
                         private val changeExpirationMs: Long = 15*60*1000, 
                         private val time: Time = SystemTime) extends Logging { 
  private var lastExecutedChange = -1L 
  def startup() {//在/brokers/config_changes目录下注册监听函数ConfigChangeListener 
    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath) 
    zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) 
    processAllConfigChanges() 
  } 
  private def processAllConfigChanges() { 
    val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) 
    import JavaConversions._ 
    processConfigChanges((configChanges: mutable.Buffer[String]).sorted) 
  } 
  private def processConfigChanges(notifications: Seq[String]) { 
    if (notifications.size > 0) { 
      info("Processing config change notification(s)...") 
      val now = time.milliseconds 
      val logs = logManager.logsByTopicPartition.toBuffer 
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) 
      for (notification <- notifications) { 
        val changeId = changeNumber(notification) 
        if (changeId > lastExecutedChange) {//标识更改的时间戳,不能利用旧的时间戳的config来覆盖新的时间戳的config 
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification 
          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) 
          if(jsonOpt.isDefined) { 
            val json = jsonOpt.get 
            val topic = json.substring(1, json.length - 1) // hacky way to dequote 
            if (logsByTopic.contains(topic)) { 
              //合并zk上被更改的属性至topic里面 
              val props = new Properties(logManager.defaultConfig.toProps) 
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) 
              val logConfig = LogConfig.fromProps(props) 
              for (log <- logsByTopic(topic)) 
                log.config = logConfig 
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) 
              purgeObsoleteNotifications(now, notifications) 
            } 
          } 
          lastExecutedChange = changed//更改时间戳 
        } 
      } 
    } 
  } 
  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { 
    for(notification <- notifications.sorted) { 
      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification) 
      if(jsonOpt.isDefined) {//删除过期的通知 
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification 
        if (now - stat.getCtime > changeExpirationMs) {//删除过期的通知 
          debug("Purging config change notification " + notification) 
          ZkUtils.deletePath(zkClient, changeZnode) 
        } else { 
          return 
        } 
      } 
    } 
  } 
  private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong 
  object ConfigChangeListener extends IZkChildListener { 
    override def handleChildChange(path: String, chillins: java.util.List[String]) { 
      try { 
        import JavaConversions._ 
        processConfigChanges(chillins: mutable.Buffer[String]) 
      } catch { 
        case e: Exception => error("Error processing config change:", e) 
      } 
    } 
  } 
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11812.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论