Spark Streaming 4. 程序部署监控调优


1. 部署应用

要运行一个Spark Streaming 应用,你首先需要具备以下条件:

  • 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见 deployment guide

  • 给Spark应用打个JAR包 – 你需要将你的应用打成一个JAR包。如果使用spark-submit 提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源(advanced sources – 如:Kafka、Flume、Twitter等),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。

  • 为执行器(executor)预留足够的内存 – 执行器必须配置预留好足够的内存,因为接收到的数据都得存在内存里。注意,如果某些窗口长度达到10分钟,那也就是说你的系统必须知道保留10分钟的数据在内存里。可见,到底预留多少内存是取决于你的应用处理逻辑的。

  • 配置检查点 – 如果你的流式应用需要检查点,那么你需要配置一个Hadoop API兼容的可容错存储目录作为检查点目录,流式应用的信息会写入这个目录,故障恢复时会用到这个目录下的数据。详见前面的检查点小节。

  • 配置驱动程序自动重启 – 流式应用自动恢复的前提就是,部署基础设施能够监控驱动器进程,并且能够在其故障时,自动重启之。不同的集群管理器有不同的工具来实现这一功能:

    • Spark独立部署 – Spark独立部署集群可以支持将Spark应用的驱动器提交到集群的某个worker节点上运行。同时,Spark的集群管理器可以对该驱动器进程进行监控,一旦驱动器退出且返回非0值,或者因worker节点原始失败,Spark集群管理器将自动重启这个驱动器。详见Spark独立部署指南(Spark Standalone guide)。
    • YARN – YARN支持和独立部署类似的重启机制。详细请参考YARN的文档。
    • Mesos – Mesos上需要用Marathon来实现这一功能。

配置WAL(write ahead log)- 从Spark 1.2起,我们引入了write ahead log来提高容错性。如果启用这个功能,则所有接收到的数据都会以write ahead log形式写入配置好的检查点目录中。这样就能确保数据零丢失(容错语义有详细的讨论)。用户只需将 spark.streaming.receiver.writeAheadLog 设为true。不过,这同样可能会导致接收器的吞吐量下降。不过你可以启动多个接收器并行接收数据,从而提升整体的吞吐量(more receivers in parallel)。如下代码:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另外,建议在启用WAL后禁用掉接收数据多副本功能,因为WAL其实已经是存储在一个多副本存储系统中了。你只需要把存储级别设为 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系统)存储WAL,一定要记得启用这两个标识:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更详细请参考: Spark Streaming Configuration

设置好最大接收速率 – 如果集群可用资源不足以跟上接收数据的速度,那么可以在接收器设置一下最大接收速率,即:每秒接收记录的条数。相关的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 还需要设置 spark.streaming.kafka.maxRatePerPartition。从Spark 1.5起,我们引入了backpressure的概念来动态地根据集群处理速度,评估并调整该接收速率。用户只需将 spark.streaming.backpressure.enabled设为true即可启用该功能。

2. 升级应用代码

升级Spark Streaming应用程序代码,可以使用以下两种方式:

新的Streaming程序和老的并行跑一段时间,新程序完成初始化以后,再关闭老的。注意,这种方式适用于能同时发送数据到多个目标的数据源(即:数据源同时将数据发给新老两个Streaming应用程序)。

老程序能够优雅地退出(参考 StreamingContext.stop(…) or JavaStreamingContext.stop(…) ),即:确保所收到的数据都已经处理完毕后再退出。然后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这个空档时间,数据需要在数据源处缓存。目前还不能支持从检查点重启,因为检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种情况下,只能要么指定一个新的检查点目录,要么删除老的检查点目录。

3. 应用监控

除了Spark自身的监控能力(monitoring capabilities)之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。

web UI上有两个度量特别重要:

  • 批次处理耗时(Processing Time) – 处理单个批次耗时
  • 批次调度延时(Scheduling Delay) -各批次在队列中等待时间(等待上一个批次处理完)

如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。

Spark Streaming程序的处理进度可以用StreamingListener接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。

4. 性能调优

要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情:

  1. 提高集群资源利用率,减少单批次处理耗时。
  2. 设置合适的批次大小,以便使数据处理速度能跟上数据接收速度。

4.1 减少批次处理时间

减少Spark对每个批次的处理时间

4.2 数据接收并发度

跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。

如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver的运行跨worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下:

val numStreams = 5

val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }

val unifiedStream = streamingContext.union(kafkaStreams)

unifiedStream.print()

另一个可以考虑优化的参数就是接收器的阻塞间隔, 该参数由配置参数(configuration parameter) spark.streaming.blockInterval 决定. 大多数接收器都会讲数据合并成一个个数据块, 然后再保存到Spark内存中, 对于map类算子来说, 每个批次中的数据块个数会决定处理这批数据并行任务的个数, 每个接收器每批次数据处理任务书约等于(批次间隔/ 数据块间隔). 例如, 对于2秒的批次间隔, 如果数据块间隔为200ms, 则创建的并发任务数为10, 如果任务书太少, (少于单机cpu core个数), 则资源利用不够充分. 如需增加这个任务数, 对于给定的批次间隔为来说, 只需要减少数据块间隔即可. 不过我们还是建议数据块间隔至少50ms , 否则任务的启动开销就太高了.

另一个切分接收数据流的方法是, 将输入流数据划分为多个分区(使用inputStream.repartition()). 该操作会在处理前,将数据散开重新分发到集群中多个节点上

4.3 数据处理并发度

在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(见PairDStreamFunctions)。

4.4 数据序列化

调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化:

  • 输入数据: 默认地,接收器收到的数据是以 StorageLevel.MEMORY_AND_DISK_SER_2 的存储级别存储到执行器(executor)内存中的。也就是说,收到的数据会被序列化以减少GC开销,同时保存两个副本以容错。同时,数据会优先保存在内存里,当内存不足时才吐出到磁盘上。很明显,这个过程中会有数据序列化的开销 – 接收器首先将收到的数据反序列化,然后再以spark所配置指定的格式来序列化数据。
  • Streaming算子所生产的持久化的RDDs: Streaming计算所生成的RDD可能会持久化到内存中。例如,基于窗口的算子会将数据持久化到内存,因为窗口数据可能会多次处理。所不同的是,spark core默认用 StorageLevel.MEMORY_ONLY 级别持久化RDD数据,而spark streaming默认使用StorageLevel.MEMORY_ONLY_SER 级别持久化接收到的数据,以便尽量减少GC开销。

不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销,详见Spark Tuning Guide。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见Configuration Guide)。

在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。

4.5 任务启动开销

如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销:

  • 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。
  • 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。详见Running on Mesos guide

这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。

4.6 设置合适的批次间隔

要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。

根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。

要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用StreamingListener接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值)

4.7 内存调优

Spark应用内存占用和GC调优已经在调优指南(Tuning Guide)中有详细的讨论。强烈建议你读一读那篇文档。本节中,我们只是讨论一下几个专门用于Spark Streaming的调优参数。

Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,如果想要用一个窗口算子操纵最近10分钟的数据,那么你的集群至少需要在内存里保留10分钟的数据;另一个例子是updateStateByKey,如果key很多的话,相对应的保存的key的state也会很多,而这些都需要占用内存。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。

一般情况下,streaming接收器接收到的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,如果内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大降低streaming应用的性能,因此还是建议根据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,然后再放大,再评估。

另一个内存调优的方向就是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现大量的或耗时较长的JVM垃圾回收暂停。

以下是一些能够帮助你减少内存占用和GC开销的参数或手段:

  • DStream持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗。
  • 清除旧数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的。Spark Streaming会根据所使用的transformation算子来清理旧数据。例如,你用了一个窗口操作处理最近10分钟的数据,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以设置 streamingContext.remember 以保留更长时间段的数据(比如:你可能会需要交互式地查询更老的数据)。
  • CMS垃圾回收器(CMS Garbage Collector): 为了尽量减少GC暂停的时间,我们强烈建议使用CMS垃圾回收器(concurrent mark-and-sweep GC)。虽然CMS GC会稍微降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个比较恒定的水平上。最后,你需要确保在驱动器(通过spark-submit中的–driver-java-options设置)和执行器(使用spark.executor.extraJavaOptions配置参数)上都设置了CMS GC。
  • 其他提示: 如果还想进一步减少GC开销,以下是更进一步的可以尝试的手段:

配合Tachyon使用堆外内存来持久化RDD。详见Spark编程指南(Spark Programming Guide

使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark Streaming 5. 容错语义 Spark Streaming 5. 容错语义
1. 容错语义的背景要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。 RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的依赖信息,其中每个算子都以可容
2018-03-15
下一篇 
Spark Streaming 3. 数据广播与检查点 Spark Streaming 3. 数据广播与检查点
1. 缓存/持久化和RDD类似, DStream也支持将数据持久化到内存中, 只需要调用DStream的persist()方法, 该方法内部会调用DStream中每个RDD的persist()方法, 进而将数据持久化到内存中, 这对于可能需
2018-03-04
  目录