深入解析yarn架构设计与技术实现-离线计算框架MapReduce

深入解析yarn架构设计与技术实现-MR

MR客户端

mr的客户端主要是通过ApplicationClientProtocol和MRClientProtocol两个rpc通信协议和ResourceManager以及ApplicationMaster通信。

具体来讲,在yarn上,client会创建一个YarnRunner作为真正的client。该client首先会调用ApplicationClientProtocol提交application,当application成功提交后会通过ClientCache得到clientServiceDelegate与ApplicationMaster直接交互来获取任务的执行信息以及控制任务。

MRAppmaster工作流程

yarn上的mr除了本地模式还有uber模式和no-uber模式

uber模式

uber模式是yarn针对小任务进行的优化的模式。对于小任务MRAppmaster让其重用一个container,按照先map后reduce的模式串行执行。如果一个任务满足下列条件,则被认为是小作业,会在uber模式执行。

  • map task不超过mapreduce.job.ubertask.maxmaps(默认9)
  • reduce Task数目不超过mapreduce.job.ubertask.maxreduces(默认是1)
  • 输入文件大小不超过mapreduce.job.ubertask.maxbytes(一个block大小)

链式作业不允许执行在uber模式下。

Non-Uber模式

Non-Uber模式下Map task和reduce task分为四个状态:

  • pending:刚启动,没有向resource发送资源请求
  • scheduled:已经向resource申请资源但没有分配到资源
  • assigned:已经分配到资源并且正在运行
  • completed:任务完成

Map task的生命周期为 scheduled->assigned->completed

Reduce task的生命周期为pending-> scheduled->assigned->completed

yanr上运行mr主要有两个关键问题,如何确定reduce的启动时间以及如何shuffle

为了防止reduce任务过早启动占用资源,需要在满足一下策略时才启动reduce task:

  1. mapreduce.job.reduce.slowstart.completedmaps:当map task完成比例达到该值时才启动reduce task。默认为0.05
  2. yarn.app.mapreduce.am.job.reduce.rampup.limit:在map task完成之前,最多启动reduce task的比例,默认为0.5
  3. yarn.app.mapreduce.am.job.reduce.preemption.limit:当map task需要资源但是暂时无法满足时,为了保证至少一个map task可以得到资源,最多可以抢占的reduce task的比例,默认为0.5

shuffle被yarn包装成一个shufle http server服务,在任务启动时让NM启动它。

MR作业的生命周期

mrAppMaster根据inputFormat首先将作业分解为若干mapTask和reduceTask,每个mapTask处理一个inputSplit数据分片。对于一个task由运行实例TaskAttemp完成。每个task可能会一个个顺序启动taskAttempt直到task执行成功,也可能同时启动多个运行实例,让他们竞争执行一个任务。

下图是一个不包含失败情况的job运行流程

2016-09-12-002746

资源申请

mr中map task和reduce task的资源申请和分配统一由containerallocator负责。如果是uber模式,则具体由LocalContainerAllocator进行资源分配,反之则由RMContainerAllocator实现。这里我们主要看RMContainerAllocator。

RMContainerAllocator的第一个核心函数是heartbeat()

1
2
3
4
5
6
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
if (allocatedContainers != null && allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers);
}

他最基本的动作是和RM维持心跳,告诉RM自己还活着。其次,他会调用getResources()函数请求资源。mr具体需要的资源信息是由ScheduledRequests 的一个对象维护的。该对象中有

1
2
3
4
5
private final LinkedList<TaskAttemptId> earlierFailedMaps
private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping
private final Map<TaskAttemptId, ContainerRequest> maps
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces

这样几个成员变量,其中maps和reduces就是maptask和reducetask的任务。containerRequest是一个描述资源需求的类,有

1
2
3
4
5
6
final TaskAttemptId attemptID;
final Resource capability;
final String[] hosts;
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;

分别描述资源量,node地址,刀架地址,优先级。earlierAttemptFailed不知道为啥注释掉了。因为一个文件在hdfs上会有3份,所以会申请node本地*3,同刀架*3还有any一共7份资源,任意一份申请成功则其他6个取消申请。

当RMContainerAllocator收到新的container资源后,会进一步对这些资源进行分配,这个在assign()函数中实现,其过程如下

  1. 判断收到的container包含的资源是否满足要求,如果不满足,则通过下次心跳通知resourceManager释放该container。
  2. 判断收到的container是否在黑名单中,如果是则寻找一个与该container匹配的任务并重新为该任务申请资源,同时通过下次心跳同时resourcemanager释放该container。
  3. 根据container的优先级分配资源。如果优先级为PRIORITY_FAST_FAIL_MAP,则分配给失败的map task;如果优先级为PRIORITY_REDUCE,则分配给reduce,否则分配给正常的map task。分配给正常的map task时会考虑locality。
  4. RMContainerAllocator发送事件给containerLauncherImpl,containerLauncherImpl异步的启动container

#推测执行

核心思想:某一时刻,判断一个任务是否拖后腿或者是否值得围棋启动备份任务时,采取的方法是:先假设为其启动了一个备份任务,估算期备份任务的完成时见estimatedendTime2;同样如果按照刺客该任务的计算速度,可以估算该任务最有可能的完成时间estimatedEndTime1,这样如果1与2的之差越大,则表示该任务备份执行的价值越大。默认情况下,DefaultSpeculator会启动一个线程,每隔一段时间扫描所有任务,预测其需不需要启动备份执行。

作业恢复

yarn的作业恢复是是任务级别的,采用基于日志的redo方式。在作业的运行过程中,记录作业或者任务的运行日志。当MRAppMaster重启恢复作业时,让作业沿着之前的断点继续开始执行。

当一个作业失败,MRAppMaster从第二次运行开始,将尝试读取前一次记录的日志,通过日志放回的方式回复作业运行前端状态,整个过程如下:

  1. MRAppMaster初始化时,解析前一次记录的事件日志,冰枪已经运行完成的任务存在completedTasksFromPreviousRun中
  2. MRAppMaster构造一个新的JobImpl对象,并将completedTasksPreviousRun传递给该对象的构造函数
  3. JobImpl经过初始化后,开始内部调度的TaskImpl,如果任务处于completedTasksFromPreviousRun中,则向TaslImpl发送一个T_RECOVER事件,以恢复该任务之前的运行状态;否则按照正常启动,向TaskImpl发送一个T_SCHEDULE时间
  4. TaskImpl收到T_RECOVER事件后,通过上次运行的信息恢复状态,即直接根据日志恢复为FAILED,KILLED或者SUCCEED状态。

这样,当MRAppMaster重启时,之前运行成功的任务就可以被恢复,但是之前正在运行的任务会在上次退出后被resourceManager回收container,所以只能重新运行。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/09/15/深入解析yarn架构设计与技术实现-离线计算框架MapReduce/