kafka源码解析之十二KafkaController(下篇)详解编程语言

12.6 KafkaController内部的listener

KafkaController(leader)通过在zk的不同目录上建立各种listener来实现对topic的管理和维护,其在zk上的目录结构和对应的listener如下:

kafka源码解析之十二KafkaController(下篇)详解编程语言

12.6.1 brokerChangeListener

/**
 * This is the zookeeper listener that triggers all the state transitions for a replica
 *
 * On every change of the children under the watched path, the current child names are
 * parsed as broker ids and compared with controllerContext.liveOrShuttingDownBrokerIds
 * to derive the newly added and the removed brokers. The controller's view of live
 * brokers and its broker channels are updated, then the controller is notified.
 */
class BrokerChangeListener() extends IZkChildListener with Logging {
  this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "

  /**
   * @param parentPath        the path whose children changed
   * @param currentBrokerList the current child names; each one is parsed as an integer broker id
   */
  def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
    info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
    inLock(controllerContext.controllerLock) {
      // Only react once the listener's owner has been started.
      if (hasStarted.get) {
        ControllerStats.leaderElectionTimer.time {
          try {
            val curBrokerIds = currentBrokerList.map(_.toInt).toSet
            // Ids present now but not yet known to the controller: new brokers.
            val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
            val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
            // Keep only the new brokers whose info could actually be read.
            val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
            // Ids known to the controller but no longer present: dead brokers.
            val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
            controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
            info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
              .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
            // Open a communication channel to every new broker.
            newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
            // Tear down the communication channel to every dead broker.
            deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
            // Author's note: onBrokerStartup tries to bring the replicas on these brokers
            // online and resumes any pending topic deletion.
            if(newBrokerIds.size > 0)
              controller.onBrokerStartup(newBrokerIds.toSeq)
            // Author's note: onBrokerFailure tries to move the replicas on these brokers
            // offline and marks their replica deletion as failed.
            if(deadBrokerIds.size > 0)
              controller.onBrokerFailure(deadBrokerIds.toSeq)
          } catch {
            case e: Throwable => error("Error while handling broker changes", e)
          }
        }
      }
    }
  }
}

12.6.2 topicChangeListener

/**
 * ZooKeeper child listener that keeps the controller's topic set in sync with the
 * children of the watched path and starts creation handling for newly seen topics.
 */
class TopicChangeListener extends IZkChildListener with Logging {
  this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

  /**
   * @param parentPath the path whose children changed
   * @param children   the current child names, treated as the full set of topic names
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val currentChildren = {
            import JavaConversions._
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            (children: Buffer[String]).toSet
          }
          // Topics present now but unknown to the controller.
          val newTopics = currentChildren -- controllerContext.allTopics
          // Topics known to the controller but no longer present.
          val deletedTopics = controllerContext.allTopics -- currentChildren
          controllerContext.allTopics = currentChildren
          // Read the replica assignment of the new topics.
          val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
          // Drop the cached replica assignment of deleted topics ...
          controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
            !deletedTopics.contains(p._1.topic))
          // ... and add the replica assignment of the new topics.
          controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
          info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
            deletedTopics, addedPartitionReplicaAssignment))
          // Hand the new topics over to the controller for creation.
          if(newTopics.size > 0)
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
        } catch {
          case e: Throwable => error("Error while handling new topic", e )
        }
      }
    }
  }
}

12.6.3 deleteTopicsListener

/**
 * ZooKeeper child listener for topic-deletion requests. Requests for topics the
 * controller does not know are discarded (their delete path is removed); the remaining
 * topics are queued with the controller's deleteTopicManager.
 */
class DeleteTopicsListener() extends IZkChildListener with Logging {
  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  val zkClient = controllerContext.zkClient

  /**
   * Invoked when a topic is being deleted
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      val requestedTopics = {
        import JavaConversions._
        (children: Buffer[String]).toSet
      }
      debug("Delete topics listener fired for topics %s to be deleted".format(requestedTopics.mkString(",")))

      // Split the request into topics the controller knows and topics it does not.
      val (knownTopics, unknownTopics) =
        requestedTopics.partition(topic => controllerContext.allTopics.contains(topic))

      if (unknownTopics.nonEmpty) {
        warn("Ignoring request to delete non-existing topics " + unknownTopics.mkString(","))
        for (topic <- unknownTopics)
          ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic))
      }

      if (knownTopics.nonEmpty) {
        info("Starting topic deletion for topics " + knownTopics.mkString(","))
        // A topic with a preferred replica election or a partition reassignment in
        // progress is marked ineligible for deletion before being queued.
        for (topic <- knownTopics) {
          val electionRunning =
            controllerContext.partitionsUndergoingPreferredReplicaElection.exists(_.topic == topic)
          val reassignmentRunning =
            controllerContext.partitionsBeingReassigned.keySet.exists(_.topic == topic)
          if (electionRunning || reassignmentRunning)
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // Hand the topics over to the deleteTopicManager.
        controller.deleteTopicManager.enqueueTopicsForDeletion(knownTopics)
      }
    }
  }

  /** Intentionally a no-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}

12.6.4 preferredReplicaElectionListener

/**
 * ZooKeeper data listener for preferred replica election requests. The requested
 * partitions are parsed from the node data, partitions already undergoing an election
 * and partitions of topics queued for deletion are removed, and the rest is passed to
 * controller.onPreferredReplicaElection.
 */
class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
  this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext

  /**
   * @param dataPath path of the node whose data changed
   * @param data     node content; its string form is parsed as the election request
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
            .format(dataPath, data.toString))
    inLock(controllerContext.controllerLock) {
      val requested = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
      val alreadyRunning = controllerContext.partitionsUndergoingPreferredReplicaElection
      if (alreadyRunning.nonEmpty)
        info("These partitions are already undergoing preferred replica election: %s"
          .format(alreadyRunning.mkString(",")))

      // Drop partitions whose election is already in progress.
      val candidates = requested -- alreadyRunning
      // Partitions whose topic is queued for deletion are skipped.
      val skipped = candidates.filter(tp => controller.deleteTopicManager.isTopicQueuedUpForDeletion(tp.topic))
      if (skipped.nonEmpty) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
          .format(skipped))
      }
      // Whatever is left really needs a preferred replica election.
      controller.onPreferredReplicaElection(candidates -- skipped)
    }
  }

  /** Intentionally a no-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}

12.6.5 partitionReassignedListener

/**
 * ZooKeeper data listener for partition reassignment requests. Each requested partition
 * that is not already being reassigned is either rejected (its topic is queued for
 * deletion) or handed to controller.initiateReassignReplicasForTopicPartition.
 */
class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext

  /**
   * @param dataPath path of the node whose data changed
   * @param data     node content; its string form is parsed as the reassignment request
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
      .format(dataPath, data))
    val requested = ZkUtils.parsePartitionReassignmentData(data.toString)

    // Drop partitions that are already being reassigned.
    val pending = inLock(controllerContext.controllerLock) {
      requested.filterNot { case (topicAndPartition, _) =>
        controllerContext.partitionsBeingReassigned.contains(topicAndPartition)
      }
    }

    // The lock is taken once per partition, as in the request-by-request handling below.
    pending.foreach { case (topicAndPartition, newReplicas) =>
      inLock(controllerContext.controllerLock) {
        val topicBeingDeleted = controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)
        if (!topicBeingDeleted) {
          // Start the actual reassignment of this partition.
          controller.initiateReassignReplicasForTopicPartition(topicAndPartition, new ReassignedPartitionsContext(newReplicas))
        } else {
          // The topic is being deleted: reject the request and remove it.
          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
            .format(topicAndPartition, topicAndPartition.topic))
          controller.removePartitionFromReassignedPartitions(topicAndPartition)
        }
      }
    }
  }

  /** Intentionally a no-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}

Partition的reassign比较复杂,因此详细叙述下,继续往下看:

/**
 * Validates a reassignment request for one partition and, if it is valid, starts it.
 *
 * The request is rejected (by throwing a KafkaException that is caught at the bottom of
 * this method) when the partition is unknown, when the requested replicas equal the
 * current assignment, or when not all requested replicas are on live brokers. On any
 * Throwable the partition is removed via removePartitionFromReassignedPartitions.
 *
 * @param topicAndPartition          the partition to reassign
 * @param reassignedPartitionContext carries the requested replica list (newReplicas)
 */
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, 
                                      reassignedPartitionContext: ReassignedPartitionsContext) { 
  val newReplicas = reassignedPartitionContext.newReplicas 
  val topic = topicAndPartition.topic 
  val partition = topicAndPartition.partition 
  // Keep only the requested replicas that live on a currently live broker.
  val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) 
  try { 
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) 
    assignedReplicasOpt match { 
      case Some(assignedReplicas) => 
        if(assignedReplicas == newReplicas) {// same as the current assignment: nothing to reassign
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + 
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) 
        } else { 
          if(aliveNewReplicas == newReplicas) {// every requested replica is alive: go ahead with the reassignment
            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) 
            // Watch ISR changes of this partition; per the author, the listener registered here
            // is ReassignedPartitionsIsrChangeListener.
            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) 
            // Record that this partition is now being reassigned.
            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext) 
            // Mark the topic ineligible for deletion so it cannot be deleted midway.
            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) 
            // Perform the actual reassignment.
            onPartitionReassignment(topicAndPartition, reassignedPartitionContext) 
          } else {// some requested replicas are offline, so the reassignment fails
            // some replica in RAR is not alive. Fail partition reassignment 
            throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + 
              " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) + 
              "Failing partition reassignment") 
          } 
        } 
      // The partition is not present in the controller's replica assignment.
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" 
        .format(topicAndPartition)) 
    } 
  } catch {// on any failure the partition is removed from the reassigned partitions
    case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e) 
    // remove the partition from the admin path to unblock the admin client 
    removePartitionFromReassignedPartitions(topicAndPartition) 
  } 
}

这其中最主要的流程是onPartitionReassignment内部的逻辑,如下:

/*
 * Terminology used below:
 *   RAR = Reassigned replicas (the target replica list of the reassignment)
 *   OAR = Original list of replicas for partition (the replicas before the reassignment)
 *   AR  = current assigned replicas
 *
 * Drives one step of a partition reassignment. The step taken depends on whether all
 * reassigned replicas are already in the partition's ISR (areReplicasInIsr):
 *   - false: widen the assignment to old + new replicas and start the new ones, then wait;
 *   - true:  switch over to the new replicas, stop the old ones and clean up.
 */
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
    case false =>
      // The new replicas are not all in the ISR yet, i.e. they have not caught up with the
      // partition's data, so they must first be allowed to replicate it.
      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned not yet caught up with the leader")
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
      // Temporarily assign the partition to the union of new and old replicas.
      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
      // Author's note: this sends a LeaderAndIsrRequest to the brokers hosting these replicas.
      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
        newAndOldReplicas.toSeq)
      // Author's note: this moves the replicas in newReplicasNotInOldReplicaList to the
      // NewReplica state. Execution continues asynchronously from here: a
      // ReassignedPartitionsIsrChangeListener was registered earlier on
      // /brokers/topics/[topic]/partitions/[partitionId]/state, and once the new replicas
      // have caught up with the leader the ISR update fires that listener, which calls
      // back into this method.
      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned to catch up with the leader")
    case true =>
      // All new replicas have caught up.
      // Replicas that are in the current assignment but not in the new one.
      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
      // Move every reassigned replica to the OnlineReplica state.
      reassignedReplicas.foreach { replica =>
        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
          replica)), OnlineReplica)
      }
      // Author's note: the leader is kept if it is among the new replicas, otherwise a new
      // leader is elected.
      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
      // Stop and remove the old replicas.
      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
      // Author's note: updates the partition's replicas in the controller cache and in zk.
      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
      // Author's note: updates /admin/reassign_partitions in zk, removing this partition.
      removePartitionFromReassignedPartitions(topicAndPartition)
      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
      // Send an UpdateMetadataRequest to the live or shutting-down brokers.
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      // Resume topic deletion: the topic may have been waiting for this reassignment to finish.
      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}

当新的replicas同步上对应partition的leader之后,会在/brokers/topics/[topic]/partitions/[partitionId]/state路径更新对应partition的状态,此时触发ReassignedPartitionsIsrChangeListener的回调函数:

/**
 * ZooKeeper data listener that resumes a pending partition reassignment once every
 * reassigned replica appears in the partition's ISR.
 *
 * @param controller         the owning controller
 * @param topic              topic of the partition being reassigned
 * @param partition          id of the partition being reassigned
 * @param reassignedReplicas the replicas the partition is being reassigned to
 */
class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
                                            reassignedReplicas: Set[Int])
  extends IZkDataListener with Logging {
  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext

  /**
   * @param dataPath path of the node whose data changed
   * @param data     new node content (only logged; leader and ISR are re-read from zk)
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
      val topicAndPartition = TopicAndPartition(topic, partition)
      try {
        // Nothing to do unless this partition is still recorded as being reassigned.
        controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignmentContext =>
          ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
            case None =>
              error("Error handling reassignment of partition %s to replicas %s as it was never created"
                .format(topicAndPartition, reassignedReplicas.mkString(",")))
            case Some(leaderAndIsr) =>
              val currentIsr = leaderAndIsr.isr.toSet
              val caughtUpReplicas = reassignedReplicas & currentIsr
              val progress = "%d/%d replicas have caught up with the leader for partition %s being reassigned."
                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition)
              if (caughtUpReplicas != reassignedReplicas) {
                info(progress +
                  "Replica(s) %s still need to catch up".format((reassignedReplicas -- currentIsr).mkString(",")))
              } else {
                // Every reassigned replica is in the ISR, so the new replicas have caught up
                // with the leader: re-enter the reassignment flow.
                info(progress + "Resuming partition reassignment")
                controller.onPartitionReassignment(topicAndPartition, reassignmentContext)
              }
          }
        }
      } catch {
        case e: Throwable => error("Error while handling partition reassignment", e)
      }
    }
  }

  /** Intentionally a no-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}

12.6.6 AddPartitionsListener

/**
 * ZooKeeper data listener that detects partitions added to an existing topic and asks
 * the controller to create them, unless the topic is queued for deletion.
 *
 * @param topic the topic this listener watches
 */
class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
  this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "

  /**
   * @param dataPath path of the node whose data changed
   * @param data     new node content (only logged; the assignment is re-read from zk)
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath : String, data: Object) {
    inLock(controllerContext.controllerLock) {
      try {
        info("Add Partition triggered " + data.toString + " for path " + dataPath)
        val assignmentInZk = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
        // Partitions present in zk but not yet in the controller's cache are the new ones.
        val partitionsToBeAdded = assignmentInZk.filterNot { case (topicAndPartition, _) =>
          controllerContext.partitionReplicaAssignment.contains(topicAndPartition)
        }
        val topicBeingDeleted = controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic)
        if (topicBeingDeleted) {
          // New partitions of a topic that is being deleted are ignored.
          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
        } else if (partitionsToBeAdded.nonEmpty) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      } catch {
        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
      }
    }
  }

  @throws(classOf[Exception])
  def handleDataDeleted(parentPath : String) {
    // this is not implemented for partition change
  }
}

12.7 KafkaController内部rebalance流程

那什么是rebalance呢?rebalance就是当topic and partition的leader发生变化时,造成在集群内部分布不均,需要重新调整topic and partition的leader为原始状态,使负载均衡,即如下的过程:

| Topic And Partition | Leader | ISR |
| --- | --- | --- |
| [topic] partition 0 | 1 | 1,2 |
| [topic] partition 1 | 2 | 2,3 |
| [topic] partition 2 | 3 | 3,4 |
| [topic] partition 3 | 4 | 4,1 |

每个Broker都存在一个leader,则当broker 4离线了一段时间后再上线时,其topic and partition的变化如下:

| Topic And Partition | Leader | ISR |
| --- | --- | --- |
| [topic] partition 0 | 1 | 1,2 |
| [topic] partition 1 | 2 | 2,3 |
| [topic] partition 2 | 3 | 3,4 |
| [topic] partition 3 | 1 | 4,1 |

在Broker 1上出现了2个leader,即partition 0和partition 3的leader位于broker 1了。则接着broker 2离线了一段时间后再上线时,其topic and partition的变化如下:

| Topic And Partition | Leader | ISR |
| --- | --- | --- |
| [topic] partition 0 | 1 | 1,2 |
| [topic] partition 1 | 3 | 2,3 |
| [topic] partition 2 | 3 | 3,4 |
| [topic] partition 3 | 1 | 4,1 |

此时leader都集中在了broker 1和broker 3上,其它节点没有leader了,那么这个时候生产者都会把数据发送给broker 1和broker 3,造成这两个节点负载比较大。如果此时配置了auto.leader.rebalance.enable=true,即开启了负载均衡的功能,topic and partition的leader会发生迁移,尽量恢复成系统初始的状态,即如下:

| Topic And Partition | Leader | ISR |
| --- | --- | --- |
| [topic] partition 0 | 1 | 1,2 |
| [topic] partition 1 | 2 | 2,3 |
| [topic] partition 2 | 3 | 3,4 |
| [topic] partition 3 | 4 | 4,1 |

 
即定时任务checkAndTriggerPartitionRebalance
/**
 * Checks, per preferred-leader broker, how many of its partitions are currently led by
 * another broker and triggers a preferred replica election for those partitions when the
 * ratio exceeds config.leaderImbalancePerBrokerPercentage. Does nothing unless isActive()
 * returns true. The article describes this as a periodically scheduled task.
 */
private def checkAndTriggerPartitionRebalance(): Unit = { 
  if (isActive()) { 
    trace("checking need to trigger partition rebalance") 
    // Group the replica assignment (minus topics queued for deletion) by the first
    // assigned replica, i.e. the preferred leader of each partition.
    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null 
    inLock(controllerContext.controllerLock) { 
      preferredReplicasForTopicsByBrokers = 
        controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy { 
          case(topicAndPartition, assignedReplicas) => assignedReplicas.head 
        } 
    } 
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers) 
    // for each broker, check if a preferred replica election needs to be triggered 
    preferredReplicasForTopicsByBrokers.foreach { 
      case(leaderBroker, topicAndPartitionsForBroker) => { 
        var imbalanceRatio: Double = 0 
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null 
        inLock(controllerContext.controllerLock) { 
          // Partitions whose current leader is not the preferred (first assigned) replica.
          topicsNotInPreferredReplica = 
            topicAndPartitionsForBroker.filter { 
              case(topicPartition, replicas) => { 
                controllerContext.partitionLeadershipInfo.contains(topicPartition) && 
                // the current leader differs from the first broker of the assigned replicas
                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker 
              } 
            } 
          debug("topics not in preferred replica " + topicsNotInPreferredReplica) 
          val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size 
          val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size 
          // Imbalance ratio = partitions not led by this broker / partitions it should lead.
          imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker 
          trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) 
        } 
        // Rebalance only when the imbalance exceeds the configured percentage.
        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { 
          topicsNotInPreferredReplica.foreach { 
            case(topicPartition, replicas) => { 
              inLock(controllerContext.controllerLock) { 
                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&// the preferred leader must be alive
                    controllerContext.partitionsBeingReassigned.size == 0 &&// no partition reassignment in progress
                    controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&// no preferred replica election in progress
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&// the topic is not queued for deletion
                    controllerContext.allTopics.contains(topicPartition.topic)) {// the topic is still known to the controller
                  onPreferredReplicaElection(Set(topicPartition), true)// trigger the preferred replica election for this partition
                } 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

12.8 KafkaController内部topic删除流程TopicDeletionManager

本质上就是开启DeleteTopicsThread线程,然后等待KafkaController触发删除:

/**
 * Background thread that performs topic deletion. Each doWork() pass waits for a
 * deletion notification and then, under the controller lock, advances every topic in
 * topicsToBeDeleted: completing it, leaving it alone while deletion is in progress,
 * marking it for retry, and (re)starting deletion where the topic is eligible.
 */
class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
  val zkClient = controllerContext.zkClient

  override def doWork() {
    // Block until the controller signals that there is deletion work to do.
    awaitTopicDeletionNotification()
    // The thread may have been shut down while waiting.
    if (!isRunning.get)
      return
    inLock(controllerContext.controllerLock) {
      // Work on a snapshot of the topics currently queued for deletion.
      val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
      if(!topicsQueuedForDeletion.isEmpty)
        info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
      topicsQueuedForDeletion.foreach { topic =>
        // Deletion is asynchronous: finish up once every replica of the topic is deleted.
        if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
          // Author's note: clears everything the controller holds about this topic.
          completeDeleteTopic(topic)
          info("Deletion of topic %s successfully completed".format(topic))
        } else {
          if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
            // ignore since topic deletion is in progress
            val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
            val replicaIds = replicasInDeletionStartedState.map(_.replica)
            val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
            info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
              partitions.mkString(","), topic))
          } else {
            // Some replica deletion went wrong: mark the topic so deletion is retried.
            if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
              markTopicForDeletionRetry(topic)
            }
          }
        }
        // Author's note: if the topic can be deleted, start deleting it; the key action is
        // sending a StopReplicaRequest to every broker hosting the topic so that each one
        // stops replicating and deletes its replica.
        if(isTopicEligibleForDeletion(topic)) {
          info("Deletion of topic %s (re)started".format(topic))
          // topic deletion will be kicked off
          onTopicDeletion(Set(topic))
        } else if(isTopicIneligibleForDeletion(topic)) {
          info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
        }
      }
    }
  }
}

12.9 KafkaController(leader)和其它broker通信流程ControllerChannelManager

 
ControllerChannelManager保存了和各个broker通信的通道:
/**
 * Abridged excerpt: only the per-broker state map is shown here.
 * Per the article, this class holds the channels the controller uses to talk to each broker.
 */
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { 
  // Per-broker state keyed by an Int — presumably the broker id; confirm against the full class.
  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] 
}

且看ControllerBrokerStateInfo类:
/**
 * Everything the controller keeps for one broker.
 *
 * @param channel           blocking channel; per the article, the link used to talk to the broker
 * @param broker            the broker this state belongs to
 * @param messageQueue      queue of (request, callback) pairs; per the article, the messages
 *                          destined for this broker, each with its callback
 * @param requestSendThread per the article, the thread that sends the queued requests
 */
case class ControllerBrokerStateInfo(channel: BlockingChannel, 
                                     broker: Broker, 
                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], 
                                     requestSendThread: RequestSendThread)

其messageQueue存放了发往特定broker的消息,其每个消息对应一个cb回调函数,channel为和broker通信的链路,RequestSendThread为其发送线程,查看requestSendThread发送线程:
/**
 * Sender thread for a single broker. Each doWork() pass takes one (request, callback)
 * pair from the queue, sends the request over the channel (reconnecting and retrying
 * until it succeeds or the thread is shut down), decodes the response according to the
 * request type and invokes the callback if one was supplied.
 *
 * @param controllerId      id of the controller, used in the thread name and log messages
 * @param controllerContext controller context; only its epoch is read here, for logging
 * @param toBroker          the broker requests are sent to
 * @param queue             pending (request, callback) pairs for that broker
 * @param channel           blocking channel used for send and receive
 */
class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val toBroker: Broker,
                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                        val channel: BlockingChannel)
  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
  private val lock = new Object()
  private val stateChangeLogger = KafkaController.stateChangeLogger
  // NOTE(review): connectToBroker is not part of this excerpt; it is assumed to be defined
  // in the omitted remainder of the class.
  connectToBroker(toBroker, channel)

  override def doWork(): Unit = {
    // Blocks until a request is available.
    val queueItem = queue.take()
    val request = queueItem._1
    val callback = queueItem._2
    var receive: Receive = null
    try {
      lock synchronized {
        var isSendSuccessful = false
        while(isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
          try {
            channel.send(request)
            receive = channel.receive()
            isSendSuccessful = true
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                request.toString, toBroker.toString()), e)
              channel.disconnect()
              connectToBroker(toBroker, channel)
              isSendSuccessful = false
              // backoff before retrying the connection and send
              Utils.swallow(Thread.sleep(300))
          }
        }
        // The retry loop also ends when the thread is shut down before any send succeeded.
        // In that case there is no response to decode; dereferencing receive would throw a
        // NullPointerException, so the response handling is skipped.
        if (receive != null) {
          // Decode the response with the reader matching the request type.
          var response: RequestOrResponse = null
          request.requestId.get match {
            case RequestKeys.LeaderAndIsrKey =>
              response = LeaderAndIsrResponse.readFrom(receive.buffer)
            case RequestKeys.StopReplicaKey =>
              response = StopReplicaResponse.readFrom(receive.buffer)
            case RequestKeys.UpdateMetadataKey =>
              response = UpdateMetadataResponse.readFrom(receive.buffer)
          }
          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
                                    .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
          // Invoke the callback when one was registered with the request.
          if(callback != null) {
            callback(response)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
        channel.disconnect()
    }
  }
}

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/11814.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论