kafka源码解析之十三KafkaHealthcheck详解编程语言

主要是在zk的/brokers/[0...N] 路径上建立该Broker的信息,并且该节点是ZK中的Ephemeral Node,当此Broker离线的时候,zk上对应的节点也就消失了,那么其它Broker可以及时发现该Broker的异常。
class KafkaHealthcheck(private val brokerId: Int,  
                       private val advertisedHost: String,  
                       private val advertisedPort: Int, 
                       private val zkSessionTimeoutMs: Int, 
                       private val zkClient: ZkClient) extends Logging { 
  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId 
  val sessionExpireListener = new SessionExpireListener 
  def startup() { 
    zkClient.subscribeStateChanges(sessionExpireListener) 
    register() 
  } 
  def shutdown() { 
    zkClient.unsubscribeStateChanges(sessionExpireListener) 
    ZkUtils.deregisterBrokerInZk(zkClient, brokerId) 
  } 
  def register() { 
    val advertisedHostName =  
      if(advertisedHost == null || advertisedHost.trim.isEmpty)  
        InetAddress.getLocalHost.getCanonicalHostName  
      else 
        advertisedHost 
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt 
//在/brokers/ids/路径下存储broker的基本消息,例如端口号,ip地址,时间戳等,以上内容均在Ephemeral Node上,只要该broker和zk失去链接,则zk对应目录的内容被清空 
    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) 
  } 
//该SessionExpireListener的作用就是重建broker的节点,防止短暂的和zk失去链接之后,该broker对应的节点也全部丢失了 
  class SessionExpireListener() extends IZkStateListener { 
    @throws(classOf[Exception]) 
    def handleStateChanged(state: KeeperState) { 
      // do nothing, since zkclient will do reconnect for us. 
    } 
    def handleNewSession() { 
      info("re-registering broker info in ZK for broker " + brokerId) 
      register() 
      info("done re-registering broker") 
      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) 
    } 
  } 
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11813.html

(0)
上一篇 2021年7月19日 11:52
下一篇 2021年7月19日 11:52

相关推荐

发表回复

登录后才能评论