Co2y's Blog

Spark vs Flink

Spark和Flink是当今最主流的两个处理平台,早期Flink主要针对流处理,与之对应的是Spark Streaming,但随着两者的发展,许多地方逐渐统一。无论是Spark还是Flink都在解决MR针对迭代型数据处理需要反复存/读取硬盘的问题,并且在传统map,reduce算子的基础上扩充了许多算子。本文主要包括两个方面,Spark、Flink的介绍和比较。

介绍

Spark

Spark Core

RDD

Spark Core是以RDD为基础的,RDD包括两类操作transformation和action。transformation返回RDD,延迟执行,直到触发action。通常每一个transformation() 方法返回(new)一个 RDD,但是某些transformation() 比较复杂,会包含多个子 transformation()。总结一下RDD有如下特性:

  1. 它是不变的数据结构存储
  2. 它是支持跨集群的分布式数据结构
  3. RDD包含多个partition,可以根据数据记录的key对结构进行分区
  4. 提供了粗粒度的操作,且这些操作都支持分区
  5. 它将数据存储在内存中,从而提供了低延迟性,RDD可以存放于不同的存储介质中,RDD在内存吃紧时,会转储到磁盘中

架构

整个集群分为master节点和worker节点。一个运行在集群上的application包括driver和executors,driver(application.main())是由yarn调度的可以在master上也可以在worker上。executor在worker节点上,是具体工作的进程,它由许多的tasks组成,每个task是一个线程,线程数等于partition的个数。

job生成

每个application包括多个job,每个job对应rdd action,并且rdd包括多个stage,stage的划分是在逻辑执行图中由shuffle/result操作划分的,下面会介绍具体划分算法。整个Spark的运行过程由job =》逻辑执行图 =》DAG物理执行图 =》tasks。

  1. job会产生哪些RDD由transformation()语义决定。一些transformation(),比如cogroup()会被很多其他操作用到。
  2. RDD本身的依赖关系由transformation()生成的每一个RDD本身语义决定。如CoGroupedRDD依赖于所有参加cogroup()的RDDs。
  3. RDD中partition依赖关系分为NarrowDependency和ShuffleDependency。前者是完全依赖,后者是部分依赖。NarrowDependency里面又包含多种情况,只有前后两个RDD的partition个数以及partitioner都一样,才会出现NarrowDependency。Stage的划分算法就是从后往前推算,遇到ShuffleDependency就断开,遇到NarrowDependency就将其加入该stage。每个 stage里面task的数目由该stage最后一个RDD中的partition个数决定。
  4. 从数据处理逻辑的角度来看,MapReduce 相当于 Spark 中的 map() + reduceByKey(),但严格来讲 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,还包括shuffle的部分。相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。

Streaming

再说Spark Streaming,它对应的DStream是一段时间的RDD。Spark Streaming接收实时的输入数据流,然后将这些数据切分为批数据供Spark引擎处理,Spark引擎将数据生成最终的结果数据。DStream既可以利用从Kafka, Flume等源获取的输入数据流创建,也可以在其他DStream的基础上通过高阶函数获得。通常Spark streaming的事务是结合kafka来保证的。

基本概念

Flink核心是流处理引擎,在此之上提供了两个核心API DataStream和DataSet以及之上的Table API和SQL,它的SQL也是和Calcite集成的。此外Flink还提供了ML机器学习库和图计算库,这些都和Spark非常相似。

DataSet类似于Spark中的DataSet,DataStream类似于DStream,但是Spark的各个抽象是建立在RDD基础上的,DataSet是对RDD的封装,DataFrame是对RDD的封装,DStream内部也是RDD。而Flink强调万物皆流,它的DataSet和DataStream是在同一个底层抽象上两个不同的抽象,两者目前还无法统一,FLINK-2320正在解决这个问题。

时间窗口

对于流处理系统来说,流入的消息不存在上限,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接。消息的分段即称为窗口,流处理系统支持的窗口有很多类型,最常见的就是时间窗口,基于时间间隔对消息进行分段处理。

对于目前大部分流处理系统来说,时间窗口一般是根据Task所在节点的本地时钟进行切分,这种方式实现起来比较容易,不会产生阻塞。但是可能无法满足某些应用需求,比如:消息本身带有时间戳,用户希望按照消息本身的时间特性进行分段处理。

由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果。

Flink支持3种类型的时间窗口,分别适用于用户对于时间窗口不同类型的要求:

Operator Time

根据Task所在节点的本地时钟来切分的时间窗口。

Event Time

消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入Task,所以Task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放,如果乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。 Flink借鉴了Google的MillWheel项目,通过WaterMark来支持基于Event Time的时间窗口。

当操作符通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后才能开始数据处理。但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入,Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个操作符节点,然后也将WaterMark发送到下一个操作符节点。

为了保证能够处理所有属于某个时间窗口的消息,操作符必须等到大于这个时间窗口的WaterMark之后才能开始对该时间窗口的消息进行处理,相对于基于Operator Time的时间窗口,Flink需要占用更多内存,且会直接影响消息处理的延迟时间。对此,一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息。

对于基于Event Time时间窗口的操作符来说,流入WaterMark的时间戳与当前节点的时钟一致是最简单理想的状况,但是在实际环境中是不可能的,由于消息的乱序以及前面节点处理效率的不同,总是会有某些消息流入时间大于其本身的时间戳,真实WaterMark时间戳与理想情况下WaterMark时间戳的差别称为Time Skew,如图所示。

Time Skew决定了该WaterMark与上一个WaterMark之间的时间窗口所有数据需要缓存的时间,Time Skew时间越长,该时间窗口数据的延迟越长,占用内存的时间也越长,同时会对流处理系统的吞吐量产生负面影响。

Ingress Time

有时消息本身并不带有时间戳信息,但用户依然希望按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时可以在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。

多流合并

一个kafka的partition就是一个流,一个kafka topic的多个partition就是多个独立的流(offset彼此独立增长)。多个kafka topic显然是多个独立的流。流式统计经常需要把多个流合并统计到一起。这里会有两个问题,一个是不同流之间通常是无序的,如何保证一致性,这可以通过时间窗口等待来解决,但是如果不同流速度差异很大,因为内存是有限的,不能一直等待,这时就需要背压机制。

背压

像Flink这样的流处理系统需要能够优雅地应对背压问题。背压通常产生于这样一种场景:当一个系统接收数据的速率高于它在一个瞬时脉冲内能处理的数据。许多日常问题都会导致背压。例如,垃圾回收卡顿可能会导致流入的数据快速堆积,或者一个数据源可能生产数据的速度过快。背压如果不能得到正确地处理,可能会导致资源被耗尽或者甚至出现更糟的情况导致数据丢失。

直观的做法是上游产生的数据放到一个缓冲区中,并且做持久化,例如数据源本身是kafka。所以Flink中的背压是在传输过程中实现的,也就是整个pipeline的速率是可控的,对source进行限流来适配整个pipeline中最慢的组件。Flink运行时的构造部件是operators以及streams。每一个operator消费一个中间/过渡状态的流,对它们进行转换,然后生产一个新的流。Flink使用有效的分布式阻塞队列来作为有界的缓冲区,也就是每一个task都有一个缓冲区,从而使得task生产数据的速度跟消费的速度对等。

比较

这里比较的主要是两者在流处理方面的差异。

执行引擎

在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。

与其它流相比,Spark流处理是基于一个微批次来处理数据流的而非每一个事件。Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型。Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量。同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量。当然这一方面Spark也在进步,它也在把批处理与实时处理都作为计算引擎的地位。

容错

Flink和Spark容错实现也不一样。Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列。每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息,称为Lineage。Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。Lineage分为宽依赖和窄依赖,这其实对应的是stage的划分,窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用,而宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区。所以窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。有宽依赖的时候,需要在适当的时机设置数据检查点。

而Flink参考Chandy和Lamport算法实现了一致性快照。它按照用户自定义的分布式快照间隔时间,Flink会定时在所有数据源中插入一种特殊的快照标记消息,这些快照标记消息和其他消息一样在DAG中流动,但是不会被用户定义的业务逻辑所处理,每一个快照标记消息都将其所在的数据流分成两部分:本次快照数据和下次快照数据。快照标记消息沿着DAG流经各个操作符,当操作符处理到快照标记消息时,会对自己的状态进行快照,并存储起来。当一个操作符有多个输入的时候,Flink会将先抵达的快照标记消息及其之后的消息缓存起来,当所有的输入中对应该次快照的快照标记消息全部抵达后,操作符对自己的状态快照并存储,之后处理所有快照标记消息之后的已缓存消息。操作符对自己的状态快照并存储可以是异步与增量的操作,并不需要阻塞消息的处理。当所有的Data Sink(终点操作符)都收到快照标记信息并对自己的状态快照和存储后,整个分布式快照就完成了,同时通知数据源释放该快照标记消息之前的所有消息。若之后发生节点崩溃等异常情况时,只需要恢复之前存储的分布式快照状态,并从数据源重发该快照以后的消息就可以了。

其它

此外,以前说的两者差异还包括:memory management,pipelined execution等,这些两者现在都做到了off-heap的内存管理,把DataSet作为logical plan能够在运行时进行优化。Spark引入tungsten重写了代码生成帮助处理CPU瓶颈。

参考文献

  1. https://github.com/JerryLead/SparkInternals/
  2. http://geek.csdn.net/news/detail/56272
  3. https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html#levels-of-abstraction