当Kafka的topic创建之后,需要中途修改该topic的属性的时候,可以通过在/brokers/config_changes上传配置信息,触发TopicConfigManager内部的监听函数来更改topic的属性。
/**
 * Applies topic-level configuration changes, announced through ZooKeeper, to the
 * logs managed by `logManager`.
 *
 * Every child znode of `ZkUtils.TopicConfigChangesPath` is a change notification.
 * Its name ends in a numeric change id (see `changeNumber`) and its data is the
 * quoted name of the topic whose configuration changed. For each notification that
 * has not been handled yet, the topic's config is re-read with
 * `AdminUtils.fetchTopicConfig`, layered over `logManager.defaultConfig`, and
 * assigned to every log of that topic known to `logManager`.
 *
 * @param zkClient           client used to read, watch and delete notification znodes
 * @param logManager         source of the default log config and of the logs to update
 * @param changeExpirationMs age (measured from the znode creation time) after which a
 *                           notification is deleted; defaults to 15 minutes
 * @param time               clock used to obtain the current time in milliseconds
 */
class TopicConfigManager(private val zkClient: ZkClient,
                         private val logManager: LogManager,
                         private val changeExpirationMs: Long = 15*60*1000,
                         private val time: Time = SystemTime) extends Logging {

  /** Highest change id handled so far; notifications with an id <= this value are skipped. */
  private var lastExecutedChange = -1L

  /**
   * Makes sure the notification path exists, registers [[ConfigChangeListener]] for
   * child changes on it, and then processes every notification already present.
   */
  def startup(): Unit = {
    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
    zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
    processAllConfigChanges()
  }

  /** Reads all current children of the notification path and processes them. */
  private def processAllConfigChanges(): Unit = {
    import scala.collection.JavaConverters._
    val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
    processConfigChanges(configChanges.asScala)
  }

  /**
   * Processes the given notification znode names in ascending change-id order.
   *
   * @param notifications child names of `ZkUtils.TopicConfigChangesPath`, in any order
   */
  private def processConfigChanges(notifications: Seq[String]): Unit = {
    if (notifications.nonEmpty) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      val logs = logManager.logsByTopicPartition.toBuffer
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
      // The id check below only works when ids are visited in ascending order; the
      // listener hands over the children in whatever order it received them, so
      // sort here rather than trusting the caller.
      for (notification <- notifications.sortBy(n => changeNumber(n))) {
        val changeId = changeNumber(notification)
        // Never re-apply a change that was already handled: an older notification
        // must not overwrite the effect of a newer one.
        if (changeId > lastExecutedChange) {
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
          val (jsonOpt, _) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
          for (json <- jsonOpt) {
            val topic = json.substring(1, json.length - 1) // hacky way to dequote
            for (topicLogs <- logsByTopic.get(topic)) {
              // Combine the defaults with the per-topic overrides stored in ZooKeeper.
              val props = new Properties(logManager.defaultConfig.toProps)
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
              val logConfig = LogConfig.fromProps(props)
              for (log <- topicLogs)
                log.config = logConfig
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
              purgeObsoleteNotifications(now, notifications)
            }
          }
          // Advance the high-water mark even when the znode had no data or the topic
          // has no logs here, so this notification is not examined again.
          lastExecutedChange = changeId
        }
      }
    }
  }

  /**
   * Deletes expired notifications, walking them in sorted name order and stopping at
   * the first one that still has data and has not expired. Notifications whose znode
   * has no data are skipped.
   *
   * @param now           current time in milliseconds
   * @param notifications child names of `ZkUtils.TopicConfigChangesPath`
   */
  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
    val remaining = notifications.sorted.iterator
    var reachedUnexpired = false
    while (!reachedUnexpired && remaining.hasNext) {
      val notification = remaining.next()
      val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
      if (jsonOpt.isDefined) {
        if (now - stat.getCtime > changeExpirationMs) {
          debug("Purging config change notification " + notification)
          ZkUtils.deletePath(zkClient, changeZnode)
        } else {
          reachedUnexpired = true
        }
      }
    }
  }

  /** Extracts the numeric change id that follows `AdminUtils.TopicConfigChangeZnodePrefix` in a znode name. */
  private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong

  /**
   * Child listener registered in `startup()`; forwards the current children of the
   * notification path to `processConfigChanges` and logs, rather than propagates,
   * any exception raised while doing so.
   */
  object ConfigChangeListener extends IZkChildListener {
    override def handleChildChange(path: String, chillins: java.util.List[String]): Unit = {
      try {
        import scala.collection.JavaConverters._
        // NOTE(review): ZkClient documents the child list as null when the parent
        // path has been deleted; treat that as "nothing to process".
        Option(chillins).foreach(children => processConfigChanges(children.asScala))
      } catch {
        case e: Exception => error("Error processing config change:", e)
      }
    }
  }
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11812.html