yarn应用程序设计方法
当需要讲一个新的程序运行在yarn上时,需要编写两个组件Client和ApplicationMaster:
- client负责向resourceManager提交ApplicationMaster,并查询任务云翔状态
- ApplicationMaster负责向ResourceManager申请资源(以container形式),并与nodemanager通信以启动各个container。同时AM需要监控各个任务运行状态并在失败是为其重新申请资源
其中涉及三个RPC协议:
- ApplicationClientProtocol:Client与ResourceManager通信
- ApplicationMasterProtocol: ApplicationMaster和ResourceManager通信
- Container《anagementProtocol: ApplicationMaster和NodeManager通信
客户端编写
客户端编写可以参考org.apache.hadoop.yarn.client.api.YarnClient。这个类提供了基本的client和yarn通信的函数的封装。另外可以查看TestYarnClient来查看yarnClient的具体使用。
clinet与yarn通信,也就是与resourceManager通信。它主要分为几步:
client通过ApplicationClientProtocol#getNewApplication从RM获取applicationID。除了书上说的,这里贴一段单元测试中的代码分析:
12345678//建立resourceManager的rpc clientprivate ApplicationClientProtocol rmClient;this.rmClient = (ApplicationClientProtocol) rpc.getProxy(...);YarnClientApplication newApp = rmClient.createApplication();ApplicationId appId =newApp.getNewApplicationResponse().getApplicationId();Client将ApplicationMaster提交到ResourceManager。主要是设置application id等,具体可以见下面的注释。
123456789101112131415161718192021222324252627// Create launch context for app masterApplicationSubmissionContext appContext= Records.newRecord(ApplicationSubmissionContext.class);// set the application idappContext.setApplicationId(appId);// set the application nameappContext.setApplicationName("test");// Set the priority for the application masterPriority pri = Records.newRecord(Priority.class);pri.setPriority(1);appContext.setPriority(pri);// Set the queue to which this application is to be submitted in the RMappContext.setQueue("default");// Set up the container launch context for the application masterContainerLaunchContext amContainer= Records.newRecord(ContainerLaunchContext.class);appContext.setAMContainerSpec(amContainer);appContext.setResource(Resource.newInstance(1024, 1));appContext.setUnmanagedAM(unmanaged);// Submit the application to the applications managerrmClient.submitApplication(appContext);这里面除了很明确的几个参数,还有几个:
- unmanged:是否由客户端自己启动ApplicationMaster
- amContainer: ContainerLaunchContext的对象,用来描述ApplicationMaster所在的container的信息
- 还有一些常用的rpc函数,比如:
- rmClient.getApplicationReport(appId):查看application
- client.killApplication:停止一个任务
这里注意appContext和amContainer都是Records生成的。这个Record主要是用来构造一个可序列化的对象。而可序列化是rpc的基本条件。
client一般是直接和ResourceManager通信。但是在applicationMaster生成后也可以直接与applicationMaster通信。比如mr的任务会通过MRClientProtocol与MRAppMaster通信,不过我没找到代码。。。
ApplicationMaster
AM的编写主要是两部分,一部分是AM和ResourceManager通信的部分。另一部分是AM和Nodemanager通信的部分。
AM-RM
AM和ResourceManager通信主要分为3个步骤:
- 通过ApplicationClientProtocol#registerApplicationMaster向resourceManager注册。
- 通过ApplicationClientProtocol#allocate向resourceManager申请资源。resourceManager会返回给它可用的资源封装。这里注意allocate会被不断轮询调用,他会维持AM与RM的周期性心跳。RM靠这个保证AM还活着
- 任务结束时调用ApplicationClientProtocol#finishApplicationMaster来告诉RM任务结束并退出。
yarn提供了一个AM-RM的编程库,里面实现了一个同步的AMRMClientImpl和一个异步的AMRMClientAsyncImpl。一般都使用异步的,他将操作封装成一个时间放在一个线程池执行,可以看到里面有两个子类CallbackHandlerThread和 HeartbeatThread。使用这个的时候主要是要实现自己的CallbackHandler,实现AMRMClientAsync.CallbackHandler接口。接口中定义了各种情况下应该进行什么操作。而具体什么时候调用这些函数在HeartbeatThread的run()和CallbackHandlerThread()中都已经写好了,我们不用关系。
|
|
同时,用户可以用ApplicationClient
AM-NM
AM与Nodemanager的通信也分为三步。
- AM将申请到的资分配给内部任务,并且启动NM上的container。这个主要是靠ContainerManagementProtocol#startContainer函数
- AM通过ContainerManagementProtocol#getContainerStatus轮询container的状态。如果container失败了Am会进行容错处理
- 当Container结束时,通过ContainerManagementProtocol#stopContainer来释放相关资源
Yarn同样提供了一个编程库:NMClientImpl和NMClientAsyncImpl。也是只需要实现NMClientAsync.CallbackHandler,其他的事情两个Impl帮你做就好。
AM-NM与AM-RM唯一不同的一点在于AM要与多个counter通信.NMClientImpl维护一个ConcurrentMap<ContainerId, StartedContainer> startedContainers
储存所有的container。而在NMClientAsyncImpl中,有一个BlockingQueue<ContainerEvent> events
将事件缓存起来,也就是说AM是异步的将与container的通信都包装成event的形式处理的。主线程负责将各种事件put进evnets,而另外一个eventDispatcherThread线程异步处理各种事件。
Distributed Shell
Distributed shell 是一个很好的学习yarn application的示例程序。它主要有各两个类,org.apache.hadoop.yarn.applications.distributedshell.Client
和org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster
Client 主要看run函数,他有一个成员变量yarnClient封装了和RM的rpc。之后run函数就按照步骤给RM提交application。最后函数调用monitorApplication()轮询的查看application的运行状态并在运行结束时返回。
ApplicationMaster更是展示了如何使用yarn提供的编程库。他有两个重要的成员变量:amRmClient和nmClientAsync分别负责和rm以及和nm的通信。启动两个client以后,他有一下几步
- 首先init application设置各种初始变量,就是处理运行时传进来的参数
- amRmClient调用registerApplicationMaster注册一个application
- 设置container的meemory,vCore信息
- amRMclient调用addContainerRequest申请资源
- 轮询查看client的状态。这里主要看numCompletedContainers.get() 是否等于 numTotalContainers。而这个numCompletedContainers是由RMCallbackHandler设置的,也就是nmClientAsync内部的子线程设置。
- 完成最后的收尾操作,如unregister等
其中1是在init函数,2-4是在run函数,5、6是在finish函数。利用yarn的库其实很多处理逻辑,比如am通过nm检查container状态等都省了,我们只需要提供自己的callbackHandler即可。
至于书上的思考题,主要就是application run的时候设置一下申请container的属性来控制container的位置。
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:http://thousandhu.github.io/2016/07/17/深入解析yarn架构设计与技术实现-yarn-应用程序设计方法/