Kafka源码解析之三:Broker的启动详解

正常启动kafka的命令如下:

bin/kafka-server-start.sh config/server.properties

查看kafka-server-start.sh脚本:

(原文此处为脚本截图)该脚本最终通过kafka-run-class.sh启动主类kafka.Kafka,并把server.properties的路径作为参数传入。

查看kafka.Kafka类(即下面的object Kafka):

/**
 * Command-line entry point for a Kafka broker.
 *
 * Expects exactly one argument: the path to the server properties file.
 */
object Kafka extends Logging {

  /**
   * Loads the broker configuration, starts the metrics reporters and the server, then blocks
   * until the server has shut down.
   *
   * Exit status: 1 on a usage error or when any step throws; 0 once awaitShutdown has
   * returned normally.
   *
   * @param args a single element: the path to server.properties
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
      System.exit(1)
    }
    try {
      val props = Utils.loadProps(args(0))
      // Assemble the broker configuration from the loaded properties.
      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)
      val kafkaServerStartable = new KafkaServerStartable(serverConfig)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          // Hook registered so the server is shut down when the JVM exits.
          kafkaServerStartable.shutdown
        }
      })

      // Start the server.
      kafkaServerStartable.startup
      // In normal operation the broker waits here until it is shut down.
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        // Exit non-zero so the calling shell/supervisor can tell the broker failed;
        // previously control fell through to System.exit(0) and a fatal error
        // was reported as a successful run.
        System.exit(1)
    }
    System.exit(0)
  }
}

其中上面的kafkaServerStartable封装了KafkaServer,最终执行startup的是KafkaServer(下面的代码只摘录了startup方法,main中调用的shutdown和awaitShutdown从略)。

/**
 * Thin wrapper that owns a single [[KafkaServer]] built from the supplied configuration.
 *
 * @param serverConfig configuration handed to the wrapped server
 */
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  /**
   * Starts the wrapped server and then registers the application info.
   * If either step throws, the failure is logged as fatal and the JVM exits with status 1.
   */
  def startup(): Unit =
    try {
      server.startup()
      AppInfo.registerInfo()
    } catch {
      case cause: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", cause)
        // KafkaServer.startup() has already invoked shutdown() before rethrowing, so all that
        // remains here is the log line above and a non-zero exit code.
        System.exit(1)
    }
}
查看KafkaServer:
/**
 * A single Kafka broker instance. The excerpt shown here holds the broker's components as
 * fields and wires them together in startup().
 *
 * @param config broker configuration; supplies the broker id, network settings and thread counts
 * @param time   time source passed on to the ReplicaManager (defaults to SystemTime)
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  // Prefix for this server's log lines, tagged with the broker id.
  this.logIdent = "[Kafka Server " + config.brokerId + "], "
  // Re-created on every startup() call and handed to the ReplicaManager.
  private var isShuttingDown = new AtomicBoolean(false)
  // Re-created on every startup() call; presumably awaited by awaitShutdown (not shown here).
  private var shutdownLatch = new CountDownLatch(1)
  // Set to true as the last step of a successful startup().
  private var startupComplete = new AtomicBoolean(false)
  val brokerState: BrokerState = new BrokerState
  val correlationId: AtomicInteger = new AtomicInteger(0)
  // Components below are null until startup() creates them.
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var offsetManager: OffsetManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null
  // Expose the current broker state as the "BrokerState" gauge.
  newGauge(
    "BrokerState",
    new Gauge[Int] {
      def value = brokerState.currentState
    }
  )
  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   *
   * Order of operations: scheduler, ZooKeeper client, log manager, socket server, replica
   * manager, offset manager, controller, request handling (KafkaApis + handler pool), then the
   * topic config manager and the health check. If any step throws, the error is logged as
   * fatal, shutdown() is called, and the exception is rethrown to the caller.
   */
  def startup() {
    try {
      info("starting")
      brokerState.newState(Starting)
      // Fresh shutdown flag and latch for this start.
      isShuttingDown = new AtomicBoolean(false)
      shutdownLatch = new CountDownLatch(1)
      /* start scheduler */
      kafkaScheduler.startup()
      /* setup zookeeper */
      zkClient = initZk()
      /* start log manager */
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()
      // Create and start the socket server from the network settings in config.
      socketServer = new SocketServer(config.brokerId,
        config.hostName,
        config.port,
        config.numNetworkThreads,
        config.queuedMaxRequests,
        config.socketSendBufferBytes,
        config.socketReceiveBufferBytes,
        config.socketRequestMaxBytes,
        config.maxConnectionsPerIp,
        config.connectionsMaxIdleMs,
        config.maxConnectionsPerIpOverrides)
      socketServer.startup()
      // ReplicaManager (author's note): 1. as a follower: fetches messages to keep up with the leader 2. as a leader: refreshes the ISR
      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
      /* start offset manager */
      // OffsetManager (author's note):
      // 1. keeps offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
      // 2. removes expired offsetsCache entries
      offsetManager = createOffsetManager()
      kafkaController = new KafkaController(config, zkClient, brokerState)
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
      Mx4jLoader.maybeLoad()
      replicaManager.startup()
      kafkaController.startup()
      // Handles notifications of topic config changes (author's note).
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
      /* tell everyone we are alive */
      // Author's note: creates this broker's info under the "/broker/id" node in ZooKeeper.
      // NOTE(review): the actual path is presumably /brokers/ids/<brokerId> - confirm in KafkaHealthcheck.
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()
      registerStats()
      startupComplete.set(true)
      info("started")
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        // Tear down whatever was started, then let the caller see the failure.
        shutdown()
        throw e
    }
  }
}
执行完之后,Broker上的KafkaServer正式启动。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/11822.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论