kafka源码解析之十一KafkaApis详解编程语言

KafkaApis真正负责内部request的业务逻辑,kafka_2.10-0.8.2.0目前支持11种request请求,具体的request类别在Broker处理的request的来源中已经提到,即下图:

kafka源码解析之十一KafkaApis详解编程语言

接下来主要叙述下各个不同request的来源,以及request的处理流程:

class KafkaApis(val requestChannel: RequestChannel, 
                val replicaManager: ReplicaManager, 
                val offsetManager: OffsetManager, 
                val zkClient: ZkClient, 
                val brokerId: Int, 
                val config: KafkaConfig, 
                val controller: KafkaController) extends Logging { 
def handle(request: RequestChannel.Request) { 
  try{ 
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) 
    request.requestId match { 
      case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) 
      case RequestKeys.FetchKey => handleFetchRequest(request) 
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)//获取偏移量 
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) 
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) 
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) 
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) 
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) 
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) 
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) 
      case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) 
      case requestId => throw new KafkaException("Unknown api code " + requestId) 
    } 
  } catch { 
    case e: Throwable => 
      request.requestObj.handleError(e, requestChannel, request) 
      error("error when handling request %s".format(request.requestObj), e) 
  } finally 
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds 
} 
}

11.1 RequestKeys.ProduceKey

来源:生成者发送消息至KAFKA集群/或者消费者提交偏移量至kafka集群,客户端根据分区函数把message发往属于该message的partition的leader的Broker。

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { 
  val (produceRequest, offsetCommitRequestOpt) = 
    if (request.requestId == RequestKeys.OffsetCommitKey) {//消费者提交偏移量至kafka集群,由参数offsets.storage决定,如果offsets.storage=kafka,则提交至kafka内部名为__consumer_offsets的topic的log 
      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] 
      OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest) 
      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) 
    } else {//正常的producer请求 
      (request.requestObj.asInstanceOf[ProducerRequest], None) 
    } 
  if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) { 
    warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " + 
      "be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " + 
      "and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks)) 
  } 
  val sTime = SystemTime.milliseconds 
//消息持久化 
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) 
  debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) 
  val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) 
  val numPartitionsInError = localProduceResults.count(_.error.isDefined) 
  if(produceRequest.requiredAcks == 0) {//acks == 0,即不需要ack,没啥需要特别做的 
    // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since 
    // no response is expected by the producer the handler will send a close connection response to the socket server 
    // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata 
    if (numPartitionsInError != 0) {//只有在错误的情况下才关闭conn使客户端感知到 
      info(("Send the close connection response due to error handling produce request " + 
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") 
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) 
      requestChannel.closeConnection(request.processor, request) 
    } else { 
      if (firstErrorCode == ErrorMapping.NoError)//更新offsetsCache 
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) 
      if (offsetCommitRequestOpt.isDefined) {//但是如果是消费者提交偏移量至kafka集群的情况,则需要响应 
        val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize) 
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
      } else//否则不需要回应任何内容,即Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) 
中的Send为null 
        requestChannel.noOperation(request.processor, request) 
    } 
  } //acks == 1,即需要立即返回response 
  else if (produceRequest.requiredAcks == 1 || 
    produceRequest.numPartitions <= 0 || 
    numPartitionsInError == produceRequest.numPartitions) { 
    if (firstErrorCode == ErrorMapping.NoError) { 
      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) 
    } 
    val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap 
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) 
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) 
//acks=1,则需要立即响应 
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
  } else 
 
  {//这个地方没加注释,应该是ack == -1的情况 
    // create a list of (topic, partition) pairs to use as keys for this delayed request 
    val producerRequestKeys = produceRequest.data.keys.toSeq 
    val statuses = localProduceResults.map(r => 
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap 
    val delayedRequest =  new DelayedProduce( 
      producerRequestKeys, 
      request, 
      produceRequest.ackTimeoutMs.toLong, 
      produceRequest, 
      statuses, 
      offsetCommitRequestOpt) 
    // add the produce request for watch if it's not satisfied, otherwise send the response back 
//检查DelayedProduce是否被满足,如果满足,则返回response,否则结束流程 
    val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest) 
    if (satisfiedByMe) 
      producerRequestPurgatory.respond(delayedRequest) 
  } 
  // we do not need the data anymore 
  produceRequest.emptyData() 
}

在以上的流程中,produceRequest.requiredAcks==0和produceRequest.requiredAcks==1的情况比较好理解,那么produceRequest.requiredAcks==-1呢?当需要ack的时候,服务端什么时候给生产者返回response呢?接下来就来解释:

1)min.insync.replicas代表最小处于同步状态的副本数,当producer发送消息给对应topic的partition的leader的时候,已经获得确认的acks数目=1(即该partition的isr中的leader),那什么时候acks增加呢?
2)当该partition的isr的其它副本fetch message的时候,acks相应递增
3)该partition的leader接受到其它副本同步消息的request之后,会检查获得确认的acks数目是否大于等于requiredAcks,只要大于等于requiredAcks,则立马向客户端发送response
假设某个topic的partition 0情况如下:
Topic: test Partition: 0          Leader: 1            Replicas: 1,2,3,4         Isr: 1,2,3
其produceRequest.requiredAcks=2,则当接受到broker 1接受到ProduceKey时,则获得确认的acks数目=1,由于broker2,broker3是实时不断开启fetch线程向broker 1同步数据的,即FetchRequest,当同步到最新的message的时候,获得确认的acks数目递增1,当确保至少2,3中有一个已经同步上之后,客户端才会接受到response
更详细的流程会在生成者章节讲解	

11.2 RequestKeys. FetchKey

来源:replica fetch其对应leader的最新的message或者consumer的fetch其对应的topic的message
def handleFetchRequest(request: RequestChannel.Request) { 
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] 
  val dataRead = replicaManager.readMessageSets(fetchRequest)//从replicaManager读出数据 
  // if the fetch request comes from the follower, 
  // update its corresponding log end offset 
  if(fetchRequest.isFromFollower)//如果是follower的fetch request,更新follower的leo,还可能需要更新ISR,最好需要把ISR等相关信息写入zk 
    recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) 
  // check if this fetch request can be satisfied right away 
  val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum 
  val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) => 
    errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError)) 
  //fetch request是可以delay的,但满足如下要求时是需要立刻返回 
  // send the data immediately if 1) fetch request does not want to wait 
  //                              2) fetch request does not require any data 
  //                              3) has enough data to respond 
  //                              4) some error happens while reading data 
  if(fetchRequest.maxWait <= 0 || 
    fetchRequest.numPartitions <= 0 || 
    bytesReadable >= fetchRequest.minBytes || 
    errorReadingData) { 
    debug("Returning fetch response %s for fetch request with correlation id %d to client %s" 
      .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) 
    val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) 
    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) 
  } else { 
    //否则产生delay fetcher request,比如没新数据的时候,后续有数据时会unblock这些request 
    debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, 
      fetchRequest.clientId)) 
    // create a list of (topic, partition) pairs to use as keys for this delayed request 
    val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq 
    val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, 
      dataRead.mapValues(_.offset)) 
 
    // add the fetch request for watch if it's not satisfied, otherwise send the response back 
    val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) 
    if (satisfiedByMe) 
      fetchRequestPurgatory.respond(delayedFetch) 
  } 
}

其中DelayedFetch的原理和DelayedProduce的一样,当消费者fetch的时候如果没有新message产生,或者没有达到一次传输的最小值minBytes,都会在服务端阻塞住,直到新数据到来,消费者fetch到想fetch的数据的时候,才会给客户端返回response

11.3 RequestKeys. OffsetsKey

来源:消费者发送,获取某个topic的偏移量
def handleOffsetRequest(request: RequestChannel.Request) { 
  val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] 
  val responseMap = offsetRequest.requestInfo.map(elem => { 
    val (topicAndPartition, partitionOffsetRequestInfo) = elem 
    try { 
      // ensure leader exists 
      val localReplica = if(!offsetRequest.isFromDebuggingClient) 
        replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) 
      else 
        replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition) 
      val offsets = {//从logManager根据topicAndPartition获取偏移量,比如LatestTime或者EarliestTime 
        val allOffsets = fetchOffsets(replicaManager.logManager, 
          topicAndPartition, 
          partitionOffsetRequestInfo.time, 
          partitionOffsetRequestInfo.maxNumOffsets) 
        if (!offsetRequest.isFromOrdinaryClient) { 
          allOffsets 
        } else { 
          val hw = localReplica.highWatermark.messageOffset 
          if (allOffsets.exists(_ > hw))//过滤掉hw以后的offsets,因为那些都不是应该用户可见的 
            hw +: allOffsets.dropWhile(_ > hw) 
          else 
            allOffsets 
        } 
      } 
      (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) 
    } catch { 
      // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages 
      // are typically transient and there is no value in logging the entire stack trace for the same 
      case utpe: UnknownTopicOrPartitionException => 
        warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( 
          offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage)) 
        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) 
      case nle: NotLeaderForPartitionException => 
        warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( 
          offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) 
        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) 
      case e: Throwable => 
        warn("Error while responding to offset request", e) 
        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) 
    } 
  }) 
  val response = OffsetResponse(offsetRequest.correlationId, responseMap) 
  requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
}

11.4 RequestKeys. MetadataKey

来源:获取MetadataCache中的信息,关于topic的元数据信息,生成者,消费者,普通调用者都可以发送此条指令。
def handleTopicMetadataRequest(request: RequestChannel.Request) { 
  val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] 
  val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet) 
  val brokers = metadataCache.getAliveBrokers 
  trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) 
  val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId) 
  requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
}
流程简单,此处省略

11.5 RequestKeys. LeaderAndIsrKey

来源:Controller发现某个topic的partition的leader和isr发生改变时,通知相应的broker(比如说leader挂了,重新选择新的leader)
def handleLeaderAndIsrRequest(request: RequestChannel.Request) { 
  // ensureTopicExists is only for client facing requests 
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they 
  // stop serving data to clients for the topic being deleted 
  val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] 
  try {//根据control的指令,接受到该指令的broker可能成为某个topic的partition的leader或者follower 
    val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) 
    val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) 
    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) 
  } catch { 
    case e: KafkaStorageException => 
      fatal("Disk error during leadership change.", e) 
      Runtime.getRuntime.halt(1) 
  } 
}

11.6 RequestKeys. StopReplicaKey

来源:Controller发送 停止该broker对某个topic的partition的fetch动作,比如topic的isr发生变化(即asr被重新分配了)
def handleStopReplicaRequest(request: RequestChannel.Request) { 
  // ensureTopicExists is only for client facing requests 
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they 
  // stop serving data to clients for the topic being deleted 
  val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] 
  val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)//最重要的就是停止fetch线程 
  val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) 
  requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) 
  replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() 
}

11.7 RequestKeys. UpdateMetadataKey

来源:Controlle发现topic的元数据信息发生变化时,通知相应的Broker
def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { 
  replicaStateChangeLock synchronized { 
    if(updateMetadataRequest.controllerEpoch < controllerEpoch) {//更新的时钟必须是最新的,否则无法进行更新 
      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + 
        "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId, 
        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, 
        controllerEpoch) 
      stateChangeLogger.warn(stateControllerEpochErrorMessage) 
      throw new ControllerMovedException(stateControllerEpochErrorMessage) 
    } else {//主要是更新MetadataCache中的内容 
      metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger) 
      controllerEpoch = updateMetadataRequest.controllerEpoch 
    } 
  } 
}

11.8 RequestKeys. ControlledShutdownKey

来源:Broker关机时向Controller(leader)发送关机的指令
def handleControlledShutdownRequest(request: RequestChannel.Request) { 
  // ensureTopicExists is only for client facing requests 
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they 
  // stop serving data to clients for the topic being deleted 
  val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] 
//如果关机的broker是某个topic的partition的leader,则需要重新选举 
//如果关机的broker不是某个topic的partition的leader,则保持不变,并下要关机的broker发送stopreplic的request(已经要关机了,其实发不发送无所谓) 
//除此之外更新一些状态信息 
  val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) 
  val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, 
    ErrorMapping.NoError, partitionsRemaining) 
  requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) 
}

11.9 RequestKeys. OffsetCommitKey

来源:消费者提交偏移量至KAFKA,内部又根据配置提交至ZK或者LOG
def handleOffsetCommitRequest(request: RequestChannel.Request) { 
  val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] 
  if (offsetCommitRequest.versionId == 0) { 
    // version 0 stores the offsets in ZK 
    //保存至zk 
    val responseInfo = offsetCommitRequest.requestInfo.map{ 
      case (topicAndPartition, metaAndError) => { 
        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) 
        try { 
          ensureTopicExists(topicAndPartition.topic) 
          if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) { 
            (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) 
          } else { 
            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + 
              topicAndPartition.partition, metaAndError.offset.toString) 
            (topicAndPartition, ErrorMapping.NoError) 
          } 
        } catch { 
          case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) 
        } 
      } 
    } 
    val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId) 
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
  } else {//当作producer请求,保存在topic为 __consumer_offsets的log中 
    // version 1 and above store the offsets in a special Kafka topic 
    handleProducerOrOffsetCommitRequest(request) 
  } 
}

11.10 RequestKeys. OffsetFetchKey

来源:消费者发送,获取自己提交到KAFKA上的偏移量,和OffsetCommitKey相对应
def handleOffsetFetchRequest(request: RequestChannel.Request) { 
  val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] 
  if (offsetFetchRequest.versionId == 0) {//配置在zk上,则从zk上获取 
    // version 0 reads offsets from ZK 
    val responseInfo = offsetFetchRequest.requestInfo.map( t => { 
      val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) 
      try { 
        ensureTopicExists(t.topic) 
        val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1 
        payloadOpt match { 
          case Some(payload) => { 
            (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError)) 
          } 
          case None => (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, 
            ErrorMapping.UnknownTopicOrPartitionCode)) 
        } 
      } catch { 
        case e: Throwable => 
          (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, 
            ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) 
      } 
    }) 
    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId) 
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
  } else {//否则从kafka内部的log中获取 
    // version 1 reads offsets from Kafka 
    val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => 
      metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty 
    ) 
    val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap 
    val knownStatus = 
      if (knownTopicPartitions.size > 0) 
        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap 
      else 
        Map.empty[TopicAndPartition, OffsetMetadataAndError] 
    val status = unknownStatus ++ knownStatus 
 
    val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) 
 
    trace("Sending offset fetch response %s for correlation id %d to client %s." 
      .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) 
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
  } 
}

11.11 RequestKeys. ConsumerMetadataKey

来源:一般情况下消费者发送,获取kafka中topic为__consumer_offsets的元数据信息 

def handleConsumerMetadataRequest(request: RequestChannel.Request) { 
  val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest] 
  val partition = offsetManager.partitionFor(consumerMetadataRequest.group) 
  // get metadata (and create the topic if necessary) 
  val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head 
 
  val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) 
  val response = 
    offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata => 
      partitionMetadata.leader.map { leader => 
        ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId) 
      }.getOrElse(errorResponse) 
    }.getOrElse(errorResponse) 
 
  trace("Sending consumer metadata %s for correlation id %d to client %s." 
    .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId)) 
  requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) 
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11817.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论