深入解析yarn架构设计与技术实现-资源调度器

深入解析yarn架构设计与技术实现-资源调度器

Yarn资源调度器的基本框架

yarn的资源调度器是可以配置的,默认的是FIFO、CapacityScheduler和Fair Scheduler三种。

这三个scheduler都继承一个AbstractYarnScheduler。这个抽象类extends AbstractService implements ResourceScheduler 。extends Abstractservice说明是一个服务,而实现了ResourceScheduler的函数是scheduler的主要功能。 ResourceScheduler本质是个eventHandler。他一共处理10种事件,分别是:

  • NODE_ADDED: 集群中增加一个节点
  • NODE_REMOVED: 集群中移除一个节点
  • NODE_RESOURCE_UPDATE: 集群中有一个节点的资源增加了
  • NODE_LABELS_UPDATE:更新node labels。node labels可以用于划分一个cluster的node,每个node可以有一个label,具体见文档YARN Node Labels
  • NODE_UPDATE:该时间是NodeManager通过心跳和ResourceManager通信时发送的,会汇报该node的资源使用情况,同时出发一次分配操作。
  • APP_ADDED: 增加一个Application
  • APP_REMOVED:移除一个application
  • APP_ATTEMPT_ADDED:增加一个application Attempt
  • APP_ATTEMPT_REMOVED:移除一个application attempt
  • CONTAINER_EXPIRED:回收一个超时的container

Yarn的资源现在主要以cpu和内存为主,对于cpu,支持将物理核划分为虚拟核,但是调度时必须是虚拟核的整倍数。

YARN支持的调度语义:

  • 请求某个节点上的特定资源量
  • 请求某个特定机架上的特定资源量
  • 将某些节点加入(或移除)黑名单,不再为自己分配这些节点上的资源
  • 请求归还某些资源

不支持的调度语义:

  • 请求任意节点上的特定资源量
  • 请求任意机架上的特定资源量
  • 请求一组或几组符合某种特质的资源
  • 超细粒度资源。比如CPU性能要求、绑定CPU等
  • 动态调整Container资源,允许根据需要动态调整Container资源量

资源调度模型

yarn采用双层资源调度模型,第一层中ResourceManager中的资源调度器将资源分配给ApplicationMaster,第二层ApplicationMaster在自己内部再分配资源。资源分配是个异步的过程:

  1. NodeManager通过周期心跳汇报节点信息
  2. ResourceManager为NodeManager返回一个心跳应答,包括需要释放的container列表等信息
  3. ResourceManager收到的NodeManager信息触发一个Node_UPDATED事件,之后会按照一定策略将该节点上的资源分配到各个应用,并将分配结果放到一个内存数据结构中
  4. ApplicationMaster向resourceManager发送心跳,获得最新分配的container资源
  5. ApplicationMaster将收到的新container分配给内部任务

资源保证机制

Yarn采用的是增量分配资源的机制,当应用程序申请资源暂时无法保证时,优先为应用程序预留节点上的资源,直到累计释放的空闲资源达到其要求。这样的分配虽然会导致浪费,但是不会使得一个application饿死

资源分配算法

为了支持多维资源分配,Yarn使用的是DRF(Dominant Resouce Fairness,主资源公平调度算法)。他首先将多资源问题简化,将每个需求中占用资源总额比例最大的那个资源作为主资源,之后对所有需求只看主资源进行划分。

划分的过程类似max-min算法,目标是最大化所有主资源中最小的,每次分配都是分配给目前dominant share中最小的需求者一份他需求的资源。具体可以参考这偏文章

资源抢占模型

yarn中每个队列可以设置一个最小资源量和最大资源量。其中最小资源是资源紧缺情况下每个队列需保证的资源量,而最大资源是极端情况下队列也不能超过的资源量。通常情况下,为了提高资源利用率,当队列不需要任何资源时,并不会满足其最小资源,而是将资源分配给其他队列,如资源调度器会将负载较轻的队列(比如A)的资源暂时分给负载重的队列B。而当负载较轻的队列A突然收到任务时,调度器会将分出去的资源还给该队列。

由于在A收到任务时B可能正在使用从A借过来的资源,立即杀了这些container意味着浪费了这些container正在运行的计算,所以一般是这样的步骤:

  1. SchedulingEditPolicy探测到需要抢占的资源,通过DROP_RESERVATION和PREEMPT_CONTAINER发送给ResourceManager
  2. ResourceManager调用ResourceScheduler的dropContainerReservation和preemptContainer函数,标注带抢占的container。
  3. ResourceManager把需要抢占的container和资源总量通过心跳发送给ApplicationMaster。
  4. ApplicationMaster收到请求后,可能做这三种操作:
    1. 杀死这些container
    2. 选择并杀死其他container以凑个需求的资源数量
    3. 不做处理,等待container自行结束释放资源或者被ResourceManager杀死
  5. SchedulingEditPolicy探测到一段时间内,ApplicationMaster未自行杀死约定container,则将这些container封装到KILL_CONTAINER事件中发送给ResourceManager。
  6. ResourceManager调用ResourceScheduler的killContainer函数杀死这些Container(通过和NodeManager的心跳真正实现杀死动作)
  7. ResourceManager收到来自ApplicationMaster的心跳,通过心跳将已经杀死的container列表发给他。

其中有个问题就是要确定抢占发生时到底抢占哪些队列的资源。基本思想是在最开始时大家按照minResource的比例分配所有资源,如果队列A需要的资源比按比例分配给你的资源多,则把这部分分配给其他队列。而当A新进来一批应用导致资源需求变化时,重新计算每个队列的资源需求,确定从哪些队列抢占资源。这里我猜测如果队列A在新应用来时拥有资源大于minResource则不会抢占,否则大家抢来抢去kill来kill是对资源的浪费。

另外可以注意到,抢占不是立即发生的,Yarn首先会选择优先级低的container,同时是RM发送给AM让其处理,而且强制回收也是有一定的时间间隔。这里主要是因为container已经在执行,直接kill掉无疑是浪费了他已经运行的部分。

Yarn的层级队列管理机制

yarn的队列是层级的,每个队列可以包含子队列,用户只能讲任务提交到叶子队列。管理员可以配置每个叶子队列对应的操作系统用户和用户组,也可以配置每个队列的管理员。管理员可以杀死队列中的任何应用程序,改变任何应用的优先级等。

队列的命名用.来连接,比如ROOTROOT.AROOT.A.A1

Capactiy scheduler

这里只是稍微简单讲一下Capacity scheduler,因为我们用的是fair schedule,下一章会详细讲那个。Capacity scheduler以队列为单位划分资源,每个队列可以设定一定比例的资源最低保证和使用上限;而当一个队列的资源有剩余时,可以将其剩余资源暂时共享给其他队列。

Capacity scheduler的一个典型过程是这样的:

  1. 应用程序初始化

    1. 应用程序提交到ResourceManager上后,ResourceManager会向Capacity Scheduler发送SchedulerEventType.APP_ADD事件。
    2. Capacity Scheduler收到该事件后蒋伟引用程序创建一个FiCaSchedulerApp对象跟踪和维护该应用程序的运行时信息,同时将程序提交到对应的叶子队列
    3. 叶子队列会对程序进行合法化检查,检查通过才算提交成功
  2. 资源调度:当ResourceManager收到nodemanager的心跳信息后,将向Capacity Scheduler发送NODE_UPDATE事件,CS收到这个事件后会干练件事情

    1. 处理心跳信息:

      1. node上的新启动了container,cs会向rm发送RMContainerEventType.LUNCH事件将该container标记为已启动,使得RM不会将该container作为超时回收的目标
      2. 对于运行完成的Container,资源调度器回收其资源
      3. Node本身的资源变化也会在此进行处理
    2. 资源分配:

      1. 选择队列:从根节点开始按照子队列资源使用率从小到大依次遍历。直到找到一个满足需求的队列
      2. 选定一个子队列后,CS按照提交时间对叶子节点的应用程序进行排序,依次遍历队列中的Application,选择合适的application
      3. 选定应用后,CS先满足优先级高的container,再满足低的。对于同一优先级,先选择满足本地性的container(即数据在本地存储的)

Fair Scheduler

Fair Schedule的配置文件分为两个部分,一部分在yarn-site.xml 里面,用于配置调度器级别的参数。另一个路径是可配置的,默认是fair-scheduler.xml,主要用于配置各个队列的资源量,权重等信息

Fair Schedule 也是接受scheduler的各种事件,然后进行处理。同Capacity Scheduler一样,他也是依次选择队列,应用程序和container三个级别的资源分配策略。不同的地方书中给了表格,见下表。

2016-08-10-162333

下面从代码级别讲一下一个application是如何通过Fair Scheduler被调度到一个Node上去的。
第一部分是提交一个application和applicationattempt到Scheduler:

  1. RMApp从NEW_SAVING状态转移到SUBMITTED状态,会发送一个AppAddedSchedulerEvent到Scheduler
  2. FScheduler收到AppAddedSchedulerEvent,会将应用加入相应的队列(将RMApp assign给一个FSLeafQueue)并给RMApp返回一个 RMAppEventType.APP_ACCEPTED事件。这里如果失败的话则会发送RMAppRejectedEvent
  3. RMApp收到RMAppEventType.APP_ACCEPTED事件会转为accepted状态,同时调用createAndStartNewAttempt函数生成一个新的RMAPPAttempt,并且给这个RMAPPAttempt发送RMAppStartAttemptEvent事件
  4. 新的RMAppAttemtp收到RMAppStartAttemptEvent后切换到submitted状态,并给Scheduler发送AppAttemptAddedSchedulerEvent
  5. Scheduler会将这个Attempt加入FSLeafQueue,同时返回一个RMAppAttemptEventType.ATTEMPT_ADDED事件
  6. RMAppAttempt收到RMAppAttemptEventType.ATTEMPT_ADDED事件,会从submited状态转化为scheduled状态

2016-08-10-162423

第二部分是真正的将这个applicationAttempt调度到一个node上去。这个是通过一个Scheduler的Node_Update事件触发。这个事件会出发函数FairScheduler.nodeUpdate(RMNode nm)。该函数首先处理在node上new launched以及comleted的container。之后调用attemptScheduling函数尝试分配资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
synchronized void attemptScheduling(FSSchedulerNode node) {
//do some prepare
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
FSQueue queue = reservedAppSchedulable.getQueue();
//do some prepare
node.getReservedAppSchedulable().assignReservedContainer(node);
}
if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
if (!queueMgr.getRootQueue().assignContainer(node).equals(
Resources.none())) {
assignedContainers++;
assignedContainer = true;
}
if (!assignedContainer) { break; }
if (!assignMultiple) { break; }
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
updateRootQueueMetrics();
}

这一段代码中需要注意两点,第一它是先分配reserved container,没有reserved applications再分配新的applicationAttempt。这是因为scheduler的资源分配方式是增量预留,前面讨论过这种方式如果一个container申请资源暂时无法保证时,优先为应用程序预留节点上的资源,直到累计释放的空闲资源达到其要求。所以有这样一个检查。看代码时忘了了这个东西,想了好久才明白这段是干嘛的

第二点就是分配新资源是在23行的assignContainer(node)这个函数。从root队列逐渐往下分到leafQueue。最终调用FSAppAttempt.assignContainer(),分配时遵循如下原则:

For each priority, see if we can schedule a node local, rack local or off-switch request. Rack of off-switch requests may be delayed (not scheduled) in order to promote better locality.

参考文献


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/08/10/深入解析yarn架构设计与技术实现-资源调度器/