内容安排按照《apache spark 源码阅读》。这本书基于古老的spark 1.0.2。我看的代码基于2.0。这篇文章先粗略的过一遍spark的核心代码
SparkContext的初始化综述
原书3.2节 p27
sparkContext初始化时主要做这样几件事情:
- 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv。sparkEnv的主要部件有
- cacheManager:用于储存中间计算结果
- mapOutputTracker:用来缓存MapStatus信息,并提供从MapOutMaster获取信息的功能。获取的Map out的信息根据master和worker有不同的用途:master上,用来记录ShuffleMapTasks所需的map out的源;worker上,仅仅作为cache用来执行shuffle计算。
- shuffleManager:路由维护表
- broadcastManage:广播
- blockManager:块管理
- connectionManager: 安全管理
- httpFileServer: 文件存储服务器
- sparkFilesDir:文件存储目录
- metricsSystem:测量系统
- 创建TaskScheduler。创建TaskScheduler最关键的一点就是根据master的环境变量来判断spark当前部署的方式,进而生成相应的schedulerBankend的子类
- 依照上一步的TaskScheduler创建DAGScheduler并允许
- 启动webUI
spark作业提交
原书4.1章,p33
以workcount为了讲一下spark作业提交:
sc.textFile("README").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).count()
1. sc.textFile("README")
生成一个HadoopRDD
2.flatMap(line=>line.split(" ")).map(word => (word, 1))
通过flatmap和map操作对rdd做转换。这两部生成的rdd都是窄依赖
3. reduceByKey(_ + _)
reduceByKey操作和上面两个map不同,因为他是一个宽依赖,也就是说这个操作时需要shuffle的,reduceByKey返回一个ShuffledRDD。这种函数一般都需要传入一个partition,默认是HashPartitioner。
另外reduceByKey是PariRddFunctions中的函数,却能被MappedRDD直接调用,这是scala的一个隐式转换语法特性,见函数rddToPairRDDFunctions
4. count()
spark中有两中操作,transformation和action。之前的操作都是transformation,只是记录数据的变化,不会真正触发程序对数据计算。而count是一个action,是真正会触发操作的。他们会调用sc.runjob()。
runjob中有一行会调用val cleanedFunc = clean(func)
,这里是对这个func做clean从而为序列化他做准备。
最终SparkContext的runjob会调用DAGScheduler的runjob,真正进入执行环节
前三步的结果都是弹性数据集(RDD),在每个数据集上记录了:
- A list of partitions 由那些数据子集构成
- A function for computing each split 子集中每条记录如何计算(计算因子)
- A list of dependencies on other RDDs 依赖于的数据集(parent RDD)
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 如何将该数据集的输出结果定位到下一个RDD中
- Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file) 位置信息
作业执行
作业执行的总体流程如下图
作业stage划分
原书4.2.1 p39
讲作业执行前花一点篇幅讲一下Actor Model和Akka
actor model中每个actor都是一个独立的实体,他们之间通过消息传递完成交互。每个actor的行为只有消息接收、消息处理、消息发送三种
上面作业提交之后DAGScheduler.runjob会将job提交到eventProcessLoop中(1.0是提交到一个actor中,其实是一样的),之后会有另一个线程将其拿出来处理。处理时会识别这个提交是JobSubmitted,于是调用dagScheduler.handleJobSubmitted()。hadleJobSubmitted主要负责依赖性分析,划分stage,生成finalStage进而ActiveJob。
handleJobSubmitted
函数首先调用newResultStage
函数,该函数会生成所有parentStages,然后生成最终的stage并返回。这个生成parentStages的过程是个递归的,自顶(finalStage)向下的过程。其中getParentStages会将rdd们划分成stage。当一个rdd是ShuffleDependency时,比如ShuffledRDD时,就会划分一个stage(代码如下)。这样,对一个rdd来说,从它开始往前追溯,所有不是ShuffleDependency的会划分当一个stage
|
|
stage有这样几个参数red,numTask,parents: List[Stage]。说明他在rdd的基础上,已经知道有多少任务,同时也记录了自己的父Stage依赖。
之后handleJobSubmitted
会依次调用submitStage
和submitWaitingStages
。这两个函数的逻辑就是对于一个stage以及他的祖先stages,找到所有dependency是空的stage执行,其余的再次加到waitingStages这个hashmap里等待下一次调用。第一次调用submitStage
会让能跑的stage先跑,将不能跑stage按照依赖关系加入waitingStages。加入Stage b依赖stage a,那么b一定在a后面加入,从而jobID一定大于a。之后调用一次submitWaitingStages
时按照jobID从小到大submit即可。(这里好像也不是,如果submit是异步提交,那还是一次把所有的提交上去,然后有依赖的进入waitinglist,只不过前面有任务完成就会调用handleTaskCompletion函数,他会再调用submitWaiting尝试提交剩余task)
任务的创建和分发
原书4.2.3 P47
任务创建和分发的入口是DAGScheduler.submitMissingTasks(stage: Stage, jobId: Int)
。
该函数依照stage生成seq[Task[_]],具体分为ShuffleMapTask和ResultTask两种。task都会记录自己是哪个stage,哪个partition,同时还有一个域是taskBinary: Broadcast[Array[Byte]]
。并且会将stage序列化广播到整个系统:
For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
For ResultTask, serialize and broadcast (rdd, func).
最后他会调用taskScheduler.submitTasks()提交一个taskset。提交taskset会调用backend.reviveOffers(),这里的backend有好几种,我们以CoarseGrainedSchedulerBackend为例讲讲接下来的流程。
CGSBackend的reviveOffers会发送一个ReviveOffers的消息,当然接收这个消息的也在CGSBackend。接收到这个消息他会调用一个makeOffers的函数,该函数
- 负责从executor中找到空闲的executor,
- 调用resourceOffers进行资源分配。资源分配是一个round-robin的策略来维持任务在集群上的load balance
- 并调用launchTask将任务发送给指定的executor。具体发送方式为
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
其中需要注意一点就是task序列化时的task依赖的文件和jar包是怎么传递的:关键在于TaskSetManager.resourceOffer中的一句话:val serializedTask: ByteBuffer = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
这里再讲一讲backend中coarseGranied和fineGranied的区别,至少是mesos下的区别,这里的总结来自文档:
- In fine-grained mode, there is a single task in a single Spark executor that shares a single Mesos executor with the other Spark executors. In coarse-grained mode, there is a single Spark executor per Mesos executor with many Spark tasks.
- coarseGrained默认首先预启动所有executor backend,所以他的overhead略大,但是每个任务执行前executor都是启动好的,所以他适合交互性任务
- fine grained模式对于资源利用率比较高,适合batch任务和流任务
任务执行
原书4.2.4节 p53
首先executorBackend需要注册一个executor,在注册了executor的基础上,接收到LaunchTask事件后会调用executor.launchTask()函数。该函数会新建一个TaskRunner,并且用线程池执行它:threadPool.execute(taskrunner)
taskrunner会经过这样几个步骤:
updateDependencies(taskFiles, taskJars)
:下载依赖文件反序列化task
执行task.run():分为shuffleMapTask和ReduceTask两种
- shuffleMapTask:因为shuffleMapTask的结果是要写到中间文件后执行shuffle操作的,所以最关键的一句就是
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
.这里首先都是要调用iterator来对结果进行遍历。rdd的iterator代码如下:
123456789101112/*** Internal method to this RDD; will read from cache if applicable, or otherwise compute it.* This should ''not'' be called by users directly, but is available for implementors of custom* subclasses of RDD.*/final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {getOrCompute(split, context)} else {computeOrReadCheckpoint(split, context)}}主要是对rdd的每一条数据进行转换,并且最终会调用rdd的各种compute函数。compute函数就是对数据进行计算。
之后会调用writer.write.这个writer根据shuffle的不同分为
UnsafeShuffleWriter
,BypassMergeSortShuffleWriter
和SortShuffleWriter
,主要是通过不同的策略将task结果输出- ReduceTask: 主要是执行
func(context, rdd.iterator(partition, context))
。这个func是一个val func: (TaskContext, Iterator[_]) => _,
- shuffleMapTask:因为shuffleMapTask的结果是要写到中间文件后执行shuffle操作的,所以最关键的一句就是
返回结果: Task执行结束后会将结果返回给各个组件,并更新自己的状态
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
结果返回
p48
schedulerBackend接收到statusUpdate之后做出如下操作:
- 任务执行成功,将其从监视列表删除;如果作业中所有任务都完成,将占用资源释放。否则分配新的任务给刚完成任务的executor
- 分配新的任务是由taskSchedulerImpl调用makeoffers(executorId)执行的。所以说spark执行task是依照executor个数分批执行
- 如果ReduceTask执行成功,则DABScheduler发送TaskSucceed来通知整个作业的执行情况给监听者,比如jobWaiter。
- 该消息会被DAGScheduler.runjob的
waiter.completionFuture.value.get
收到,如果程序成功执行则整个任务流程结束
储存机制
存储机制2.0变化比较大
shuffle结果的写入和读取
4.3.1章 p71
当一个task完成时,DAGScheduler会收到消息并且调用hadnleTaskCompletion。在这个函数中会判断task的类型,如果是reduceTask,就更新task完成数目并判断整个job是否完成。如果是shuffleMapTask,其中很重要的一步是将task输出的结果注册到MapOutputTrackerMaster中:mapOutputTracker.registerMapOutputsregisterMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false)
。MapOutputTracker中实际存的是一个map<shuffleId, Array[MapStatus]>
,MapStatus里存了文件的BlockManagerId。后面的task则会读取这些结果。
下面我们具体看一看shuffle怎么写入和读取
shuffle写入
shufflerMapTask.run()执行以后,会调用一个writer,这里以sortShuffleWriter为例进一步跟进shuffle写入。
sortShuffleWriter中的writer函数首先会建立一个tmp文件并生成一个shuffleBlockID(形如"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
),之后把相关的数据写到磁盘里(sorter.writePartitionedFile
)并且建立索引(shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp
)。写入的操作调用的是blockManager.getDiskWrite
shuffle读取
shuffledRDD的compute函数是读取shuffleMapTask结果的起点
|
|
这里的reader默认是BlockStoreShuffleReader,因为hashShuffle已经被2.0移除了SPARK-14667。这个class里面只有一个read函数,是读取shuffledata的逻辑所在。该函数主要有三步:
BlockStoreShuffleReader用来读取上游任务计算结果,read方法处理步骤:
- 从远端节点或本地读取中间计算结果
- 对InterruptIbleIterator执行聚合
- 对InterruptibleIterator排序,使用ExternalSorter的insertAll方法。
2.0可以选择combine是在map端做还是在reduce端做。
Store
原书4.3.2 p80
原来的存储系统都是有cacheManage这个组件的,在2.0里面这个组件被删了,getOrElseUpdate方法被实现在了blockManager中SPARK-12817。现在储存模块的结构如下图
Master 负责:
- 接受各个 Slaves 注册
- 保存整个 application 各个 blocks 的元数据
- 给各个 Slaves 下发命令
Slave 负责:
- 管理存储在其对应节点内存、磁盘上的 Blocks 数据
- 接收并执行 Master 的命令
- 更新 block 信息给 Master
储存模块的入口在SparkEnv中,在driver端构造 SparkContext 时会创建 SparkEnv 实例 _env,这时会调用create函数创建BlockManager。在 worker 进程起来的的时候,object CoarseGrainedExecutorBackend
初始化时会通过调用 SparkEnv#createExecutorEnv
,在该函数中会创建 executor 端的 BlockManager,也即 Slave。这之后,CoarseGrainedExecutorBackend 才向 driver 注册 executor,然后再构造 Executor 实例。
BlockManager 实例在被创建后,不能直接使用,必须调用其 initialize
方法才能使用。对于 Master,是在 BlockManager 创建后就调用了 initialize
方法;对于 Slave,是在 Executor 的构造函数中调用 initialize
方法进行初始化。
在 initialize
方法中,会进行 BlockManager 的注册,具体操作时通过 driverRpcEndpointRef 发送 RegisterBlockManager
消息。
数据读写
当storagelevel不是空时,RDD.iterator会调用getOrCompute函数,该函数通过rddid和parititionid算出blockid,之后调用blockManager.getOrElseUpdate:
- 如果该block存在,getOrElseUpdate返回一个blockResult
- 如果不存在则getOrElseUpdat调用doPutIterator写入这个block。
- 如果写入成功,getOrElseUpdat返回blockResult
- 写入失败则返回一个iter。写入失败的原因常常是因为data太大放不进内存同时不允许放进磁盘
无论接收到什么,getOrCompute函数都会将其打包成一个InterruptibleIterator
数据写入流程
调用getOrElseUpdate函数时会判断block存在不存在,如果不存在会调用doPutIterator
函数:
- 步骤一:写入数据:
- 如果需要写内存会调用
putIteratorAsValues
(非序列化)和putIteratorAsBytes
(序列化)- 这两个函数都是gradually unroll的方式,物化一点写一点
- 写不下了会根据情况写磁盘(diskStore.put(blockId))或者设置剩余数据的iter
- 只写磁盘会调用
diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
- 如果需要写内存会调用
- 步骤二:reportBlockStatus是通过和master的心跳异步同步的。(通过RPC 给Master endpoint发消息)
- 步骤三:是否需要replicate:replicate是调用blockTransferService.uploadBlockSync通过Netty传输数据,传输过程是同步的,因此会block程序
也就是说blockManager在写入时会调用MemoryStore,DiskStore,blockTransferService以及rpc模块
数据读取流程
数据读取会先读取本地的block,如果没有再读取远程的block。
- getLocalValues: 按照存储level读取本地,读取本地的时候需要给该block加一个读锁
- getRemoteValues:从远程读取block的时候是一个同步的过程,不过是通过
fetchBlcokSync
的ThreadUtils.awaitResult(result.future, Duration.Inf)
同步的。
后面的计划
- 接着看源码,把部署在不同的平台情况的看完
- 是详细想想fine-grained oarse-grained时executor task core的关系
- 详细看看shuffle,几种实现的区别,还有不同参数设置对应的实现
- 看看序列化对程序的影响以及序列化的设置
参考文献
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:http://thousandhu.github.io/2016/10/22/spark2-0源码阅读-spark作业提交/