1、创建并启动 Client
YarnClient 内容通过 ApplicationClientProtocol 与 ResourceManager 通信,向 RM 的ApplicationsManager
申请 Application。
跟踪进去可以在 YarnClientImpl
找到 rpc:this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
2、通过YarnClient 创建 Application
GetNewApplicationResponse
中除了包含 ApplicationId
,还包括集群最大/最小资源,给任务启动设置的资源作参考。
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
3、关键:完善 ApplicationSubmissionContext
需要在 ApplicationSubmissionContext
中定义 RM 启动 AM 时所需的全部信息,主要包括:
- app 信息:id,name
- 队列、优先级信息
- 提交用户
- ContainerLaunchContext:定义 AM 启动所需信息
- RECT
- Resources (binaries, jars, files etc.):其中包括 Application master jar
- Environment settings (CLASSPATH etc.)
- Command to be executed
- security Tokens
- RECT
// 3 完善 ApplicationSubmissionContext 所需内容
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId applicationId = appContext.getApplicationId();
// 3.1 设置application name
appContext.setApplicationName("my-test-app");
// 3.2 设置ContainerLaunchContext
// localResources, env, commands 等
// application master 的 jar 放到 localResources 中
// 这部分较长省略,请到代码中查看
ContainerLaunchContext amContainerCtx = createAMContainerLaunchContext(
conf, app.getApplicationSubmissionContext().getApplicationId());
appContext.setAMContainerSpec(amContainerCtx);
// 3.3 设置优先级
Priority pri = Priority.newInstance(0);
appContext.setPriority(pri);
// 3.4 设置队列
appContext.setQueue("default");
// 3.5 设置 am 资源
int amMemory = 2048;
int amVCores = 2;
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
4、提交 Application
提交后,RM 接收到 Application,根据资源请求分配容器,最终将 AM 启动在容器中。
这里交给 YarnClientImpl
执行 rmClient.submitApplication(request)
,通过 RPC ApplicationClientProtocol
提交到 RM
ApplicationId appId = yarnClient.submitApplication(appContext);
5、获取任务进度信息
ApplicationReport report = yarnClient.getApplicationReport(appId);
log.info("Got application report " +
", clientToAMToken=" + report.getClientToAMToken()
+ ", appDiagnostics=" + report.getDiagnostics()
+ ", appMasterHost=" + report.getHost()
+ ", appQueue=" + report.getQueue()
+ ", appMasterRpcPort=" + report.getRpcPort()
+ ", appStartTime=" + report.getStartTime()
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser());
6、kill Application
当 Application 运行了过长的时间或者其他的原因,client 可以 kill application。
流程是:client 像 RM 发送 kill 信号,再传递给 AM
yarnClient.killApplication(appId);
二)涉及的通信协议
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/293273.html