spark源码(七)Worker receive 方法


receive 方法其实是大量的case,分别对应处理不同的场景

    case msg: RegisterWorkerResponse 
    case SendHeartbeat
    case WorkDirCleanup
    case MasterChanged
    case ReconnectWorker
    case LaunchExecutor
    case executorStateChanged: ExecutorStateChanged
    case KillExecutor(masterUrl, appId, execId)
    case LaunchDriver(driverId, driverDesc, resources_)
    case KillDriver(driverId)
    case driverStateChanged @ DriverStateChanged(driverId, state, exception)
    case ReregisterWithMaster
    case ApplicationFinished(id)
    case DecommissionWorker
    case WorkerSigPWRReceived

一. RegisterWorkerResponse 详解

    private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
        msg match {
            case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
                val preferredMasterAddress = if (preferConfiguredMasterAddress) {
                  masterAddress.toSparkURL
                } else {
                  masterRef.address.toSparkURL
                }
                if (duplicate) {
                  logWarning(s"Duplicate registration at master $preferredMasterAddress")
                }    
                logInfo(s"Successfully registered with master $preferredMasterAddress")
                registered = true
                changeMaster(masterRef, masterWebUiUrl, masterAddress)/*更新master信息*/
                forwardMessageScheduler.scheduleAtFixedRate(
                  () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
                  0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)/*启动一个定时任务  开始心跳*/
                if (CLEANUP_ENABLED) {
                  logInfo(
                    s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
                  forwardMessageScheduler.scheduleAtFixedRate(
                    () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
//给自己发送一个清理目录的消息??这是干嘛的
                    CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
                }
                val execs = executors.values.map { e =>
                  new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
                }
                //给master发送一个当前状态的信息
                masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
            case RegisterWorkerFailed(message) =>
                if (!registered) {
                  logError("Worker registration failed: " + message)
                  System.exit(1)
                }
            case MasterInStandby => // Ignore. Master not yet ready.
        }
    }

二. SendHeartbeat 详解

    if (connected) { sendToMaster(Heartbeat(workerId, self)) }

    private def sendToMaster(message: Any): Unit = {
        master match {
          case Some(masterRef) => masterRef.send(message)//给master发送一个心跳信息
          case None =>
            logWarning(
              s"Dropping $message because the connection to master has not yet been established")
        }
    }

三. WorkDirCleanup 详解

    //所有的executors + drivers 目录
    val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
    try {
      val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
        val appDirs = workDir.listFiles()
        if (appDirs == null) {
          throw new IOException("ERROR: Failed to list files in " + appDirs)
        }
        appDirs.filter { dir =>
          val appIdFromDir = dir.getName
          val isAppStillRunning = appIds.contains(appIdFromDir)
          //当前是一个目录,并且不运行了,APP_DATA_RETENTION_SECONDS 是过了这个时间就清理目录配置项
          dir.isDirectory && !isAppStillRunning &&
            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
        }.foreach { dir =>
          logInfo(s"Removing directory: ${dir.getPath}")
          Utils.deleteRecursively(dir)
          if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
              conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
            //是移除shuffle服务中的文件的   并不是是清理所用文件的
            //我也记得是任务kill的时候会看到清理目录的日志的
            shuffleService.applicationRemoved(dir.getName)
          }
        }
      }(cleanupThreadExecutor)

      cleanupFuture.failed.foreach(e =>
        logError("App dir cleanup failed: " + e.getMessage, e)
      )(cleanupThreadExecutor)
    } catch {
      case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
        logWarning("Failed to cleanup work dir as executor pool was shutdown")
    }

四. MasterChanged 详解

    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    //worker节点只是 被动接受主节点改变的事实  改变自身的配置即可
    //不存在 消息发送
    changeMaster(masterRef, masterWebUiUrl, masterRef.address)

    val executorResponses = executors.values.map { e =>
      WorkerExecutorStateResponse(new ExecutorDescription(
        e.appId, e.execId, e.cores, e.state), e.resources)
    }
    val driverResponses = drivers.keys.map { id =>
      WorkerDriverStateResponse(id, drivers(id).resources)}
    //把当前任务的每个状态给 master汇报一下就行了
    masterRef.send(WorkerSchedulerStateResponse(
      workerId, executorResponses.toList, driverResponses.toSeq))

五. ReconnectWorker 详解

    registerWithMaster() //这个方法上面有介绍的

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/289127.html

(0)
上一篇 2022年9月13日
下一篇 2022年9月13日

相关推荐

发表回复

登录后才能评论