Spark Streaming 3. 数据广播与检查点


1. 缓存/持久化

和RDD类似, DStream也支持将数据持久化到内存中, 只需要调用DStream的persist()方法, 该方法内部会调用DStream中每个RDD的persist()方法, 进而将数据持久化到内存中, 这对于可能需要计算很多次的DStream非常有用,(例如: 对于同一批数据调用多个算子), 对于基于滑动窗口的算子, 如reduceByWindow 和reduceByKeyAndWindow , 或者有状态的算子, 如:UpdateStateByKey, 数据持久化就更重要了, 因此滑动窗口算子产生的DStream对象会默认自动持久化到内存中, 不需要开发者手动调用persist

对于从网络中接收到的输入数据流, (如:kafka,flume,socket等) 默认的持久化级别会将数据持久化到两个不同的节点互为副本,以便支持容错.

2. 检查点

一般来说Streaming应用都需要7*24小时长期运行, 所以必须对一些与业务逻辑无关的故障有很好的容错,(比如:系统故障、 JVM崩溃等). 对于这些可能性, spark Streaming 必须在检查点保存足够的信息到一些可以容错的外部存储中, 以便能够随时从故障中恢复回来, 所以, 检查点需要保存下面两种数据:

  • 元数据检查点(Metadata checkpointing) - 保存流式计算逻辑的定义信息和外部可容错存储系统(如: HDFS), 主要用途是用于故障后恢复应用程序本身,元数据包括:
    • Configuration - 创建Streaming应用程序的配置信息
    • Dstream operations - 定义流式处理逻辑的DStream操作信息
    • Incomplete batches - 已经排队但未处理完的批次信息
  • 数据检查点(data checkpointing) - 将生成的RDD保存到可靠的存储中. 这堆一些需要夸批次组数据或者状态的算子来说很有必要, 在这些转换算子中,往往新生成的RDD是依赖于前几个批次的RDD, 因为随着时间的推移, 有可能产生很长的依赖链条. 为了避免在恢复数据时候需要恢复整个依赖链条上所有的数据,

总之 , 元数据检查点主要是为了恢复驱动器节点上的故障, 而数据或RDD检查点事为了支持对有状态操作的恢复

2.1 何时启用检查点

如果有一下情况出现, 你就必须启用检查点了

  • 如果使用了有状态的转换算子(Usage of stateful transformations) - 不管是用了 updateStateByKey 还是用了reduceByKeyAndWindow (有”反归约” 函数的那个版本), 你都必须配置检查点目录来周期性的保存RDD检查点
  • 支持驱动器故障中恢复(Recovering from failures of the driver running the application ) - 这时候需要元数据检查点以便恢复流式处理的进度信息

注意一些简单的流式应用,如果没有前面说到的状态转换算子, 则完全可以不开启检查点, 不过这样的话, 驱动器(driver)故障回复后, 有可能会丢失部分数据,(有些已经接受单还为处理的数据可能会丢失), 不过通常这点丢失是可接受的, 很多spark streaming 应用也是这样运行的, 对非hadoop环境的支持未来还会继续改进

2.2 如何配置检查点

检查点的启用, 只需要设置好保存检查点的目录即可,一般会将这些目录设置为一些可容错的, 可靠性较高的文件系统(如:hdfs, s3等), 开发者只需要调用streamingContext.checkpint(checkpoingDirectory). 设置好检查点, 你就可以使用前面提到的有状态转换的算子了, 另外 如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:

  • 如果驱动器是首次启动,就需要new一个新的StreamingContext, 并定义好所有的数据流处理,然后调用StreamingContext.start()
  • 如果程序是故障后重启, 就需要从检查点目录中的数据中重新构建StreamingContext对象,不过这种行为可以用StreamingContext.getOrCreate()来实现
// 首次创建StreamingContext并定义好数据流处理逻辑
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // 新建一个StreamingContext对象
  val lines = ssc.socketTextStream(...) // 创建DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // 设置好检查点目录
  ssc
}
// 创建新的StreamingContext对象,或者从检查点构造一个
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 无论是否是首次启动都需要设置的工作在这里
context. ...
// 启动StreamingContext对象
context.start()
context.awaitTermination()

如果checkpoingDirectory目录存在, 则context对象会从检查点重新构建出来, 如果该目录不存在(如: 首次运行,) 则functionTocreateContext函数会被调用, 创建一个新的StreamingContext对象并定义好DStream数据流,完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。

除了使用getOrCreate之外, 开发者还需要确保驱动器进程能在故障之后重启, 这一点只能由应用部署环境基础设施来保证, 进一步讨论见部署(Deployment)这一节

另外需要注意的是, RDD检查点会增加额外的保存数据的开销, 这可能会导致数据流的处理事件变长, 因此, 你必须仔细的调整检查点时间间隔, 如果批次间隔太小, (如:1秒) 那么对每个批次保存检查点数据将大大减小吞吐量, 另一方面, 检查点保存过于频道会导致系统信息和任务个数的增加, 同样会影响系统性能, 对于需要RDD检查点的有状态转换算子, 默认的时间间隔是批次间隔的整数倍, 且最小10秒,开发人员可以自定义这个时间间隔:

dstream.checkpoint(checkpoinginterval), 一般推荐设置为批次间隔时间的5到10倍

3. 累加器和广播变量

首先需要注意的是, 累加器(Accumulators) 和广播变量(Broadcast variables) 是无法从Spark Streaming的检查点中恢复回来的, 所以如果你开启了检查点功能,同时在使用累加器和广播变量, 那么你最好是使用懒惰模式实例化的单例模式, 因为这样 累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化 代码如下:

object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = null
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}
object DroppedWordsCounter {
  @volatile private var instance: Accumulator[Long] = null
  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // 获取现有或注册新的blacklist广播变量
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // 获取现有或注册新的 droppedWordsCounter 累加器
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark Streaming 4. 程序部署监控调优 Spark Streaming 4. 程序部署监控调优
1. 部署应用要运行一个Spark Streaming 应用,你首先需要具备以下条件: 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见 deployment guide。 给Spark应用打个JAR包 –
2018-03-09
下一篇 
Spark Streaming 2. DStream 介绍 Spark Streaming 2. DStream 介绍
1. DStreams(离散数据流)离散数据流(DStream) 是spark Streaming最基本的抽象,它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD 组成
2018-02-26
  目录