Anatomy a mr job run
MR job运行牵扯到5个独立的部分。
- client,负责提交mr job
- yarn resourcemanager。安排mr的计算资源
- yarn node manager。集群中启动和监控计算单位(compute containers)
- MR application master,管理一个mr job的运行。app master和mr task都被rsource manager调度并被node manager管理。
Job init
- 从resource manager得到一个新的app id(step 2)
- 检查job的output。如果output存在或者未指定,则报错
- 计算input的划分
- 将job相关文件copy到hdfs上,放在一个和job id相关的文件夹里
- submit job(step 4)
job Initialization
- resource manager 收到job提交以后.yarn scheduler 分配一个container给job,然后resource manager让这个container的node manager启动一个app master进程(step5)
- app master会启动一些bookkeeping objects来追踪job的运行并接受job的report (step6)
- app master得到input的split信息(step 7),并为每个split启动一个map task。同时创建一系列reduce task objects “determined by the maoreduce.job.reduces property”
task assignment
当任务不能本地执行时,app master会去resource manager请求新的container资源
task execution
当task被resource manager分配到具体的container上时, app master会去启动这个job。(step 9)然后这个job会去hdfs获得数据并执行。
task运行的main class是yarnChild,它在一个“dedicated jvm”中,不会影响node manager的运行
progress and status update
map的执行百分比直接对应于input被读了多少。
reduce首先分为三个阶段,copy,sort,reduce。如果一个reduce已经跑了一般的input,那么他的完成度是5/6,因为是copy(占1/3),sort(1/3)再加上一半的reduce。
child process 通过umbilical interface和parent通信。而client直接通过mrAppMaster得到运行状态。
Failure
task failure
第一种failure:task扔出一个runtime expection, 则task jvm将其报告给parent app master,然后app master将其标注为failed并释放container资源
第二种failure: jvm突然退出。这事node manager得到它退出的消息,并将这个消息通知app master。
当task失败时,app master会重启这个task,并且尽量不在之前那个node启动。当一个task失败4次时,他就不会被重启。这个值可以配置:mapreduce.map.maxattempts
和mapreduce.reduce.maxattempts
。
如果有些任务可以忍受小部分task失败,则可以配置mapreduce.map.faulyres.maxpercent
和mapreduce.reduce.failures.maxprecent
application master failure
app master和resource manager保持心跳通信,当app master失败是,resource manager会在一个新的container中启动一个新的instance。然后这个instance会利用job history来recover tasks state。
client会从app master获取mr运行状况,正常情况他会cache app master的地址。当app master失败时,client会得到一个timeout,这时他会去resource manager那里得到新的app master地址。
node manager failure
如果一个node manager挂了或者反映太慢导致了和resource manager的time out,resource manager就会将这个node manager移除。同时这个node上的task和app master也会被重启,即使是finish的map task也会被rerun,因为中间结果写在node的local store上。
resource manager failure
resource manager本身是一个单点,所以需要HA。HA主要是通过standby 的 resource manager和zookeeper完成。
这里有一些参考文献:YARN ResourceManager HA配置详解 ,ResourceManager High Availability-apache,YARN (MRv2) ResourceManager High Availability-Cloudera
shuffle and sort
Map side
map首先将输出写到一个内存buffer中,buffer默认大小100m,当达到spill precent时,就会尝试写磁盘。写磁盘时首先map会将发往不同reduce的数据分开,同时对于每个分块,后台线程会依据key做in memory排序,如果有combiner,则在这里运行combiner
每次到达spillthreshold,一个新的spill会被建立,当task完成时,会有一个spill files的合并。
reduce side
- coyp pharse:从不同的数据源copy 数据(map结束时会告诉app master,reduce周期性的从master那里询问map的host)
- 当数据拷贝时,会从内存写入硬盘,这里会进行一个简单的排序,并且会运行combiner
- sort pharse: 对文件分组进行归并排序。排序到最后一次归并时,直接进行reduce
- reduce task
sort
- partial sort(只有reduce内部数据有序):map和reduce什么都不用做,reduce会自动对内部的key进行排序输出 p257
- total sort: 利用randomSamoler采样,然后
job.setPartitionerClass
显示指定reduce间数据的划分。之后reduce自动对自己内部的数据排序 - secondary sort:第一列数据有序的前提下第二列也有序。
- 将两列组合成一个key,自己写一个
parttitiion extends Partitioner
并job.setPartitionerClass
,这个是要让第一列分到同一个reduce - 自己写
keyComoarator extends WritableComparatpr
并且job.setSortComparatorClass
,这事告诉rudece如何排序 - 写一个
GroupComparator extends WritableComoarator
并且job.setGroupingComoaratorClass
,这个set的作用是”Define the comparator that controls which keys are grouped together for a single call to reduce”,也就是确定一次调用reduce函数时他的一个key的iterator都会遍历那些值。 - 参考文献
- 将两列组合成一个key,自己写一个
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。