深入解析yarn架构设计与技术实现-资源调度器
Yarn资源调度器的基本框架
yarn的资源调度器是可以配置的,默认的是FIFO、CapacityScheduler和Fair Scheduler三种。
这三个scheduler都继承一个AbstractYarnScheduler。这个抽象类extends AbstractService implements ResourceScheduler 。extends Abstractservice说明是一个服务,而实现了ResourceScheduler的函数是scheduler的主要功能。 ResourceScheduler本质是个eventHandler。他一共处理10种事件,分别是:
- NODE_ADDED: 集群中增加一个节点
- NODE_REMOVED: 集群中移除一个节点
- NODE_RESOURCE_UPDATE: 集群中有一个节点的资源增加了
- NODE_LABELS_UPDATE:更新node labels。node labels可以用于划分一个cluster的node,每个node可以有一个label,具体见文档YARN Node Labels
- NODE_UPDATE:该时间是NodeManager通过心跳和ResourceManager通信时发送的,会汇报该node的资源使用情况,同时出发一次分配操作。
- APP_ADDED: 增加一个Application
- APP_REMOVED:移除一个application
- APP_ATTEMPT_ADDED:增加一个application Attempt
- APP_ATTEMPT_REMOVED:移除一个application attempt
- CONTAINER_EXPIRED:回收一个超时的container
Yarn的资源现在主要以cpu和内存为主,对于cpu,支持将物理核划分为虚拟核,但是调度时必须是虚拟核的整倍数。
YARN支持的调度语义:
- 请求某个节点上的特定资源量
- 请求某个特定机架上的特定资源量
- 将某些节点加入(或移除)黑名单,不再为自己分配这些节点上的资源
- 请求归还某些资源
不支持的调度语义:
- 请求任意节点上的特定资源量
- 请求任意机架上的特定资源量
- 请求一组或几组符合某种特质的资源
- 超细粒度资源。比如CPU性能要求、绑定CPU等
- 动态调整Container资源,允许根据需要动态调整Container资源量
资源调度模型
yarn采用双层资源调度模型,第一层中ResourceManager中的资源调度器将资源分配给ApplicationMaster,第二层ApplicationMaster在自己内部再分配资源。资源分配是个异步的过程:
- NodeManager通过周期心跳汇报节点信息
- ResourceManager为NodeManager返回一个心跳应答,包括需要释放的container列表等信息
- ResourceManager收到的NodeManager信息触发一个Node_UPDATED事件,之后会按照一定策略将该节点上的资源分配到各个应用,并将分配结果放到一个内存数据结构中
- ApplicationMaster向resourceManager发送心跳,获得最新分配的container资源
- ApplicationMaster将收到的新container分配给内部任务
资源保证机制
Yarn采用的是增量分配资源的机制,当应用程序申请资源暂时无法保证时,优先为应用程序预留节点上的资源,直到累计释放的空闲资源达到其要求。这样的分配虽然会导致浪费,但是不会使得一个application饿死
资源分配算法
为了支持多维资源分配,Yarn使用的是DRF(Dominant Resouce Fairness,主资源公平调度算法)。他首先将多资源问题简化,将每个需求中占用资源总额比例最大的那个资源作为主资源,之后对所有需求只看主资源进行划分。
划分的过程类似max-min算法,目标是最大化所有主资源中最小的,每次分配都是分配给目前dominant share中最小的需求者一份他需求的资源。具体可以参考这偏文章
资源抢占模型
yarn中每个队列可以设置一个最小资源量和最大资源量。其中最小资源是资源紧缺情况下每个队列需保证的资源量,而最大资源是极端情况下队列也不能超过的资源量。通常情况下,为了提高资源利用率,当队列不需要任何资源时,并不会满足其最小资源,而是将资源分配给其他队列,如资源调度器会将负载较轻的队列(比如A)的资源暂时分给负载重的队列B。而当负载较轻的队列A突然收到任务时,调度器会将分出去的资源还给该队列。
由于在A收到任务时B可能正在使用从A借过来的资源,立即杀了这些container意味着浪费了这些container正在运行的计算,所以一般是这样的步骤:
- SchedulingEditPolicy探测到需要抢占的资源,通过DROP_RESERVATION和PREEMPT_CONTAINER发送给ResourceManager
- ResourceManager调用ResourceScheduler的dropContainerReservation和preemptContainer函数,标注带抢占的container。
- ResourceManager把需要抢占的container和资源总量通过心跳发送给ApplicationMaster。
- ApplicationMaster收到请求后,可能做这三种操作:
- 杀死这些container
- 选择并杀死其他container以凑个需求的资源数量
- 不做处理,等待container自行结束释放资源或者被ResourceManager杀死
- SchedulingEditPolicy探测到一段时间内,ApplicationMaster未自行杀死约定container,则将这些container封装到KILL_CONTAINER事件中发送给ResourceManager。
- ResourceManager调用ResourceScheduler的killContainer函数杀死这些Container(通过和NodeManager的心跳真正实现杀死动作)
- ResourceManager收到来自ApplicationMaster的心跳,通过心跳将已经杀死的container列表发给他。
其中有个问题就是要确定抢占发生时到底抢占哪些队列的资源。基本思想是在最开始时大家按照minResource的比例分配所有资源,如果队列A需要的资源比按比例分配给你的资源多,则把这部分分配给其他队列。而当A新进来一批应用导致资源需求变化时,重新计算每个队列的资源需求,确定从哪些队列抢占资源。这里我猜测如果队列A在新应用来时拥有资源大于minResource则不会抢占,否则大家抢来抢去kill来kill是对资源的浪费。
另外可以注意到,抢占不是立即发生的,Yarn首先会选择优先级低的container,同时是RM发送给AM让其处理,而且强制回收也是有一定的时间间隔。这里主要是因为container已经在执行,直接kill掉无疑是浪费了他已经运行的部分。
Yarn的层级队列管理机制
yarn的队列是层级的,每个队列可以包含子队列,用户只能讲任务提交到叶子队列。管理员可以配置每个叶子队列对应的操作系统用户和用户组,也可以配置每个队列的管理员。管理员可以杀死队列中的任何应用程序,改变任何应用的优先级等。
队列的命名用.
来连接,比如ROOT
,ROOT.A
,ROOT.A.A1
。
Capactiy scheduler
这里只是稍微简单讲一下Capacity scheduler,因为我们用的是fair schedule,下一章会详细讲那个。Capacity scheduler以队列为单位划分资源,每个队列可以设定一定比例的资源最低保证和使用上限;而当一个队列的资源有剩余时,可以将其剩余资源暂时共享给其他队列。
Capacity scheduler的一个典型过程是这样的:
应用程序初始化
- 应用程序提交到ResourceManager上后,ResourceManager会向Capacity Scheduler发送SchedulerEventType.APP_ADD事件。
- Capacity Scheduler收到该事件后蒋伟引用程序创建一个FiCaSchedulerApp对象跟踪和维护该应用程序的运行时信息,同时将程序提交到对应的叶子队列
- 叶子队列会对程序进行合法化检查,检查通过才算提交成功
资源调度:当ResourceManager收到nodemanager的心跳信息后,将向Capacity Scheduler发送NODE_UPDATE事件,CS收到这个事件后会干练件事情
处理心跳信息:
- node上的新启动了container,cs会向rm发送RMContainerEventType.LUNCH事件将该container标记为已启动,使得RM不会将该container作为超时回收的目标
- 对于运行完成的Container,资源调度器回收其资源
- Node本身的资源变化也会在此进行处理
资源分配:
- 选择队列:从根节点开始按照子队列资源使用率从小到大依次遍历。直到找到一个满足需求的队列
- 选定一个子队列后,CS按照提交时间对叶子节点的应用程序进行排序,依次遍历队列中的Application,选择合适的application
- 选定应用后,CS先满足优先级高的container,再满足低的。对于同一优先级,先选择满足本地性的container(即数据在本地存储的)
Fair Scheduler
Fair Schedule的配置文件分为两个部分,一部分在yarn-site.xml 里面,用于配置调度器级别的参数。另一个路径是可配置的,默认是fair-scheduler.xml,主要用于配置各个队列的资源量,权重等信息
Fair Schedule 也是接受scheduler的各种事件,然后进行处理。同Capacity Scheduler一样,他也是依次选择队列,应用程序和container三个级别的资源分配策略。不同的地方书中给了表格,见下表。
下面从代码级别讲一下一个application是如何通过Fair Scheduler被调度到一个Node上去的。
第一部分是提交一个application和applicationattempt到Scheduler:
- RMApp从NEW_SAVING状态转移到SUBMITTED状态,会发送一个AppAddedSchedulerEvent到Scheduler
- FScheduler收到AppAddedSchedulerEvent,会将应用加入相应的队列(将RMApp assign给一个FSLeafQueue)并给RMApp返回一个 RMAppEventType.APP_ACCEPTED事件。这里如果失败的话则会发送RMAppRejectedEvent
- RMApp收到RMAppEventType.APP_ACCEPTED事件会转为accepted状态,同时调用createAndStartNewAttempt函数生成一个新的RMAPPAttempt,并且给这个RMAPPAttempt发送RMAppStartAttemptEvent事件
- 新的RMAppAttemtp收到RMAppStartAttemptEvent后切换到submitted状态,并给Scheduler发送AppAttemptAddedSchedulerEvent
- Scheduler会将这个Attempt加入FSLeafQueue,同时返回一个RMAppAttemptEventType.ATTEMPT_ADDED事件
- RMAppAttempt收到RMAppAttemptEventType.ATTEMPT_ADDED事件,会从submited状态转化为scheduled状态
第二部分是真正的将这个applicationAttempt调度到一个node上去。这个是通过一个Scheduler的Node_Update事件触发。这个事件会出发函数FairScheduler.nodeUpdate(RMNode nm)。该函数首先处理在node上new launched以及comleted的container。之后调用attemptScheduling函数尝试分配资源。
|
|
这一段代码中需要注意两点,第一它是先分配reserved container,没有reserved applications再分配新的applicationAttempt。这是因为scheduler的资源分配方式是增量预留,前面讨论过这种方式如果一个container申请资源暂时无法保证时,优先为应用程序预留节点上的资源,直到累计释放的空闲资源达到其要求。所以有这样一个检查。看代码时忘了了这个东西,想了好久才明白这段是干嘛的
第二点就是分配新资源是在23行的assignContainer(node)这个函数。从root队列逐渐往下分到leafQueue。最终调用FSAppAttempt.assignContainer(),分配时遵循如下原则:
For each priority, see if we can schedule a node local, rack local or off-switch request. Rack of off-switch requests may be delayed (not scheduled) in order to promote better locality.
参考文献
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:http://thousandhu.github.io/2016/08/10/深入解析yarn架构设计与技术实现-资源调度器/