spark2.0源码阅读--spark作业提交

内容安排按照《apache spark 源码阅读》。这本书基于古老的spark 1.0.2。我看的代码基于2.0。这篇文章先粗略的过一遍spark的核心代码

SparkContext的初始化综述

原书3.2节 p27

sparkContext初始化时主要做这样几件事情:

  • 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv。sparkEnv的主要部件有
    • cacheManager:用于储存中间计算结果
    • mapOutputTracker:用来缓存MapStatus信息,并提供从MapOutMaster获取信息的功能。获取的Map out的信息根据master和worker有不同的用途:master上,用来记录ShuffleMapTasks所需的map out的源;worker上,仅仅作为cache用来执行shuffle计算。
    • shuffleManager:路由维护表
    • broadcastManage:广播
    • blockManager:块管理
    • connectionManager: 安全管理
    • httpFileServer: 文件存储服务器
    • sparkFilesDir:文件存储目录
    • metricsSystem:测量系统
  • 创建TaskScheduler。创建TaskScheduler最关键的一点就是根据master的环境变量来判断spark当前部署的方式,进而生成相应的schedulerBankend的子类
  • 依照上一步的TaskScheduler创建DAGScheduler并允许
  • 启动webUI

spark作业提交

原书4.1章,p33

以workcount为了讲一下spark作业提交:

sc.textFile("README").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).count()

1. sc.textFile("README")

生成一个HadoopRDD

2.flatMap(line=>line.split(" ")).map(word => (word, 1))

通过flatmap和map操作对rdd做转换。这两部生成的rdd都是窄依赖

3. reduceByKey(_ + _)

reduceByKey操作和上面两个map不同,因为他是一个宽依赖,也就是说这个操作时需要shuffle的,reduceByKey返回一个ShuffledRDD。这种函数一般都需要传入一个partition,默认是HashPartitioner。

另外reduceByKey是PariRddFunctions中的函数,却能被MappedRDD直接调用,这是scala的一个隐式转换语法特性,见函数rddToPairRDDFunctions

4. count()

spark中有两中操作,transformation和action。之前的操作都是transformation,只是记录数据的变化,不会真正触发程序对数据计算。而count是一个action,是真正会触发操作的。他们会调用sc.runjob()。

runjob中有一行会调用val cleanedFunc = clean(func),这里是对这个func做clean从而为序列化他做准备。

最终SparkContext的runjob会调用DAGScheduler的runjob,真正进入执行环节

前三步的结果都是弹性数据集(RDD),在每个数据集上记录了:

  • A list of partitions 由那些数据子集构成
  • A function for computing each split 子集中每条记录如何计算(计算因子)
  • A list of dependencies on other RDDs 依赖于的数据集(parent RDD)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 如何将该数据集的输出结果定位到下一个RDD中
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file) 位置信息

作业执行

作业执行的总体流程如下图

2016-10-18-214035

作业stage划分

原书4.2.1 p39

讲作业执行前花一点篇幅讲一下Actor Model和Akka

2016-10-18-215632

actor model中每个actor都是一个独立的实体,他们之间通过消息传递完成交互。每个actor的行为只有消息接收、消息处理、消息发送三种

上面作业提交之后DAGScheduler.runjob会将job提交到eventProcessLoop中(1.0是提交到一个actor中,其实是一样的),之后会有另一个线程将其拿出来处理。处理时会识别这个提交是JobSubmitted,于是调用dagScheduler.handleJobSubmitted()。hadleJobSubmitted主要负责依赖性分析,划分stage,生成finalStage进而ActiveJob。

handleJobSubmitted函数首先调用newResultStage函数,该函数会生成所有parentStages,然后生成最终的stage并返回。这个生成parentStages的过程是个递归的,自顶(finalStage)向下的过程。其中getParentStages会将rdd们划分成stage。当一个rdd是ShuffleDependency时,比如ShuffledRDD时,就会划分一个stage(代码如下)。这样,对一个rdd来说,从它开始往前追溯,所有不是ShuffleDependency的会划分当一个stage

1
2
3
4
5
6
7
8
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
}

stage有这样几个参数red,numTask,parents: List[Stage]。说明他在rdd的基础上,已经知道有多少任务,同时也记录了自己的父Stage依赖。

之后handleJobSubmitted会依次调用submitStagesubmitWaitingStages。这两个函数的逻辑就是对于一个stage以及他的祖先stages,找到所有dependency是空的stage执行,其余的再次加到waitingStages这个hashmap里等待下一次调用。第一次调用submitStage会让能跑的stage先跑,将不能跑stage按照依赖关系加入waitingStages。加入Stage b依赖stage a,那么b一定在a后面加入,从而jobID一定大于a。之后调用一次submitWaitingStages时按照jobID从小到大submit即可。(这里好像也不是,如果submit是异步提交,那还是一次把所有的提交上去,然后有依赖的进入waitinglist,只不过前面有任务完成就会调用handleTaskCompletion函数,他会再调用submitWaiting尝试提交剩余task)

任务的创建和分发

原书4.2.3 P47

任务创建和分发的入口是DAGScheduler.submitMissingTasks(stage: Stage, jobId: Int)

该函数依照stage生成seq[Task[_]],具体分为ShuffleMapTask和ResultTask两种。task都会记录自己是哪个stage,哪个partition,同时还有一个域是taskBinary: Broadcast[Array[Byte]]。并且会将stage序列化广播到整个系统:

For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
For ResultTask, serialize and broadcast (rdd, func).

最后他会调用taskScheduler.submitTasks()提交一个taskset。提交taskset会调用backend.reviveOffers(),这里的backend有好几种,我们以CoarseGrainedSchedulerBackend为例讲讲接下来的流程。

CGSBackend的reviveOffers会发送一个ReviveOffers的消息,当然接收这个消息的也在CGSBackend。接收到这个消息他会调用一个makeOffers的函数,该函数

  1. 负责从executor中找到空闲的executor,
  2. 调用resourceOffers进行资源分配。资源分配是一个round-robin的策略来维持任务在集群上的load balance
  3. 并调用launchTask将任务发送给指定的executor。具体发送方式为executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

其中需要注意一点就是task序列化时的task依赖的文件和jar包是怎么传递的:关键在于TaskSetManager.resourceOffer中的一句话:val serializedTask: ByteBuffer = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/images/CoarseGrainedExecutorBackend-reviveOffers.png

这里再讲一讲backend中coarseGranied和fineGranied的区别,至少是mesos下的区别,这里的总结来自文档

  • In fine-grained mode, there is a single task in a single Spark executor that shares a single Mesos executor with the other Spark executors. In coarse-grained mode, there is a single Spark executor per Mesos executor with many Spark tasks.
  • coarseGrained默认首先预启动所有executor backend,所以他的overhead略大,但是每个任务执行前executor都是启动好的,所以他适合交互性任务
  • fine grained模式对于资源利用率比较高,适合batch任务和流任务

任务执行

原书4.2.4节 p53

首先executorBackend需要注册一个executor,在注册了executor的基础上,接收到LaunchTask事件后会调用executor.launchTask()函数。该函数会新建一个TaskRunner,并且用线程池执行它:threadPool.execute(taskrunner)

taskrunner会经过这样几个步骤:

  1. updateDependencies(taskFiles, taskJars):下载依赖文件

  2. 反序列化task

  3. 执行task.run():分为shuffleMapTask和ReduceTask两种

    • shuffleMapTask:因为shuffleMapTask的结果是要写到中间文件后执行shuffle操作的,所以最关键的一句就是writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]).这里首先都是要调用iterator来对结果进行遍历。rdd的iterator代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
    * This should ''not'' be called by users directly, but is available for implementors of custom
    * subclasses of RDD.
    */
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
    } else {
    computeOrReadCheckpoint(split, context)
    }
    }

    主要是对rdd的每一条数据进行转换,并且最终会调用rdd的各种compute函数。compute函数就是对数据进行计算。

    之后会调用writer.write.这个writer根据shuffle的不同分为UnsafeShuffleWriter,BypassMergeSortShuffleWriterSortShuffleWriter,主要是通过不同的策略将task结果输出

    • ReduceTask: 主要是执行func(context, rdd.iterator(partition, context))。这个func是一个val func: (TaskContext, Iterator[_]) => _,
  4. 返回结果: Task执行结束后会将结果返回给各个组件,并更新自己的状态execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

结果返回

p48

2016-10-20-123615

schedulerBackend接收到statusUpdate之后做出如下操作:

  • 任务执行成功,将其从监视列表删除;如果作业中所有任务都完成,将占用资源释放。否则分配新的任务给刚完成任务的executor
  • 分配新的任务是由taskSchedulerImpl调用makeoffers(executorId)执行的。所以说spark执行task是依照executor个数分批执行
  • 如果ReduceTask执行成功,则DABScheduler发送TaskSucceed来通知整个作业的执行情况给监听者,比如jobWaiter。
  • 该消息会被DAGScheduler.runjob的waiter.completionFuture.value.get收到,如果程序成功执行则整个任务流程结束

储存机制

存储机制2.0变化比较大

shuffle结果的写入和读取

4.3.1章 p71

当一个task完成时,DAGScheduler会收到消息并且调用hadnleTaskCompletion。在这个函数中会判断task的类型,如果是reduceTask,就更新task完成数目并判断整个job是否完成。如果是shuffleMapTask,其中很重要的一步是将task输出的结果注册到MapOutputTrackerMaster中:mapOutputTracker.registerMapOutputsregisterMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false)。MapOutputTracker中实际存的是一个map<shuffleId, Array[MapStatus]>,MapStatus里存了文件的BlockManagerId。后面的task则会读取这些结果。

2016-10-20-153836

下面我们具体看一看shuffle怎么写入和读取

shuffle写入

shufflerMapTask.run()执行以后,会调用一个writer,这里以sortShuffleWriter为例进一步跟进shuffle写入。

sortShuffleWriter中的writer函数首先会建立一个tmp文件并生成一个shuffleBlockID(形如"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId),之后把相关的数据写到磁盘里(sorter.writePartitionedFile)并且建立索引(shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)。写入的操作调用的是blockManager.getDiskWrite

shuffle读取

shuffledRDD的compute函数是读取shuffleMapTask结果的起点

1
2
3
4
5
6
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}

这里的reader默认是BlockStoreShuffleReader,因为hashShuffle已经被2.0移除了SPARK-14667。这个class里面只有一个read函数,是读取shuffledata的逻辑所在。该函数主要有三步:

BlockStoreShuffleReader用来读取上游任务计算结果,read方法处理步骤:

  1. 从远端节点或本地读取中间计算结果
  2. 对InterruptIbleIterator执行聚合
  3. 对InterruptibleIterator排序,使用ExternalSorter的insertAll方法。

2.0可以选择combine是在map端做还是在reduce端做。

Store

原书4.3.2 p80

原来的存储系统都是有cacheManage这个组件的,在2.0里面这个组件被删了,getOrElseUpdate方法被实现在了blockManager中SPARK-12817。现在储存模块的结构如下图

Master 负责:

  • 接受各个 Slaves 注册
  • 保存整个 application 各个 blocks 的元数据
  • 给各个 Slaves 下发命令

Slave 负责:

  • 管理存储在其对应节点内存、磁盘上的 Blocks 数据
  • 接收并执行 Master 的命令
  • 更新 block 信息给 Master

储存模块的入口在SparkEnv中,在driver端构造 SparkContext 时会创建 SparkEnv 实例 _env,这时会调用create函数创建BlockManager。在 worker 进程起来的的时候,object CoarseGrainedExecutorBackend 初始化时会通过调用 SparkEnv#createExecutorEnv,在该函数中会创建 executor 端的 BlockManager,也即 Slave。这之后,CoarseGrainedExecutorBackend 才向 driver 注册 executor,然后再构造 Executor 实例。

BlockManager 实例在被创建后,不能直接使用,必须调用其 initialize 方法才能使用。对于 Master,是在 BlockManager 创建后就调用了 initialize 方法;对于 Slave,是在 Executor 的构造函数中调用 initialize 方法进行初始化。

initialize 方法中,会进行 BlockManager 的注册,具体操作时通过 driverRpcEndpointRef 发送 RegisterBlockManager 消息。

数据读写

当storagelevel不是空时,RDD.iterator会调用getOrCompute函数,该函数通过rddid和parititionid算出blockid,之后调用blockManager.getOrElseUpdate:

  • 如果该block存在,getOrElseUpdate返回一个blockResult
  • 如果不存在则getOrElseUpdat调用doPutIterator写入这个block。
    • 如果写入成功,getOrElseUpdat返回blockResult
    • 写入失败则返回一个iter。写入失败的原因常常是因为data太大放不进内存同时不允许放进磁盘

无论接收到什么,getOrCompute函数都会将其打包成一个InterruptibleIterator

数据写入流程

调用getOrElseUpdate函数时会判断block存在不存在,如果不存在会调用doPutIterator函数:

  • 步骤一:写入数据:
    • 如果需要写内存会调用putIteratorAsValues(非序列化)和putIteratorAsBytes(序列化)
      • 这两个函数都是gradually unroll的方式,物化一点写一点
      • 写不下了会根据情况写磁盘(diskStore.put(blockId))或者设置剩余数据的iter
    • 只写磁盘会调用diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
  • 步骤二:reportBlockStatus是通过和master的心跳异步同步的。(通过RPC 给Master endpoint发消息)
  • 步骤三:是否需要replicate:replicate是调用blockTransferService.uploadBlockSync通过Netty传输数据,传输过程是同步的,因此会block程序

也就是说blockManager在写入时会调用MemoryStore,DiskStore,blockTransferService以及rpc模块

数据读取流程

数据读取会先读取本地的block,如果没有再读取远程的block。

  • getLocalValues: 按照存储level读取本地,读取本地的时候需要给该block加一个读锁
  • getRemoteValues:从远程读取block的时候是一个同步的过程,不过是通过fetchBlcokSyncThreadUtils.awaitResult(result.future, Duration.Inf)同步的。

后面的计划

  • 接着看源码,把部署在不同的平台情况的看完
  • 是详细想想fine-grained oarse-grained时executor task core的关系
  • 详细看看shuffle,几种实现的区别,还有不同参数设置对应的实现
  • 看看序列化对程序的影响以及序列化的设置

参考文献


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/10/22/spark2-0源码阅读-spark作业提交/