KafkaApis implements the actual business logic for each request. kafka_2.10-0.8.2.0 supports 11 request types; the specific categories were already listed in the earlier section on the sources of requests handled by the Broker.
The following walks through where each kind of request comes from and how it is processed:
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val offsetManager: OffsetManager,
val zkClient: ZkClient,
val brokerId: Int,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
def handle(request: RequestChannel.Request) {
try{
trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request) // look up offsets for a topic
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request.requestObj.handleError(e, requestChannel, request)
error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
}
11.1 RequestKeys.ProduceKey
Source: a producer sending messages to the Kafka cluster, or a consumer committing offsets to the Kafka cluster. The client uses the partition function to route each message to the Broker hosting the leader of that message's partition.
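Before looking at the handler, it helps to see how a client picks the target partition. Below is a minimal self-contained sketch of key-based routing in the spirit of the 0.8 default partitioner; the modulo-hash idea matches the shipped behavior, but the object and method names here are illustrative, not Kafka's own:

object PartitionRouter {
  // Mask the sign bit (as kafka.utils.Utils.abs does) so the result stays
  // non-negative even for Int.MinValue, then take the modulo.
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    // A given key always maps to the same partition, preserving per-key order.
    println(partitionFor("order-42", 3))
  }
}

The broker-side handler: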
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
val (produceRequest, offsetCommitRequestOpt) =
if (request.requestId == RequestKeys.OffsetCommitKey) { // a consumer committing offsets to Kafka; governed by offsets.storage: with offsets.storage=kafka the offsets are appended to the log of the internal topic __consumer_offsets
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
(producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
} else { // an ordinary producer request
(request.requestObj.asInstanceOf[ProducerRequest], None)
}
if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) {
warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
"be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
"and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks))
}
val sTime = SystemTime.milliseconds
// persist the messages to the local log
val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
if(produceRequest.requiredAcks == 0) { // acks == 0: no acknowledgement expected, so normally nothing special to do
// no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
// no response is expected by the producer the handler will send a close connection response to the socket server
// to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
if (numPartitionsInError != 0) { // close the connection only on error, so the client notices that something went wrong
info(("Send the close connection response due to error handling produce request " +
"[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
.format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
requestChannel.closeConnection(request.processor, request)
} else {
if (firstErrorCode == ErrorMapping.NoError) // update the offsets cache
offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
if (offsetCommitRequestOpt.isDefined) { // but if this is a consumer committing offsets to the Kafka cluster, a response is still required
val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else
// otherwise nothing needs to be sent back, i.e. the Send in
// Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) is null
requestChannel.noOperation(request.processor, request)
}
} // acks == 1 (or there is nothing to wait for): respond right away
else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
numPartitionsInError == produceRequest.numPartitions) {
if (firstErrorCode == ErrorMapping.NoError) {
offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
}
val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
.getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
// acks == 1: the response is sent immediately
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else
{ // no comment in the original source here; this is the remaining case, acks == -1
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.toSeq
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedRequest = new DelayedProduce(
producerRequestKeys,
request,
produceRequest.ackTimeoutMs.toLong,
produceRequest,
statuses,
offsetCommitRequestOpt)
// add the produce request for watch if it's not satisfied, otherwise send the response back
// check whether the DelayedProduce is already satisfied; if so send the response, otherwise it stays watched in the purgatory
val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
if (satisfiedByMe)
producerRequestPurgatory.respond(delayedRequest)
}
// we do not need the data anymore
produceRequest.emptyData()
}
In the flow above, the requiredAcks == 0 and requiredAcks == 1 cases are easy to follow. What about requiredAcks == -1? When acknowledgement is required, when does the server return a response to the producer? The explanation follows (see the sketch after the example below):
1) min.insync.replicas is the minimum number of replicas that must be in sync. When the producer sends a message to the leader of a topic partition, the number of confirmed acks starts at 1 (the leader itself, a member of the partition's ISR). So when does the ack count grow?
2) Each time another replica in the partition's ISR fetches the message, the ack count is incremented accordingly.
3) After the leader receives such a sync request from another replica, it checks whether the number of confirmed acks has reached requiredAcks; as soon as it has, it immediately sends the response to the client.
Suppose partition 0 of some topic looks like this:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3,4 Isr: 1,2,3
With produceRequest.requiredAcks = 2: when broker 1 receives the ProduceKey request, the confirmed ack count is 1. Brokers 2 and 3 run fetch threads that continuously sync data from broker 1 (via FetchRequest); whenever one of them catches up to the latest message, the confirmed ack count is incremented by 1. Only once at least one of brokers 2 and 3 has caught up does the client receive a response.
The full flow is covered in more detail in the Producer chapter. A minimal sketch of the ack accounting follows:
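All names below (Replica, AckTracker) are illustrative, not Kafka's own; inside the broker the real check lives in the partition/DelayedProduce logic:

case class Replica(brokerId: Int, var logEndOffset: Long)

class AckTracker(isrFollowers: Seq[Replica], requiredAcks: Int) {
  // The produce request is satisfied once `requiredAcks` ISR members have
  // replicated up to `offset`; the leader itself contributes the first ack.
  def isSatisfied(offset: Long): Boolean =
    1 + isrFollowers.count(_.logEndOffset >= offset) >= requiredAcks
}

object AckTrackerDemo extends App {
  // Mirrors the example above: broker 1 leads, ISR followers are brokers 2 and 3.
  val tracker = new AckTracker(Seq(Replica(2, 100L), Replica(3, 97L)), requiredAcks = 2)
  println(tracker.isSatisfied(100L)) // true: leader plus broker 2 cover offset 100
}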
11.2 RequestKeys.FetchKey
Source: a replica fetching the latest messages from its leader, or a consumer fetching messages from a topic it subscribes to.
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
val dataRead = replicaManager.readMessageSets(fetchRequest) // read the data via the replicaManager
// if the fetch request comes from the follower,
// update its corresponding log end offset
if(fetchRequest.isFromFollower) // for a follower's fetch request, update the follower's LEO; this may also change the ISR, and the updated ISR info may then need to be written to ZK
recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
// check if this fetch request can be satisfied right away
val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
// a fetch request may be delayed, but it must be answered immediately when any of the following holds:
// send the data immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
if(fetchRequest.maxWait <= 0 ||
fetchRequest.numPartitions <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
errorReadingData) {
debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
.format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
// otherwise create a delayed fetch request (e.g. when no new data has arrived); such requests are unblocked once data shows up
debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
fetchRequest.clientId))
// create a list of (topic, partition) pairs to use as keys for this delayed request
val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq
val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
dataRead.mapValues(_.offset))
// add the fetch request for watch if it's not satisfied, otherwise send the response back
val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
if (satisfiedByMe)
fetchRequestPurgatory.respond(delayedFetch)
}
}
DelayedFetch works on the same principle as DelayedProduce: if a consumer fetches while no new messages have arrived, or the available data has not yet reached the minimum transfer size minBytes, the request blocks on the server side; only when new data arrives and the consumer can fetch what it asked for does the server return a response.
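A compact sketch of that completion check, mirroring the four "respond immediately" conditions in handleFetchRequest (the names here are illustrative, not Kafka's own):

case class PendingFetch(minBytes: Int, maxWaitMs: Long, createdMs: Long)

object DelayedFetchCheck {
  // A delayed fetch completes when enough bytes have accumulated, an error
  // occurred while reading, or the maximum wait time has elapsed.
  def shouldRespond(fetch: PendingFetch, bytesAccumulated: Int,
                    errorOccurred: Boolean, nowMs: Long): Boolean =
    bytesAccumulated >= fetch.minBytes ||
    errorOccurred ||
    nowMs - fetch.createdMs >= fetch.maxWaitMs
}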
11.3 RequestKeys.OffsetsKey
Source: sent by consumers to look up offsets for a topic, e.g. the earliest or latest offset of each partition.
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
val responseMap = offsetRequest.requestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
try {
// ensure leader exists
val localReplica = if(!offsetRequest.isFromDebuggingClient)
replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
else
replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
val offsets = { // look up offsets from the logManager by topicAndPartition, e.g. for LatestTime or EarliestTime
val allOffsets = fetchOffsets(replicaManager.logManager,
topicAndPartition,
partitionOffsetRequestInfo.time,
partitionOffsetRequestInfo.maxNumOffsets)
if (!offsetRequest.isFromOrdinaryClient) {
allOffsets
} else {
val hw = localReplica.highWatermark.messageOffset
if (allOffsets.exists(_ > hw)) // filter out offsets beyond the HW, since those must not be visible to ordinary clients
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
} catch {
// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
// are typically transient and there is no value in logging the entire stack trace for the same
case utpe: UnknownTopicOrPartitionException =>
warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage))
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
case nle: NotLeaderForPartitionException =>
warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
case e: Throwable =>
warn("Error while responding to offset request", e)
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
}
})
val response = OffsetResponse(offsetRequest.correlationId, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
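For reference, a hedged client-side example of issuing this request with the 0.8 Scala SimpleConsumer API; the host, port, topic name, and clientId are assumptions for illustration:

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object LatestOffsetExample extends App {
  val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offset-demo")
  try {
    val tap = TopicAndPartition("test", 0)
    // Ask for the single latest offset of test/0.
    val request = OffsetRequest(Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
    val response = consumer.getOffsetsBefore(request)
    // Offsets beyond the HW were already filtered out server-side, as shown above.
    println(response.partitionErrorAndOffsets(tap).offsets.headOption)
  } finally consumer.close()
}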
11.4 RequestKeys.MetadataKey
Source: reads topic metadata from the MetadataCache; producers, consumers, and any other client may send this request.
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
val brokers = metadataCache.getAliveBrokers
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
The flow is straightforward; a hedged client-side example of issuing this request follows.
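A sketch with the 0.8 Scala API; host, port, and topic name are assumptions, and any broker can answer a metadata request:

import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

object MetadataExample extends App {
  val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "metadata-demo")
  try {
    val response = consumer.send(new TopicMetadataRequest(Seq("test"), 0))
    for (tm <- response.topicsMetadata; pm <- tm.partitionsMetadata)
      // Print each partition's id and its leader broker, if one is known.
      println(s"${tm.topic}/${pm.partitionId} leader=${pm.leader.map(_.id)}")
  } finally consumer.close()
}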
11.5 RequestKeys.LeaderAndIsrKey
Source: sent by the Controller when it detects that the leader or ISR of some topic partition has changed, to notify the affected brokers (for example, after a leader dies and a new one is elected).
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
try { // following the controller's instruction, the receiving broker may become the leader or a follower of some topic partition
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
fatal("Disk error during leadership change.", e)
Runtime.getRuntime.halt(1)
}
}
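The per-partition decision inside becomeLeaderOrFollower boils down to comparing the designated leader with the local broker id. A minimal illustrative sketch (the case class and names are not Kafka's own):

case class PartitionState(topic: String, partition: Int, leader: Int, isr: Seq[Int])

object LeaderOrFollower {
  def roleFor(state: PartitionState, localBrokerId: Int): String =
    if (state.leader == localBrokerId) "leader" // start serving produce/fetch for this partition
    else "follower"                             // point a fetch thread at the new leader

  def main(args: Array[String]): Unit = {
    val st = PartitionState("test", 0, leader = 1, isr = Seq(1, 2, 3))
    println(roleFor(st, localBrokerId = 2)) // follower
  }
}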
11.6 RequestKeys.StopReplicaKey
Source: sent by the Controller to make this broker stop fetching for some topic partition, e.g. when the partition's replica assignment changes (the AR is reassigned).
def handleStopReplicaRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) // the key step is stopping the fetch threads
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
11.7 RequestKeys.UpdateMetadataKey
Source: sent by the Controller to the affected brokers when topic metadata changes.
def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) { // the controller epoch in the request must be current, otherwise the update is rejected
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
"old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
controllerEpoch)
stateChangeLogger.warn(stateControllerEpochErrorMessage)
throw new ControllerMovedException(stateControllerEpochErrorMessage)
} else { // the main work is updating the contents of the MetadataCache
metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
controllerEpoch = updateMetadataRequest.controllerEpoch
}
}
}
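The epoch comparison above is a fencing mechanism: a deposed controller must not be able to overwrite newer metadata. A self-contained sketch of the idea (the class and method names are illustrative):

class EpochFencedCache {
  private var controllerEpoch = 0
  private val lock = new Object

  // Apply `applyUpdate` only if the request comes from a controller whose
  // epoch is at least as new as the latest one seen; otherwise refuse.
  def update(requestEpoch: Int)(applyUpdate: => Unit): Boolean = lock.synchronized {
    if (requestEpoch < controllerEpoch) {
      false // stale controller: reject, mirroring the ControllerMovedException path
    } else {
      applyUpdate
      controllerEpoch = requestEpoch
      true
    }
  }
}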
11.8 RequestKeys.ControlledShutdownKey
Source: sent by a broker that is shutting down to the Controller (the leader), announcing a controlled shutdown.
def handleControlledShutdownRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
// if the shutting-down broker leads some topic partition, a new leader must be elected
// if it is only a follower of a partition, leadership is left unchanged and a StopReplica request is sent to the shutting-down broker (it is going down anyway, so whether this arrives hardly matters)
// beyond that, some state bookkeeping is updated
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
ErrorMapping.NoError, partitionsRemaining)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
}
11.9 RequestKeys.OffsetCommitKey
Source: a consumer committing offsets to Kafka; depending on configuration they are stored either in ZK or in the internal offsets log.
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
if (offsetCommitRequest.versionId == 0) {
// version 0 stores the offsets in ZK
val responseInfo = offsetCommitRequest.requestInfo.map{
case (topicAndPartition, metaAndError) => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
ensureTopicExists(topicAndPartition.topic)
if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
} else {
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
topicAndPartition.partition, metaAndError.offset.toString)
(topicAndPartition, ErrorMapping.NoError)
}
} catch {
case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
}
}
val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else { // treated as a producer request; the offsets are stored in the log of the __consumer_offsets topic
// version 1 and above store the offsets in a special Kafka topic
handleProducerOrOffsetCommitRequest(request)
}
}
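For version 0 the ZK layout follows ZKGroupTopicDirs: the committed offset is stored as a string at /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. A small sketch that builds the path (the helper name is illustrative):

object OffsetZkPath {
  def consumerOffsetPath(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/offsets/$topic/$partition"

  def main(args: Array[String]): Unit =
    // e.g. /consumers/my-group/offsets/test/0
    println(consumerOffsetPath("my-group", "test", 0))
}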
11.10 RequestKeys.OffsetFetchKey
Source: sent by a consumer to read the offsets it previously committed to Kafka; the counterpart of OffsetCommitKey.
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
if (offsetFetchRequest.versionId == 0) { // offsets stored in ZK, so read them from ZK
// version 0 reads offsets from ZK
val responseInfo = offsetFetchRequest.requestInfo.map( t => {
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
try {
ensureTopicExists(t.topic)
val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
payloadOpt match {
case Some(payload) => {
(t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
}
case None => (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
ErrorMapping.UnknownTopicOrPartitionCode))
}
} catch {
case e: Throwable =>
(t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
}
})
val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else { // otherwise read them from Kafka's internal offsets log
// version 1 reads offsets from Kafka
val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
)
val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
val knownStatus =
if (knownTopicPartitions.size > 0)
offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
else
Map.empty[TopicAndPartition, OffsetMetadataAndError]
val status = unknownStatus ++ knownStatus
val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
trace("Sending offset fetch response %s for correlation id %d to client %s."
.format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
}
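A hedged client-side example of the version 1 path using the 0.8.2 Scala API; host, port, group, and topic are assumptions, and in practice the request should go to the group's coordinator, discovered via the ConsumerMetadataRequest in the next section:

import kafka.api.OffsetFetchRequest
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object OffsetFetchExample extends App {
  val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offset-fetch-demo")
  try {
    val tap = TopicAndPartition("test", 0)
    // versionId = 1 reads from the internal offsets topic rather than ZK.
    val request = OffsetFetchRequest(groupId = "my-group", requestInfo = Seq(tap), versionId = 1)
    val response = consumer.fetchOffsets(request)
    println(response.requestInfo(tap))
  } finally consumer.close()
}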
11.11 RequestKeys.ConsumerMetadataKey
Source: usually sent by a consumer to locate its coordinator, i.e. metadata about the internal topic __consumer_offsets (specifically the leader of the partition that owns the consumer group).
def handleConsumerMetadataRequest(request: RequestChannel.Request) {
val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
val response =
offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
partitionMetadata.leader.map { leader =>
ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
}.getOrElse(errorResponse)
}.getOrElse(errorResponse)
trace("Sending consumer metadata %s for correlation id %d to client %s."
.format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
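The group-to-partition mapping behind offsetManager.partitionFor is a modulo hash over the number of partitions of __consumer_offsets (offsets.topic.num.partitions, default 50 in 0.8.2). A self-contained sketch; the sign-bit mask mirrors kafka.utils.Utils.abs:

object GroupToOffsetsPartition {
  def partitionFor(group: String, offsetsTopicNumPartitions: Int = 50): Int =
    (group.hashCode & 0x7fffffff) % offsetsTopicNumPartitions

  def main(args: Array[String]): Unit =
    // The leader of this partition of __consumer_offsets is the group's coordinator.
    println(partitionFor("my-group"))
}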