Kafka Source Code Analysis (12): KafkaController in Detail (Part 1 of 3)

The KafkaController is fairly complex, so it will be covered across three installments. The main contents are as follows:

12.1 How a KafkaController becomes leader
12.2 KafkaController initialization (as leader)
12.3 KafkaController PartitionStateMachine
12.4 KafkaController PartitionLeaderSelector
12.4.1 NoOpLeaderSelector
12.4.2 OfflinePartitionLeaderSelector
12.4.3 ReassignedPartitionLeaderSelector
12.4.4 PreferredReplicaPartitionLeaderSelector
12.4.5 ControlledShutdownLeaderSelector
12.5 KafkaController ReplicaStateMachine
12.6 The listeners inside KafkaController
12.6.1 brokerChangeListener
12.6.2 topicChangeListener
12.6.3 deleteTopicsListener
12.6.4 preferredReplicaElectionListener
12.6.5 partitionReassignedListener
12.6.6 AddPartitionsListener
12.7 The rebalance flow inside KafkaController
12.8 The topic deletion flow inside KafkaController: TopicDeletionManager
12.9 Communication between the KafkaController (leader) and the other brokers: ControllerChannelManager

Without further ado, straight to the point.

The KafkaController is the controller of the Kafka cluster: at any time there is exactly one leader, and the remaining controllers are followers. Only the leader is allowed to send requests to the other brokers, and there are three such request types: RequestKeys.LeaderAndIsrKey, RequestKeys.StopReplicaKey, and RequestKeys.UpdateMetadataKey.
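For orientation, these request keys are just Short constants identifying the request type on the wire. A simplified excerpt, assuming the 0.8.x layout of kafka.api.RequestKeys (treat the exact numeric values as version-dependent):
object RequestKeys { 
  // only the controller leader issues these three to other brokers 
  val LeaderAndIsrKey: Short = 4    // push leader/ISR decisions to replicas 
  val StopReplicaKey: Short = 5     // tell a broker to stop (or delete) a replica 
  val UpdateMetadataKey: Short = 6  // propagate cluster metadata to every broker 
} 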

12.1 How a KafkaController becomes leader

The KafkaController holds a ZookeeperLeaderElector, which runs the leader election through ZooKeeper:
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { 
…… 
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, 
  onControllerResignation, config.brokerId) 
 
/** 
 * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker 
 * is the controller. It merely registers the session expiration listener and starts the controller leader 
 * elector 
 */ 
def startup() = { 
  inLock(controllerContext.controllerLock) { 
    info("Controller starting up"); 
    registerSessionExpirationListener() // register a session expiration listener 
    isRunning = true 
    controllerElector.startup // start the leader elector 
    info("Controller startup complete") 
  } 
} 
}
Its election path in ZooKeeper is /controller, and it also registers a session expiration listener on the ZooKeeper connection:
class SessionExpirationListener() extends IZkStateListener with Logging { 
  this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " 
  @throws(classOf[Exception]) 
  def handleStateChanged(state: KeeperState) { 
    // do nothing, since zkclient will do reconnect for us. 
  } 
  /** 
   * Called after the zookeeper session has expired and a new session has been created. You would have to re-create 
   * any ephemeral nodes here. 
   * 
   * @throws Exception 
   *             On any error. 
   */ 
  @throws(classOf[Exception]) 
  def handleNewSession() { 
    info("ZK expired; shut down all controller components and try to re-elect") 
    inLock(controllerContext.controllerLock) { 
      onControllerResignation() // after session expiration and reconnection, first run the onControllerResignation callback registered with the ZookeeperLeaderElector 
      controllerElector.elect // then re-elect 
    } 
  } 
}
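For completeness, the registerSessionExpirationListener call in startup() boils down to a single ZkClient call. A minimal sketch, assuming the I0Itec ZkClient API that Kafka uses here:
// subscribeStateChanges delivers connection-state events to the listener; 
// handleNewSession fires once a replacement session has been established 
zkClient.subscribeStateChanges(new SessionExpirationListener()) 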
So the key logic lies inside ZookeeperLeaderElector:
class ZookeeperLeaderElector(controllerContext: ControllerContext, 
                             electionPath: String, 
                             onBecomingLeader: () => Unit, 
                             onResigningAsLeader: () => Unit, 
                             brokerId: Int) 
  extends LeaderElector with Logging { 
  var leaderId = -1 
  // create the election path in ZK, if one does not exist 
  val index = electionPath.lastIndexOf("/") 
  if (index > 0) 
    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index)) 
  val leaderChangeListener = new LeaderChangeListener 
 
  def startup { 
    inLock(controllerContext.controllerLock) { // watch the election path /controller 
      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) 
      elect // trigger the election 
    } 
  } 
 
  private def getControllerID(): Int = { 
    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { 
       case Some(controller) => KafkaController.parseControllerId(controller) 
       case None => -1 
    } 
  } 
     
  def elect: Boolean = { 
    val timestamp = SystemTime.milliseconds.toString 
    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) 
    
    leaderId = getControllerID 
    /*  
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,  
     * it's possible that the controller has already been elected when we get here. This check will prevent the following  
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller. 
     */ 
    if(leaderId != -1) { 
       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId)) 
       return amILeader 
    } 
 
    try { // election works by creating an ephemeral node in ZK: if several clients race to create a node at the same path, exactly one creation succeeds and the rest fail; once the winner's session with ZK ends, the node disappears and the remaining clients compete again 
      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, 
        (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], 
        controllerContext.zkSessionTimeout) 
      info(brokerId + " successfully elected as leader") 
      leaderId = brokerId 
      onBecomingLeader() // on success, this broker is the leader and runs the failover callback 
    } catch { 
      case e: ZkNodeExistsException => 
        // If someone else has written the path, then 
        leaderId = getControllerID  
 
        if (leaderId != -1) 
          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) 
        else 
          warn("A leader has been elected but just resigned, this will result in another round of election") 
 
      case e2: Throwable => 
        error("Error while electing or becoming leader on broker %d".format(brokerId), e2) 
        resign() // on an unexpected error, resign by deleting the path 
    } 
    amILeader 
  } 
 
  def close = { 
    leaderId = -1 
  } 
 
  def amILeader : Boolean = leaderId == brokerId 
 
  def resign() = { 
    leaderId = -1 
    deletePath(controllerContext.zkClient, electionPath) 
  } 
 
  /** 
   * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will 
   * have its own session expiration listener and handler 
   */ 
  class LeaderChangeListener extends IZkDataListener with Logging { 
    /** 
     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory 
     * @throws Exception On any error. 
     */ 
    @throws(classOf[Exception]) 
    def handleDataChange(dataPath: String, data: Object) { 
      inLock(controllerContext.controllerLock) { 
        leaderId = KafkaController.parseControllerId(data.toString) 
        info("New leader is %d".format(leaderId)) 
      } 
    } 
 
    /** 
     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader 
     * @throws Exception 
     *             On any error. 
     */ 
    @throws(classOf[Exception]) 
    def handleDataDeleted(dataPath: String) { // a broker that lost the initial election re-triggers the election here, once it observes that the node has disappeared 
      inLock(controllerContext.controllerLock) { 
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" 
          .format(brokerId, dataPath)) 
        if(amILeader) // if this broker was the leader before, it may not win the re-election, so it must first clear all of its cached leader state 
          onResigningAsLeader() 
        elect // trigger the election 
      } 
    } 
  } 
}
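To make the ephemeral-node trick concrete, here is a minimal, self-contained sketch of the same election pattern, assuming the I0Itec ZkClient API; the connection string, node path and payload are invented for illustration:
import org.I0Itec.zkclient.ZkClient 
import org.I0Itec.zkclient.exception.ZkNodeExistsException 

object EphemeralElectionSketch { 
  def main(args: Array[String]): Unit = { 
    val zkClient = new ZkClient("localhost:2181", 6000, 6000) 
    val myId = 1 
    try { 
      // exactly one concurrent caller wins this create; the node vanishes 
      // automatically when the winner's session dies, re-opening the race 
      zkClient.createEphemeral("/demo-election", s"""{"brokerid":$myId}""") 
      println(s"broker $myId won the election") 
    } catch { 
      case _: ZkNodeExistsException => 
        val winner: String = zkClient.readData("/demo-election") 
        println(s"broker $myId lost to $winner") 
    } 
  } 
} 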
A KafkaController therefore becomes leader in one of two ways:
1. On first startup it actively triggers elect; if it wins the election, it takes over the leader's duties.
2. If the first election fails, it keeps watching the /controller path through LeaderChangeListener; once the data there is deleted, handleDataDeleted fires and triggers a new round of election.
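For reference, the data that elect writes to /controller is just the JSON built into electString; for a broker with id 2 it would look roughly like the following (timestamp invented for illustration), and KafkaController.parseControllerId pulls the brokerid field back out:
{"version":1,"brokerid":2,"timestamp":"1626672345000"} 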

12.2 KafkaController initialization (as leader)

As the previous section showed, a KafkaController that wins the election calls onBecomingLeader, while a previous leader that re-enters the election calls onResigningAsLeader. These two callbacks map to onControllerFailover and onControllerResignation, respectively.
onControllerResignation is simple: it shuts down or deregisters every module inside the controller:
def onControllerResignation() { 
  // de-register listeners 
  deregisterReassignedPartitionsListener() 
  deregisterPreferredReplicaElectionListener() 
  // shutdown delete topic manager 
  if (deleteTopicManager != null) 
    deleteTopicManager.shutdown() 
  // shutdown leader rebalance scheduler 
  if (config.autoLeaderRebalanceEnable) 
    autoRebalanceScheduler.shutdown() 
  inLock(controllerContext.controllerLock) { 
    // de-register partition ISR listener for on-going partition reassignment task 
    deregisterReassignedPartitionsIsrChangeListeners() 
    // shutdown partition state machine 
    partitionStateMachine.shutdown() 
    // shutdown replica state machine 
    replicaStateMachine.shutdown() 
    // shutdown controller channel manager 
    if(controllerContext.controllerChannelManager != null) { 
      controllerContext.controllerChannelManager.shutdown() 
      controllerContext.controllerChannelManager = null 
    } 
    // reset controller context 
    controllerContext.epoch=0 
    controllerContext.epochZkVersion=0 
    brokerState.newState(RunningAsBroker) 
  } 
}
Each of the modules above is introduced along with onControllerFailover, which is essentially the mirror image: it brings all of these components up. Its logic is as follows:
 def onControllerFailover() { 
    if(isRunning) { 
      info("Broker %d starting become controller state transition".format(config.brokerId)) 
      readControllerEpochFromZookeeper() 
//record the controller epoch: incremented by 1 on every successful election 
      incrementControllerEpoch(zkClient) 
/*leader initialization; each step is walked through below*/ 
      registerReassignedPartitionsListener() 
      registerPreferredReplicaElectionListener() 
      partitionStateMachine.registerListeners() 
      replicaStateMachine.registerListeners() 
      initializeControllerContext() 
      replicaStateMachine.startup() 
      partitionStateMachine.startup() 
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) 
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) 
      brokerState.newState(RunningAsController) 
      maybeTriggerPartitionReassignment() 
      maybeTriggerPreferredReplicaElection() 
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) 
      if (config.autoLeaderRebalanceEnable) { 
        info("starting the partition rebalance scheduler") 
        autoRebalanceScheduler.startup() 
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) 
      } 
      deleteTopicManager.start() 
    } 
    else 
      info("Controller has been shut down, aborting startup/failover") 
  }
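The epoch handling at the top deserves a note: incrementControllerEpoch persists the new epoch to ZooKeeper with a conditional update keyed on the znode version, so a stale controller cannot overwrite a newer epoch. A minimal sketch of that idea against the I0Itec ZkClient API (the real logic lives in ZkUtils and also handles the case where the path does not yet exist):
import org.apache.zookeeper.data.Stat 

// read the current epoch together with the znode version (Stat) 
val stat = new Stat() 
val oldEpoch = zkClient.readData[String]("/controller_epoch", stat).toInt 
// conditional write: succeeds only if the znode is unchanged since our read; 
// otherwise ZkClient throws ZkBadVersionException and the new leader must abort 
zkClient.writeDataReturnStat("/controller_epoch", (oldEpoch + 1).toString, stat.getVersion) 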
The steps are:
1) Register the partitionReassignedListener on the /admin/reassign_partitions path (steps 1 through 5 and 9 all use the same ZkClient subscription pattern; see the sketch after this list)
2) Register the preferredReplicaElectionListener on the /admin/preferred_replica_election path
3) Register the topicChangeListener on the /brokers/topics path
4) Register the deleteTopicsListener on the /admin/delete_topics path
5) Register the brokerChangeListener on the /brokers/ids path
6) Initialize the ControllerContext, which caches all of the topic metadata; in addition, the ControllerChannelManager inside the ControllerContext maintains a channel to every other KafkaServer in the cluster, and the TopicDeletionManager takes care of topic deletion
7) Initialize the state of all replicas through the replicaStateMachine
8) Initialize the state of all partitions through the partitionStateMachine
9) Register the AddPartitionsListener under /brokers/topics/<topic name>/ for every topic
10) Resume any partition reassignment left over from before this startup
11) Resume any preferred replica election left over from before this startup
12) Send the cluster's topic metadata to the other KafkaServers so that they update their copies
13) Start the automatic leader rebalance scheduler if it is enabled in the config
14) Start topic deletion
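All of the registrations in steps 1) through 5) and 9) follow the same ZkClient pattern: subscribe a listener object on a path and react in its callback. A minimal sketch, assuming the I0Itec ZkClient API and a connected zkClient; the listener body is illustrative, not the real brokerChangeListener:
import org.I0Itec.zkclient.{IZkChildListener, ZkClient} 
import java.util 

// fires whenever children appear or disappear under the watched path, 
// e.g. a broker znode being created or removed under /brokers/ids 
val brokerChangeSketch = new IZkChildListener { 
  override def handleChildChange(parentPath: String, children: util.List[String]): Unit = 
    println(s"$parentPath now has children: $children") 
} 
zkClient.subscribeChildChanges("/brokers/ids", brokerChangeSketch) 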
The KafkaController manages the Kafka cluster's metadata chiefly through the listeners above. The next sections first describe the PartitionStateMachine and the ReplicaStateMachine in detail, since the state and contents of topic partitions are driven by these two state machines, and then walk through each listener following the flow above.
