正常启动kafka的命令如下:bin/kafka-server-start.sh config/server.properties
查看kafka-server-start.sh脚本:
查看kafka.Kafka类(即kafka包下的Kafka对象):
/**
 * Command-line entry point of the broker process.
 *
 * Loads the properties file named by the single command-line argument, builds a
 * KafkaConfig from it, starts the metrics reporters, then starts a
 * KafkaServerStartable and blocks until it has shut down.
 */
object Kafka extends Logging {

  /**
   * Runs the broker until shutdown and then terminates the JVM.
   *
   * Exit status: 1 when the argument count is wrong or when any Throwable escapes
   * the startup/await sequence, 0 after awaitShutdown returns normally.
   *
   * @param args must hold exactly one element: the path of the server properties file
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
      System.exit(1)
    }

    try {
      val props = Utils.loadProps(args(0))
      // Assemble the broker configuration from the loaded properties.
      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)
      val kafkaServerStartable = new KafkaServerStartable(serverConfig)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run(): Unit = {
          // Hook body: shut the broker down when the JVM is terminating.
          kafkaServerStartable.shutdown
        }
      })

      // Start the broker, then park the main thread here for the broker's lifetime.
      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        // Report the failure to the caller; previously control fell through to
        // System.exit(0) below, so a crashed broker looked like a clean exit.
        System.exit(1)
    }

    System.exit(0)
  }
}
其中上面的kafkaServerStartable封装了KafkaServer,最终执行startup的是KafkaServer。
/**
 * Wrapper that owns one KafkaServer built from the given configuration and turns
 * a failed startup into a process exit.
 *
 * @param serverConfig broker configuration handed to the wrapped KafkaServer
 */
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {

  private val server = new KafkaServer(serverConfig)

  /**
   * Starts the wrapped server and then calls AppInfo.registerInfo().
   *
   * If either step throws, the error is logged at fatal level and the JVM is
   * terminated with exit status 1.
   */
  def startup(): Unit = {
    try {
      server.startup()
      AppInfo.registerInfo()
    } catch {
      case failure: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", failure)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }
}
查看KafkaServer:
/**
 * A single broker instance. Holds the broker's subsystems as fields and creates and
 * starts them, in a fixed order, inside startup().
 *
 * NOTE(review): shutdown(), initZk(), createLogManager(), createOffsetManager() and
 * registerStats() are called below but are not defined in this excerpt.
 *
 * @param config broker configuration; startup() reads ids, host/port and sizing values from it
 * @param time   clock handed to the ReplicaManager; defaults to SystemTime
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
// Log identifier embedding this broker's id.
this.logIdent = "[Kafka Server " + config.brokerId + "], "
// isShuttingDown and shutdownLatch are replaced with fresh instances on every startup() call.
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
// Set to true as the last step of a successful startup().
private var startupComplete = new AtomicBoolean(false)
val brokerState: BrokerState = new BrokerState
val correlationId: AtomicInteger = new AtomicInteger(0)
// The subsystems below stay null until startup() assigns them.
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
var offsetManager: OffsetManager = null
var kafkaHealthcheck: KafkaHealthcheck = null
var topicConfigManager: TopicConfigManager = null
var replicaManager: ReplicaManager = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
// Register a gauge named "BrokerState" that reports brokerState.currentState.
newGauge(
"BrokerState",
new Gauge[Int] {
def value = brokerState.currentState
}
)
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*
* On any Throwable the error is logged at fatal level, shutdown() is called and the
* Throwable is rethrown to the caller.
*/
def startup() {
try {
info("starting")
brokerState.newState(Starting)
// Fresh shutdown flag and latch for this run.
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
/* start scheduler */
kafkaScheduler.startup()
/* setup zookeeper */
zkClient = initZk()
/* start log manager */
logManager = createLogManager(zkClient, brokerState)
logManager.startup()
// Create and start the socket server from the network-related config values.
socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes,
config.maxConnectionsPerIp,
config.connectionsMaxIdleMs,
config.maxConnectionsPerIpOverrides)
socketServer.startup()
// Replica manager (per the original note): 1. as a follower, fetches messages to keep up
// with the leader; 2. as a leader, refreshes the ISR. It is only constructed here and is
// started further down.
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
/* start offset manager */
// Offset manager (per the original note):
// 1. holds offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
// 2. removes expired offsetsCache entries
offsetManager = createOffsetManager()
kafkaController = new KafkaController(config, zkClient, brokerState)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
// The state switches to RunningAsBroker before the replica manager and controller are started.
brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
replicaManager.startup()
kafkaController.startup()
// Handles notifications of topic config changes (per the original note).
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
/* tell everyone we are alive */
// Creates this broker's info node in ZooKeeper (the original note gives the path as /broker/id — TODO confirm exact path).
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
registerStats()
startupComplete.set(true)
info("started")
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
// Attempt shutdown via shutdown(), then propagate the failure to the caller.
shutdown()
throw e
}
}
}
执行完之后,Broker上的KafkaServer正式启动。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/11822.html