本期内容:
1、数据接收架构设计模式
2、数据接收源码彻底研究
1、Receiver接受数据的过程类似于MVC模式:
Receiver,ReceiverSupervisor和Driver的关系相当于Model,Control,View,也就是MVC。
Model就是Receiver,存储数据Control,就是ReceiverSupervisor,Driver是获得元数据,也就是View。
2、数据的位置信息会被封装到RDD里面。
3、Receiver接受数据,交给ReceiverSupervisor去存储数据。
4、ReceiverTracker是通过发送一个又一个的Job,每个Job只有一个Task,每个Task里面就只有一个ReceiverSupervisor,用这个函数启动每一个Receiver。
下面我们简单的看下Receiver启动流程,应用程序首先通过JobScheduler的start方法来启动receiverTracker的start方法:
def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() //receiver启动 jobGenerator.start() logInfo("Started JobScheduler") }
通过调用receiverTracker.start()方法来进行一系列的操作:
/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,获取receiver的状态 if (!skipReceiverLaunch) launchReceivers() //启动receiver logInfo("ReceiverTracker started") trackerState = Started } }
下面通过画图简单的描述下Receiver启动的内部机制:
参考博客:http://blog.csdn.net/hanburgud/article/details/51471047
http://lqding.blog.51cto.com/9123978/1774426
原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/194185.html