kafka源码解析之六SocketServer详解编程语言

class SocketServer(val brokerId: Int, 
                   val host: String, 
                   val port: Int, 
                   val numProcessorThreads: Int, 
                   val maxQueuedRequests: Int, 
                   val sendBufferSize: Int, 
                   val recvBufferSize: Int, 
                   val maxRequestSize: Int = Int.MaxValue, 
                   val maxConnectionsPerIp: Int = Int.MaxValue, 
                   val connectionsMaxIdleMs: Long, 
                   val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { 
this.logIdent = "[Socket Server on Broker " + brokerId + "], " 
private val time = SystemTime 
private val processors = new Array[Processor](numProcessorThreads) 
@volatile private var acceptor: Acceptor = null 
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)// 
/* a meter to track the average free capacity of the network processors */ 
private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) 
/** 
 * Start the socket server 
 */ 
def startup() { 
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) 
  for(i <- 0 until numProcessorThreads) {//启动num.network.threads个Processor线程处理网络请求 
    processors(i) = new Processor(i,  
                                  time,  
                                  maxRequestSize,  
                                  aggregateIdleMeter, 
                                  newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), 
                                  numProcessorThreads,  
                                  requestChannel, 
                                  quotas, 
                                  connectionsMaxIdleMs) 
    Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() 
  } 
 
  newGauge("ResponsesBeingSent", new Gauge[Int] { 
    def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) } 
  }) 
  // register the processor threads for notification of responses 注册response的listener,当有response的时候,调用ResponseListener 
  requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) 
  // start accepting connections 接受网络连接请求 
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) 
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start() 
  acceptor.awaitStartup 
  info("Started") 
} 
}

Acceptor作为一个独立的线程存在,当接受到网络连接请求的时候,轮训地甩给其中一个Processor线程处理之后的request

private[kafka] class Acceptor(val host: String,  
                              val port: Int,  
                              private val processors: Array[Processor], 
                              val sendBufferSize: Int,  
                              val recvBufferSize: Int, 
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { 
  val serverChannel = openServerSocket(host, port) 
 
  /** 
   * Accept loop that checks for new connection attempts 
   */ 
  def run() { 
    serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
    startupComplete() 
    var currentProcessor = 0 
    while(isRunning) { 
      val ready = selector.select(500) 
      if(ready > 0) { 
        val keys = selector.selectedKeys() 
        val iter = keys.iterator() 
        while(iter.hasNext && isRunning) { 
          var key: SelectionKey = null 
          try { 
            key = iter.next 
            iter.remove() 
            if(key.isAcceptable) 
               accept(key, processors(currentProcessor))//添加到Processor的newConnections中,以后该processor负责这个Connections的所有request 
            else 
               throw new IllegalStateException("Unrecognized key state for acceptor thread.") 
            // round robin to the next processor thread 轮训 
            currentProcessor = (currentProcessor + 1) % processors.length 
          } catch { 
            case e: Throwable => error("Error while accepting connection", e) 
          } 
        } 
      } 
    } 
    debug("Closing server socket and selector.") 
    swallowError(serverChannel.close()) 
    swallowError(selector.close()) 
    shutdownComplete() 
  } 
}
那么Processor线程是如何处理request的呢?关键在于requestChannel,它作为request和response的传输通道,使得Processor线程只负责接受connection的requet和发送相应的reponse,而和真实的业务逻辑无关,且看requestChannel
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { 
  private var responseListeners: List[(Int) => Unit] = Nil 
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)//1个request的阻塞队列,供之后的KafkaRequestHandler线程接收 
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)// num.network.threads个response的阻塞队列,供之后的KafkaRequestHandler线程存放 
  for(i <- 0 until numProcessors) 
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]() 
}
即Processor线程将各自对应的connection的request都存放进requestQueue中,然后分别从对应的responseQueues(i)中获取对应的request的response,如下图:

kafka源码解析之六SocketServer详解编程语言

代码如下:
private[kafka] class Processor(val id: Int, 
                               val time: Time, 
                               val maxRequestSize: Int, 
                               val aggregateIdleMeter: Meter, 
                               val idleMeter: Meter, 
                               val totalProcessorThreads: Int, 
                               val requestChannel: RequestChannel, 
                               connectionQuotas: ConnectionQuotas, 
                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) { 
override def run() { 
  startupComplete() 
  while(isRunning) { 
    // setup any new connections that have been queued up 
    configureNewConnections() 
    // register any new responses for writing 
    processNewResponses()//receive 对应阻塞队列responseQueue的response 
    val startSelectTime = SystemTime.nanoseconds 
    val ready = selector.select(300) 
    currentTimeNanos = SystemTime.nanoseconds 
    val idleTime = currentTimeNanos - startSelectTime 
    idleMeter.mark(idleTime) 
    // We use a single meter for aggregate idle percentage for the thread pool. 
    // Since meter is calculated as total_recorded_value / time_window and 
    // time_window is independent of the number of threads, each recorded idle 
    // time should be discounted by # threads. 
    aggregateIdleMeter.mark(idleTime / totalProcessorThreads) 
 
    trace("Processor id " + id + " selection time = " + idleTime + " ns") 
    if(ready > 0) { 
      val keys = selector.selectedKeys() 
      val iter = keys.iterator() 
      while(iter.hasNext && isRunning) { 
        var key: SelectionKey = null 
        try { 
          key = iter.next 
          iter.remove() 
          if(key.isReadable) 
            read(key)//获取connection的request 
          else if(key.isWritable) 
            write(key)//写相应request的response 
          else if(!key.isValid) 
            close(key) 
          else 
            throw new IllegalStateException("Unrecognized key state for processor thread.") 
        } catch { 
          case e: EOFException => { 
            info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress)) 
            close(key) 
          } case e: InvalidRequestException => { 
            info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage)) 
            close(key) 
          } case e: Throwable => { 
            error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e) 
            close(key) 
          } 
        } 
      } 
    } 
    maybeCloseOldestConnection 
  } 
  debug("Closing selector.") 
  closeAll() 
  swallowError(selector.close()) 
  shutdownComplete() 
} 
 
def read(key: SelectionKey) { 
  lruConnections.put(key, currentTimeNanos) 
  val socketChannel = channelFor(key) 
  var receive = key.attachment.asInstanceOf[Receive] 
  if(key.attachment == null) { 
    receive = new BoundedByteBufferReceive(maxRequestSize) 
    key.attach(receive) 
  } 
  val read = receive.readFrom(socketChannel) 
  val address = socketChannel.socket.getRemoteSocketAddress(); 
  trace(read + " bytes read from " + address) 
  if(read < 0) { 
    close(key) 
  } else if(receive.complete) { 
    val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)//组装request 
    requestChannel.sendRequest(req)//把request发送给requestChannel 
    key.attach(null) 
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet 
    key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) 
  } else { 
    // more reading to be done 
    trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) 
    key.interestOps(SelectionKey.OP_READ) 
    wakeup() 
  } 
} 
 
def write(key: SelectionKey) { 
  val socketChannel = channelFor(key) 
  val response = key.attachment().asInstanceOf[RequestChannel.Response] 
  val responseSend = response.responseSend//获取response的内容 
  if(responseSend == null) 
    throw new IllegalStateException("Registered for write interest but no response attached to key.") 
  val written = responseSend.writeTo(socketChannel)//将response发送给负责该connection的socket 
  trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) 
  if(responseSend.complete) { 
    response.request.updateRequestMetrics() 
    key.attach(null) 
    trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) 
    key.interestOps(SelectionKey.OP_READ) 
  } else { 
    trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) 
    key.interestOps(SelectionKey.OP_WRITE) 
    wakeup() 
  } 
} 
}

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11821.html

(0)
上一篇 2021年7月19日 11:52
下一篇 2021年7月19日 11:52

相关推荐

发表回复

登录后才能评论