spark streaming 数据结构分析

这篇文章是在阅读了spark streaming Programming Guide以后的一些总结。主要是spark streaming和spark的数据结构DStream和RDD的对比,以及DStream数据接收到发送到spark engine的一些细节。

Streaming 和DStream

spark streaming是一个流式的处理框架,通过从实时数据流接入数据,然后划分为小的批量传递给spark engine处理。

Spark streaming对这种持续数据做了一个高级抽象DStream(discretized stream,离散数据流)。在DStream里面就包含了一系列的RDDs。而作用于DSteam的算子都会被转化为对内部RDD的操作。作用在DStream上的操作,其实可以看做作用在其中每一个RDD的操作。和RDD类似,他也是先记录计算逻辑,直到遇到类似Dstream的action才进行实际操作。

Receiver

每个Dstream都对应着一个接收器Receiver。在spark streaming中,接收器都是长期运行的,在分配cpu是cpu core要大于接收器的总数,负责该应用只会接收数据而不会处理数据。Receiver除了原生的文件系统socker和akka actor以外还有一些是需要增加额外依赖的。比如kafka,flume的receiver。同时,也可以自定义receiver

自定义Receiver主要就是实现onStart()和onStop()两个非阻塞函数。一般都是在onstart里面启动一个线程来读数据,读到的数据调用store()存到spart中。

transform

得到Dstream以后会对他进行一系列的transform,这里不具体讲transform有哪些,只是说一下transform有两个大类。一类就是普通的transform,比如map,flatmap等。另一类是基于窗口的transform,这种算子可以对于某一滑动窗口时间内的数据施加特定的transformation算子:

上面这个是官方的例子,每个窗口大小3个单元,滑动2个单元。这里的单元就是Dstream的批次间隔。每次滑动和大小都必须是单元的整倍数。举一个最简单的例子:countByWindow(windowLength,slideInterval)表示返回数据流在一个滑动窗口内的元素个数 。具体有哪些算子看手册好了。

还有一种算子是输出算子,就是将数据推送到外部系统的,比如print(),savaAsHadoopFiles()等。这里有一个比较有趣的是foreachRDD(func),表示该算子接收一个函数func,func将作用于DStream的每个RDD上。这个有一些使用的注意事项和例子,看手册

DStream生成的细节

到上面的了解程度其实Spark Streaming就可以用了,但是也就是能启动起来的程度,完全不知道数据怎么从哪些executor以什么形式进入spark engine。这一章就从原理层面讨论下这个问题。看的参考文献都是看源码分析的,但是我连spark源码都没看 ,看streaming有点远。。。

注:spark里面应该是有一个BlockManager的东西来管理rdd数据的分布,这个是属于spark的storage模块,这里就不展开了(主要我也不会 - -)。他是一个master-slave架构。这里结合对streaming的了解,猜测应该是实现了集群之间远程数据传输。

数据接收

启动spark streaming以后集群会选择一台executor作为某一个receiver的supervisor并标记为active状态。之后

  1. receiverSupervisor会启动对应的receiver。
  2. receiver会启动新线程接收数据,通过调用ReceiverSupervisor.store 一条一条接收。
  3. ReceiverSupervisor调用blockGenerator.addData进行数据填充,这里实际上是把数据存到一个ArrayBuffer[Any]的currentBuffer中。

把currentbuffer中的数据转化为数据块有两个BlockGenerator线程:

  1. 第一个线程定期地调用updateCurrentBuffer函数生成batch,然后将batch封装成block,时间是由spark.streaming.blockInterval参数配置的,默认是200ms。然后将生成的black放到一个blocksForPushing的队列里
  2. 第二个线程不断调用keepPushingBlocks函数从blocksForPushing里拿到blocks,然后调用pushBlock方法将Block储存到BlockManager中。

中间我就一直很奇怪Receiver的block最后写在哪里,感觉是写在本地很大,但是感觉写到其他机子上有很奇怪。直到找到stackoverflow这篇

One streaming batch corresponds to one RDD. That RDD will have n partitions, where n = batch interval / block interval. Let’s say you have the standard 200ms block interval and a batch interval of 2 seconds, then you will have 10 partitions. Blocks are created by a receiver, and each receiver is allocated in a host. So, those 10 partitions are in a single node and are replicated to a second node.

When the RDD is submitted for processing, the hosts running the task will read the data from that host. Tasks executing on the same node will have “NODE_LOCAL” locality, while tasks executing on other nodes will have “ANY” locality and will take longer.

其实数据就是写在receiver所在的那个机器,只不过rdd真正搞起来执行任务时其他executor可以从这个机器拿数据。至于rdd和block的关系,其实partition是一个逻辑分区的概念,对于的物理分区就是一个block。具体可以见这个文章

如何增加并行度

增加并行度有两个方法

  1. 优化里有讲一个参数spark.streaming.blockInterval,是这只接收器的阻塞间隔。这个是生成block的时间间隔,时间越短block约多,对应rdd的partition也就越多
  2. 用多个Receiver去读取数据,操作的时候再union。

参考文献

这个文章问了搞清楚这个block查了很多东西,主要就是对spark还不熟,直接看streaming底层太费劲了


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/07/28/spark-streaming-数据结构分析/