kafka源码解析之七KafkaRequestHandlerPool详解编程语言

KafkaRequestHandlerPool的逻辑比较简单,就是开启num.io.threadsKafkaRequestHandler,每个KafkaRequestHandlerRequestChannel
. requestQueue 接受request,然后把对应的response存进responseQueues(i)队列
class KafkaRequestHandlerPool(val brokerId: Int, 
                              val requestChannel: RequestChannel, 
                              val apis: KafkaApis, 
                              numThreads: Int) extends Logging with KafkaMetricsGroup { 
 
  /* a meter to track the average free capacity of the request handlers */ 
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) 
 
  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " 
  val threads = new Array[Thread](numThreads) 
  val runnables = new Array[KafkaRequestHandler](numThreads) 
  for(i <- 0 until numThreads) {//创建num.io.threads个KafkaRequestHandler 
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) 
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) 
    threads(i).start() 
  } 
 
  def shutdown() { 
    info("shutting down") 
    for(handler <- runnables) 
      handler.shutdown 
    for(thread <- threads) 
      thread.join 
    info("shut down completely") 
  } 
}

class KafkaRequestHandler(id: Int, 
                          brokerId: Int, 
                          val aggregateIdleMeter: Meter, 
                          val totalHandlerThreads: Int, 
                          val requestChannel: RequestChannel, 
                          apis: KafkaApis) extends Runnable with Logging { 
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " 
 
  def run() { 
    while(true) { 
      try { 
        var req : RequestChannel.Request = null 
        while (req == null) { 
          // We use a single meter for aggregate idle percentage for the thread pool. 
          // Since meter is calculated as total_recorded_value / time_window and 
          // time_window is independent of the number of threads, each recorded idle 
          // time should be discounted by # threads. 
          val startSelectTime = SystemTime.nanoseconds 
          req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue获取request 
          val idleTime = SystemTime.nanoseconds - startSelectTime 
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads) 
        } 
        if(req eq RequestChannel.AllDone) { 
          debug("Kafka request handler %d on broker %d received shut down command".format( 
            id, brokerId)) 
          return 
        } 
        req.requestDequeueTimeMs = SystemTime.milliseconds 
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) 
        apis.handle(req)//调用负责业务逻辑的KafkaApis进行真正的处理,然后把response存放进对应的RequestChannel. responseQueues[i] 
      } catch { 
        case e: Throwable => error("Exception when handling request", e) 
      } 
    } 
  } 
  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone) 
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11820.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论