Kafka Source Code Analysis, Part 12: KafkaController (Part 2)

12.3 KafkaController PartitionStateMachine

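PartitionStateMachine implements the state transitions of a topic's partitions. A partition can be in one of the following states:

| State | When the partition is in this state | Valid previous states |
| --- | --- | --- |
| NonExistentPartition | 1. The partition has never been created. 2. The partition was created and then deleted. | OfflinePartition |
| NewPartition | 1. The partition has been created and has replicas assigned, but has no leader/ISR yet. | NonExistentPartition |
| OnlinePartition | 1. One of the partition's replicas has been elected leader. | NewPartition/OfflinePartition |
| OfflinePartition | 1. The leader replica has gone offline and no new leader has been elected yet. 2. The partition was taken offline right after it was created. | NewPartition/OnlinePartition |

Note: the assertValidPreviousStates calls in handleStateChange are slightly more permissive than this table. Both OnlinePartition and OfflinePartition may also be re-entered from the same state.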

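The partition state transitions work as follows:

| State transition | What happens |
| --- | --- |
| NonExistentPartition -> NewPartition | 1. The assigned replicas are loaded from ZooKeeper into the KafkaController's in-memory cache. |
| NewPartition -> OnlinePartition | 1. The first live replica is made leader, all live replicas form the ISR, and this information is written to ZooKeeper. |
| OnlinePartition, OfflinePartition -> OnlinePartition | 1. A new leader and ISR are elected for the partition and written to ZooKeeper. |
| NewPartition, OnlinePartition, OfflinePartition -> OfflinePartition | 1. The state is merely marked as OfflinePartition inside the KafkaController. |
| OfflinePartition -> NonExistentPartition | 1. The state is merely marked as NonExistentPartition inside the KafkaController. |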


The function to focus on is therefore PartitionStateMachine.handleStateChange:
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, 
                              leaderSelector: PartitionLeaderSelector, 
                              callbacks: Callbacks) { 
  val topicAndPartition = TopicAndPartition(topic, partition) 
  if (!hasStarted.get) 
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + 
                                          "the partition state machine has not started") 
                                            .format(controllerId, controller.epoch, topicAndPartition, targetState)) 
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) 
  try { 
    targetState match { 
      case NewPartition => 
        // Check that the current state is a valid previous state
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) 
        // Update partitionReplicaAssignment in controllerContext
        assignReplicasToPartitions(topic, partition)
        // Record the new partition state
        partitionState.put(topicAndPartition, NewPartition) 
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") 
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s" 
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, 
                                          assignedReplicas)) 
      case OnlinePartition => 
        // Check that the current state is a valid previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) 
        partitionState(topicAndPartition) match { 
          case NewPartition => // NewPartition -> OnlinePartition
            /* 1. Using partitionReplicaAssignment, pick the first live replica as leader; the live replicas form the ISR
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            initializeLeaderAndIsrForPartition(topicAndPartition) 
          case OfflinePartition => // OfflinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here this is normally OfflinePartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector) 
          case OnlinePartition => // OnlinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here this is typically ReassignedPartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector) 
          case _ => // should never come here since illegal previous states are checked above 
        } 
        // Update the partition state
        partitionState.put(topicAndPartition, OnlinePartition) 
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader 
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d" 
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader)) 
      case OfflinePartition => 
        // Check that the current state is a valid previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition) 
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" 
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) 
        // Update the partition state
        partitionState.put(topicAndPartition, OfflinePartition) 
      case NonExistentPartition => 
        // Check that the current state is a valid previous state
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition) 
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" 
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) 
        // Update the partition state
        partitionState.put(topicAndPartition, NonExistentPartition) 
        // post: partition state is deleted from all brokers and zookeeper 
    } 
  } catch { 
    case t: Throwable => 
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed" 
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t) 
  } 
}
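
The validity checks in the function above can be condensed into a small lookup table. The following is only a sketch: the object name `PartitionStateSketch` and the helper `transition` are hypothetical and do not exist in Kafka, and the lists are copied from the `assertValidPreviousStates` calls shown above. The real code throws and logs on an illegal transition, whereas this sketch returns an `Either`.

```scala
object PartitionStateSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Target state -> states it may be entered from, copied from the
  // assertValidPreviousStates calls in handleStateChange.
  val validPreviousStates: Map[PartitionState, List[PartitionState]] = Map(
    NewPartition         -> List(NonExistentPartition),
    OnlinePartition      -> List(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> List(NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> List(OfflinePartition)
  )

  // Hypothetical helper: returns the new state, or an error message when the
  // transition is not allowed.
  def transition(current: PartitionState, target: PartitionState): Either[String, PartitionState] =
    if (validPreviousStates(target).contains(current)) Right(target)
    else Left(s"Illegal transition $current -> $target")
}
```

For example, `transition(NonExistentPartition, OnlinePartition)` is rejected, because a partition must pass through NewPartition before it can come online.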

12.4 KafkaController PartitionLeaderSelector

When a partition changes state, in particular on the OfflinePartition -> OnlinePartition and OnlinePartition -> OnlinePartition transitions, a PartitionLeaderSelector is invoked to determine the new leader and ISR. Five selectors are currently supported: NoOpLeaderSelector, OfflinePartitionLeaderSelector, ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector and ControlledShutdownLeaderSelector.
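
All five selectors share one method, `selectLeader`, whose signature can be read off the classes in the following subsections. A minimal sketch of that shape is given here. The case classes are simplified stand-ins for Kafka's own types, and `FirstInIsrSelector` is a made-up toy selector for illustration only. Reading the second tuple element as "the replicas that should be told about the change" is an assumption based on how the state machine uses it.

```scala
object SelectorInterfaceSketch {
  // Simplified stand-ins for Kafka's types (assumptions for illustration).
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // A selector maps the current leader/ISR to a new leader/ISR plus a list of replica ids.
  trait PartitionLeaderSelector {
    def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
  }

  // Toy selector (not part of Kafka): promote the first ISR member and bump
  // the leader epoch and ZooKeeper version, as the real selectors do.
  object FirstInIsrSelector extends PartitionLeaderSelector {
    def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) =
      current.isr.headOption match {
        case Some(newLeader) =>
          (current.copy(leader = newLeader,
                        leaderEpoch = current.leaderEpoch + 1,
                        zkVersion = current.zkVersion + 1), current.isr)
        case None =>
          throw new IllegalStateException(s"ISR of $tp is empty")
      }
  }
}
```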

12.4.1 NoOpLeaderSelector

/** 
 * Essentially does nothing. Returns the current leader and ISR, and the current 
 * set of replicas assigned to a given topic/partition. 
 */ 
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { 
 
  this.logIdent = "[NoOpLeaderSelector]: " 
 
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { 
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.") 
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition)) 
  } 
}
It does essentially nothing: it returns the current LeaderAndIsr unchanged, together with the set of replicas assigned to the given topic/partition.

12.4.2 OfflinePartitionLeaderSelector

class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) 
  extends PartitionLeaderSelector with Logging { 
  this.logIdent = "[OfflinePartitionLeaderSelector]: " 
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { 
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { 
      case Some(assignedReplicas) => 
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) 
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) 
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch 
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion 
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { 
          case true => // every broker in the ISR is offline, so the leader has to come from the assigned replicas (AR)
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient, 
              topicAndPartition.topic)).uncleanLeaderElectionEnable) { 
              throw new NoReplicaOnlineException(("No broker in ISR for partition " + 
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + 
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) 
            } 
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" 
              .format(topicAndPartition, liveAssignedReplicas.mkString(","))) 
            liveAssignedReplicas.isEmpty match { 
              case true => // all assigned replicas are offline as well, so this topic/partition is unavailable
                throw new NoReplicaOnlineException(("No replica for partition " + 
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + 
                  " Assigned replicas are: [%s]".format(assignedReplicas)) 
              case false => // some of the assigned replicas are still alive
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head // take the first one as leader
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." 
                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) 
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) 
            } 
          case false => // some brokers in the ISR are still alive
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head // pick the first live replica that is in the ISR
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." 
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) 
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) 
        } 
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) 
        (newLeaderAndIsr, liveAssignedReplicas) 
      case None => 
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) 
    } 
  } 
}
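
The decision tree in OfflinePartitionLeaderSelector can be restated as a pure function. This is a simplified sketch, not Kafka code: `OfflineElectionSketch`, `elect` and the `uncleanElectionEnabled` flag are hypothetical names, the live broker set and configuration are passed in as plain arguments, and failures are returned as `Left` instead of being thrown as NoReplicaOnlineException.

```scala
object OfflineElectionSketch {
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def elect(assigned: Seq[Int], liveBrokers: Set[Int], current: LeaderAndIsr,
            uncleanElectionEnabled: Boolean): Either[String, LeaderAndIsr] = {
    val liveAssigned = assigned.filter(r => liveBrokers.contains(r))
    val liveIsr = current.isr.filter(r => liveBrokers.contains(r))

    // Every successful election bumps the leader epoch and the ZooKeeper version.
    def bumped(leader: Int, isr: List[Int]): LeaderAndIsr =
      LeaderAndIsr(leader, current.leaderEpoch + 1, isr, current.zkVersion + 1)

    if (liveIsr.nonEmpty) {
      // Clean election: first assigned replica that is alive and in the ISR.
      liveAssigned.find(r => liveIsr.contains(r))
        .map(leader => bumped(leader, liveIsr))
        .toRight("live ISR members are not among the assigned replicas")
    } else if (!uncleanElectionEnabled) {
      Left("no broker in the ISR is alive")
    } else {
      // Unclean election: first live assigned replica; the ISR shrinks to the
      // new leader alone, which is where data loss can occur.
      liveAssigned.headOption
        .map(leader => bumped(leader, List(leader)))
        .toRight("no assigned replica is alive")
    }
  }
}
```

The sketch makes the trade-off visible: with unclean election disabled the partition stays offline until an ISR member returns, and with it enabled availability is restored at the risk of losing messages the new leader never replicated.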

12.4.3 ReassignedPartitionLeaderSelector

class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { 
  this.logIdent = "[ReassignedPartitionLeaderSelector]: " 
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { 
    // The replicas this partition is being reassigned to
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas 
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch 
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion 
    // Keep only the reassigned replicas whose broker is live and that are in the current ISR
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
                                                                             currentLeaderAndIsr.isr.contains(r))
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption 
    newLeaderOpt match { // if some replica meets both conditions, the first such replica becomes leader
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, 
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas) 
      case None => // otherwise the election for the reassignment fails
        reassignedInSyncReplicas.size match { 
          case 0 => 
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " + 
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) 
          case _ => 
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " + 
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) 
        } 
    } 
  } 
}
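
Stripped of the controller context, the rule in ReassignedPartitionLeaderSelector is short: the new leader is the first reassigned replica that is both on a live broker and already in the ISR, and the ISR itself is left unchanged. A sketch under those assumptions follows; `ReassignedElectionSketch` and `elect` are hypothetical names, and `None` stands in for the NoReplicaOnlineException thrown by the real class.

```scala
object ReassignedElectionSketch {
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def elect(newReplicas: Seq[Int], liveBrokers: Set[Int], current: LeaderAndIsr): Option[LeaderAndIsr] =
    newReplicas
      // candidate must be alive and already caught up (in the current ISR)
      .find(r => liveBrokers.contains(r) && current.isr.contains(r))
      // only leader, epoch and version change; the ISR is kept as is
      .map(leader => current.copy(leader = leader,
                                  leaderEpoch = current.leaderEpoch + 1,
                                  zkVersion = current.zkVersion + 1))
}
```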

12.4.4 PreferredReplicaPartitionLeaderSelector

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector 
with Logging { 
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: " 
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { 
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) 
    // The preferred replica is the first replica in the assignment
    val preferredReplica = assignedReplicas.head 
    // check if preferred replica is the current leader 
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader 
    if (currentLeader == preferredReplica) { // the preferred replica is already the leader, nothing to do
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" 
                                                   .format(preferredReplica, topicAndPartition)) 
    } else { 
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + 
        " Trigerring preferred replica leader election") 
      // If the preferred replica is in the ISR and its broker is live, make it leader again; this is mainly used for load rebalancing
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, 
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas) 
      } else { 
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) + 
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) 
      } 
    } 
  } 
}
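
PreferredReplicaPartitionLeaderSelector has three possible outcomes, which the real class signals with two different exceptions and one return value. The sketch below models them as an explicit result type instead. All names (`PreferredReplicaSketch`, `Outcome`, `AlreadyLeader`, `NotEligible`, `Elected`) are hypothetical and chosen only for illustration.

```scala
object PreferredReplicaSketch {
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  sealed trait Outcome
  case object AlreadyLeader extends Outcome   // LeaderElectionNotNeededException in Kafka
  case object NotEligible extends Outcome     // StateChangeFailedException in Kafka
  final case class Elected(leaderAndIsr: LeaderAndIsr) extends Outcome

  def elect(assigned: Seq[Int], liveBrokers: Set[Int], current: LeaderAndIsr): Outcome = {
    require(assigned.nonEmpty, "assigned replicas must not be empty")
    val preferred = assigned.head // the first assigned replica is the preferred one
    if (current.leader == preferred) AlreadyLeader
    else if (liveBrokers.contains(preferred) && current.isr.contains(preferred))
      Elected(current.copy(leader = preferred,
                           leaderEpoch = current.leaderEpoch + 1,
                           zkVersion = current.zkVersion + 1))
    else NotEligible
  }
}
```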

12.4.5 ControlledShutdownLeaderSelector

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) 
        extends PartitionLeaderSelector 
        with Logging { 
  this.logIdent = "[ControlledShutdownLeaderSelector]: " 
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { 
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch 
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion 
    val currentLeader = currentLeaderAndIsr.leader 
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) 
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds 
    // Assigned replicas whose broker is either live or shutting down
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) 
    // ISR members that are not on a broker that is shutting down
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) 
    val newLeaderOpt = newIsr.headOption 
    newLeaderOpt match { 
      case Some(newLeader) => // a candidate exists, so it becomes the leader
        debug("Partition %s : current leader = %d, new leader = %d" 
              .format(topicAndPartition, currentLeader, newLeader)) 
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), 
         liveAssignedReplicas) 
      case None => 
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + 
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) 
    } 
  } 
}
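
The core of ControlledShutdownLeaderSelector is that brokers being shut down are removed from the ISR and the first remaining ISR member takes over. Here is a minimal sketch of just that rule, with hypothetical names (`ControlledShutdownSketch`, `elect`) and `None` in place of the StateChangeFailedException.

```scala
object ControlledShutdownSketch {
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def elect(current: LeaderAndIsr, shuttingDown: Set[Int]): Option[LeaderAndIsr] = {
    // Drop every ISR member that lives on a broker that is shutting down.
    val newIsr = current.isr.filterNot(b => shuttingDown.contains(b))
    // The first surviving ISR member becomes leader; None if nobody is left.
    newIsr.headOption.map { leader =>
      LeaderAndIsr(leader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1)
    }
  }
}
```

Unlike the reassignment and preferred-replica selectors, this one also shrinks the ISR, so the departing broker is no longer counted as in sync once the new leader is announced.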

12.5 KafkaController ReplicaStateMachine

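ReplicaStateMachine implements the state transitions of the replicas of a topic's partitions. A replica can be in one of the following states:

| State | When the replica is in this state | Valid previous states |
| --- | --- | --- |
| NewReplica | 1. The replica has just been assigned. It is not working yet and can only take the follower role. | NonExistentReplica |
| OnlineReplica | 1. The replica is working, either as leader or as follower. | NewReplica/OnlineReplica/OfflineReplica |
| OfflineReplica | 1. The replica is down, for example because the broker hosting it went offline. | NewReplica, OnlineReplica |
| ReplicaDeletionStarted | 1. Deletion of the replica has started. | OfflineReplica |
| ReplicaDeletionSuccessful | 1. The replica has responded successfully to the delete request. At this point the KafkaController still keeps the replica's information in memory. | ReplicaDeletionStarted |
| ReplicaDeletionIneligible | 1. Deletion of the replica failed. | ReplicaDeletionStarted |
| NonExistentReplica | 1. The replica's information has been removed from the KafkaController's memory. | ReplicaDeletionSuccessful |

Note: the checks in handleStateChange accept NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible as previous states for both OnlineReplica and OfflineReplica.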

The replica state transitions work as follows:

| State transition | What happens |
| --- | --- |
| NonExistentReplica -> NewReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the new replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica -> OnlineReplica | 1. The KafkaController adds the new replica to the assigned replica list if needed. In practice this transition is very quick and the NewReplica state exists only briefly; it happens in onNewPartitionCreation. |
| OnlineReplica, OfflineReplica -> OnlineReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the new replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica | 1. The KafkaController sends a StopReplicaRequest to the replica (without deletion). 2. The KafkaController removes this replica from the ISR, sends a LeaderAndIsr request (with the new ISR) to the leader replica, and sends an UpdateMetadata request for the partition to every live broker. |
| OfflineReplica -> ReplicaDeletionStarted | 1. The KafkaController sends a StopReplicaRequest to the replica (with deletion). |
| ReplicaDeletionStarted -> ReplicaDeletionSuccessful | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionStarted -> ReplicaDeletionIneligible | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionSuccessful -> NonExistentReplica | 1. The KafkaController removes the replica from the in-memory partition replica assignment cache. |

The function to focus on is therefore ReplicaStateMachine.handleStateChange:

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                          "to %s failed because replica state machine has not started")
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      case NewReplica => // reached from the KafkaController callback onNewPartitionCreation when a client has just created a topic
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // A NewReplica cannot be the leader of the partition; only an online replica can lead
            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // Build the LeaderAndIsrRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                topic, partition, leaderIsrAndControllerEpoch,
                                                                replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // Set the state to NewReplica
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                          targetState))
      case ReplicaDeletionStarted =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        // Set the state to ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Build the StopReplicaRequest (with deletePartition = true) for the broker hosting this replica and hand it to
        // ControllerBrokerRequestBatch. When the response arrives, the callback deleteTopicStopReplicaCallback in
        // TopicDeletionManager moves replicas that were deleted successfully to ReplicaDeletionSuccessful and
        // replicas whose deletion failed to ReplicaDeletionIneligible
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionIneligible =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Set the state to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionSuccessful =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Set the state to ReplicaDeletionSuccessful
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case NonExistentReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // Remove the replica from the cached replica assignment of the partition
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // Drop the replica's state from the state machine
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case OnlineReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          case NewReplica => // almost nothing to do
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if(!currentAssignedReplicas.contains(replicaId)) // add the replica to the assignment if it is missing
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                              targetState))
          case _ => // the replica may already have existed, so send it the current leader and ISR
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                // Set the state to OnlineReplica (this looks redundant with the put after the match)
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None =>
            }
        }
        // Set the state to OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // Build the StopReplicaRequest (without deletion) for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              // Remove this replica from the ISR
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // The ISR of this partition has shrunk, so the remaining replicas must be notified
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  // Set the state to OfflineReplica
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty) // the leader and ISR information must exist
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
            .format(replicaId, topicAndPartition))
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
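
As with partitions, the previous-state lists passed to `assertValidPreviousStates` above fully determine which replica lifecycles are legal. The sketch below collects them into a map and checks a whole sequence of states at once. `ReplicaStateSketch` and `isValidPath` are hypothetical helpers for illustration, not part of Kafka.

```scala
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Shared by OnlineReplica and OfflineReplica in handleStateChange.
  private val onlineOrOfflineSources: List[ReplicaState] =
    List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)

  // Target state -> states it may be entered from.
  val validPreviousStates: Map[ReplicaState, List[ReplicaState]] = Map(
    NewReplica                -> List(NonExistentReplica),
    OnlineReplica             -> onlineOrOfflineSources,
    OfflineReplica            -> onlineOrOfflineSources,
    ReplicaDeletionStarted    -> List(OfflineReplica),
    ReplicaDeletionSuccessful -> List(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> List(ReplicaDeletionStarted),
    NonExistentReplica        -> List(ReplicaDeletionSuccessful)
  )

  // True when every consecutive pair in the path is an allowed transition.
  def isValidPath(path: List[ReplicaState]): Boolean =
    path.zip(path.drop(1)).forall { case (from, to) => validPreviousStates(to).contains(from) }
}
```

A full create-then-delete lifecycle (NonExistentReplica, NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica) passes this check, while jumping from OnlineReplica straight to ReplicaDeletionStarted does not, because a replica has to be taken offline before it can be deleted.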

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/11815.html
