深入解析yarn架构设计与技术实现-yarn 应用程序设计方法

yarn应用程序设计方法

当需要讲一个新的程序运行在yarn上时,需要编写两个组件Client和ApplicationMaster:

  • client负责向resourceManager提交ApplicationMaster,并查询任务云翔状态
  • ApplicationMaster负责向ResourceManager申请资源(以container形式),并与nodemanager通信以启动各个container。同时AM需要监控各个任务运行状态并在失败是为其重新申请资源

其中涉及三个RPC协议:

  • ApplicationClientProtocol:Client与ResourceManager通信
  • ApplicationMasterProtocol: ApplicationMaster和ResourceManager通信
  • Container《anagementProtocol: ApplicationMaster和NodeManager通信

客户端编写

客户端编写可以参考org.apache.hadoop.yarn.client.api.YarnClient。这个类提供了基本的client和yarn通信的函数的封装。另外可以查看TestYarnClient来查看yarnClient的具体使用。

clinet与yarn通信,也就是与resourceManager通信。它主要分为几步:

  1. client通过ApplicationClientProtocol#getNewApplication从RM获取applicationID。除了书上说的,这里贴一段单元测试中的代码分析:

    1
    2
    3
    4
    5
    6
    7
    8
    //建立resourceManager的rpc client
    private ApplicationClientProtocol rmClient;
    this.rmClient = (ApplicationClientProtocol) rpc.getProxy(...);
    YarnClientApplication newApp = rmClient.createApplication();
    ApplicationId appId =
    newApp.getNewApplicationResponse().getApplicationId();
  2. Client将ApplicationMaster提交到ResourceManager。主要是设置application id等,具体可以见下面的注释。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // Create launch context for app master
    ApplicationSubmissionContext appContext
    = Records.newRecord(ApplicationSubmissionContext.class);
    // set the application id
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName("test");
    // Set the priority for the application master
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(1);
    appContext.setPriority(pri);
    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue("default");
    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer
    = Records.newRecord(ContainerLaunchContext.class);
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1024, 1));
    appContext.setUnmanagedAM(unmanaged);
    // Submit the application to the applications manager
    rmClient.submitApplication(appContext);

    这里面除了很明确的几个参数,还有几个:

    • unmanged:是否由客户端自己启动ApplicationMaster
    • amContainer: ContainerLaunchContext的对象,用来描述ApplicationMaster所在的container的信息
  3. 还有一些常用的rpc函数,比如:
    • rmClient.getApplicationReport(appId):查看application
    • client.killApplication:停止一个任务

这里注意appContext和amContainer都是Records生成的。这个Record主要是用来构造一个可序列化的对象。而可序列化是rpc的基本条件。

client一般是直接和ResourceManager通信。但是在applicationMaster生成后也可以直接与applicationMaster通信。比如mr的任务会通过MRClientProtocol与MRAppMaster通信,不过我没找到代码。。。

ApplicationMaster

AM的编写主要是两部分,一部分是AM和ResourceManager通信的部分。另一部分是AM和Nodemanager通信的部分。

AM-RM

AM和ResourceManager通信主要分为3个步骤:

  1. 通过ApplicationClientProtocol#registerApplicationMaster向resourceManager注册。
  2. 通过ApplicationClientProtocol#allocate向resourceManager申请资源。resourceManager会返回给它可用的资源封装。这里注意allocate会被不断轮询调用,他会维持AM与RM的周期性心跳。RM靠这个保证AM还活着
  3. 任务结束时调用ApplicationClientProtocol#finishApplicationMaster来告诉RM任务结束并退出。

yarn提供了一个AM-RM的编程库,里面实现了一个同步的AMRMClientImpl和一个异步的AMRMClientAsyncImpl。一般都使用异步的,他将操作封装成一个时间放在一个线程池执行,可以看到里面有两个子类CallbackHandlerThread和 HeartbeatThread。使用这个的时候主要是要实现自己的CallbackHandler,实现AMRMClientAsync.CallbackHandler接口。接口中定义了各种情况下应该进行什么操作。而具体什么时候调用这些函数在HeartbeatThread的run()和CallbackHandlerThread()中都已经写好了,我们不用关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public interface CallbackHandler {
/**
* Called when the ResourceManager responds to a heartbeat with completed
* containers. If the response contains both completed containers and
* allocated containers, this will be called before containersAllocated.
*/
public void onContainersCompleted(List<ContainerStatus> statuses);
/**
* Called when the ResourceManager responds to a heartbeat with allocated
* containers. If the response containers both completed containers and
* allocated containers, this will be called after containersCompleted.
*/
public void onContainersAllocated(List<Container> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to shutdown
* for being out of sync etc. The ApplicationMaster should not unregister
* with the RM unless the ApplicationMaster wants to be the last attempt.
*/
public void onShutdownRequest();
/**
* Called when nodes tracked by the ResourceManager have changed in health,
* availability etc.
*/
public void onNodesUpdated(List<NodeReport> updatedNodes);
public float getProgress();
/**
* Called when error comes from RM communications as well as from errors in
* the callback itself from the app. Calling
* stop() is the recommended action.
*
* @param e
*/
public void onError(Throwable e);
}

同时,用户可以用ApplicationClient

AM-NM

AM与Nodemanager的通信也分为三步。

  1. AM将申请到的资分配给内部任务,并且启动NM上的container。这个主要是靠ContainerManagementProtocol#startContainer函数
  2. AM通过ContainerManagementProtocol#getContainerStatus轮询container的状态。如果container失败了Am会进行容错处理
  3. 当Container结束时,通过ContainerManagementProtocol#stopContainer来释放相关资源

Yarn同样提供了一个编程库:NMClientImpl和NMClientAsyncImpl。也是只需要实现NMClientAsync.CallbackHandler,其他的事情两个Impl帮你做就好。

AM-NM与AM-RM唯一不同的一点在于AM要与多个counter通信.NMClientImpl维护一个ConcurrentMap<ContainerId, StartedContainer> startedContainers储存所有的container。而在NMClientAsyncImpl中,有一个BlockingQueue<ContainerEvent> events将事件缓存起来,也就是说AM是异步的将与container的通信都包装成event的形式处理的。主线程负责将各种事件put进evnets,而另外一个eventDispatcherThread线程异步处理各种事件。

Distributed Shell

Distributed shell 是一个很好的学习yarn application的示例程序。它主要有各两个类,org.apache.hadoop.yarn.applications.distributedshell.Clientorg.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster

Client 主要看run函数,他有一个成员变量yarnClient封装了和RM的rpc。之后run函数就按照步骤给RM提交application。最后函数调用monitorApplication()轮询的查看application的运行状态并在运行结束时返回。

ApplicationMaster更是展示了如何使用yarn提供的编程库。他有两个重要的成员变量:amRmClient和nmClientAsync分别负责和rm以及和nm的通信。启动两个client以后,他有一下几步

  1. 首先init application设置各种初始变量,就是处理运行时传进来的参数
  2. amRmClient调用registerApplicationMaster注册一个application
  3. 设置container的meemory,vCore信息
  4. amRMclient调用addContainerRequest申请资源
  5. 轮询查看client的状态。这里主要看numCompletedContainers.get() 是否等于 numTotalContainers。而这个numCompletedContainers是由RMCallbackHandler设置的,也就是nmClientAsync内部的子线程设置。
  6. 完成最后的收尾操作,如unregister等

其中1是在init函数,2-4是在run函数,5、6是在finish函数。利用yarn的库其实很多处理逻辑,比如am通过nm检查container状态等都省了,我们只需要提供自己的callbackHandler即可。

至于书上的思考题,主要就是application run的时候设置一下申请container的属性来控制container的位置。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/07/17/深入解析yarn架构设计与技术实现-yarn-应用程序设计方法/