1. 缓存/持久化
和RDD类似, DStream也支持将数据持久化到内存中, 只需要调用DStream的persist()方法, 该方法内部会调用DStream中每个RDD的persist()方法, 进而将数据持久化到内存中, 这对于可能需要计算很多次的DStream非常有用,(例如: 对于同一批数据调用多个算子), 对于基于滑动窗口的算子, 如reduceByWindow 和reduceByKeyAndWindow , 或者有状态的算子, 如:UpdateStateByKey, 数据持久化就更重要了, 因此滑动窗口算子产生的DStream对象会默认自动持久化到内存中, 不需要开发者手动调用persist
对于从网络中接收到的输入数据流, (如:kafka,flume,socket等) 默认的持久化级别会将数据持久化到两个不同的节点互为副本,以便支持容错.
2. 检查点
一般来说Streaming应用都需要7*24小时长期运行, 所以必须对一些与业务逻辑无关的故障有很好的容错,(比如:系统故障、 JVM崩溃等). 对于这些可能性, spark Streaming 必须在检查点保存足够的信息到一些可以容错的外部存储中, 以便能够随时从故障中恢复回来, 所以, 检查点需要保存下面两种数据:
- 元数据检查点(Metadata checkpointing) - 保存流式计算逻辑的定义信息和外部可容错存储系统(如: HDFS), 主要用途是用于故障后恢复应用程序本身,元数据包括:
- Configuration - 创建Streaming应用程序的配置信息
- Dstream operations - 定义流式处理逻辑的DStream操作信息
- Incomplete batches - 已经排队但未处理完的批次信息
- 数据检查点(data checkpointing) - 将生成的RDD保存到可靠的存储中. 这堆一些需要夸批次组数据或者状态的算子来说很有必要, 在这些转换算子中,往往新生成的RDD是依赖于前几个批次的RDD, 因为随着时间的推移, 有可能产生很长的依赖链条. 为了避免在恢复数据时候需要恢复整个依赖链条上所有的数据,
总之 , 元数据检查点主要是为了恢复驱动器节点上的故障, 而数据或RDD检查点事为了支持对有状态操作的恢复
2.1 何时启用检查点
如果有一下情况出现, 你就必须启用检查点了
- 如果使用了有状态的转换算子(Usage of stateful transformations) - 不管是用了 updateStateByKey 还是用了reduceByKeyAndWindow (有”反归约” 函数的那个版本), 你都必须配置检查点目录来周期性的保存RDD检查点
- 支持驱动器故障中恢复(Recovering from failures of the driver running the application ) - 这时候需要元数据检查点以便恢复流式处理的进度信息
注意一些简单的流式应用,如果没有前面说到的状态转换算子, 则完全可以不开启检查点, 不过这样的话, 驱动器(driver)故障回复后, 有可能会丢失部分数据,(有些已经接受单还为处理的数据可能会丢失), 不过通常这点丢失是可接受的, 很多spark streaming 应用也是这样运行的, 对非hadoop环境的支持未来还会继续改进
2.2 如何配置检查点
检查点的启用, 只需要设置好保存检查点的目录即可,一般会将这些目录设置为一些可容错的, 可靠性较高的文件系统(如:hdfs, s3等), 开发者只需要调用streamingContext.checkpint(checkpoingDirectory). 设置好检查点, 你就可以使用前面提到的有状态转换的算子了, 另外 如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:
- 如果驱动器是首次启动,就需要new一个新的StreamingContext, 并定义好所有的数据流处理,然后调用StreamingContext.start()
- 如果程序是故障后重启, 就需要从检查点目录中的数据中重新构建StreamingContext对象,不过这种行为可以用StreamingContext.getOrCreate()来实现
// 首次创建StreamingContext并定义好数据流处理逻辑
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // 新建一个StreamingContext对象
val lines = ssc.socketTextStream(...) // 创建DStreams
...
ssc.checkpoint(checkpointDirectory) // 设置好检查点目录
ssc
}
// 创建新的StreamingContext对象,或者从检查点构造一个
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 无论是否是首次启动都需要设置的工作在这里
context. ...
// 启动StreamingContext对象
context.start()
context.awaitTermination()
如果checkpoingDirectory目录存在, 则context对象会从检查点重新构建出来, 如果该目录不存在(如: 首次运行,) 则functionTocreateContext函数会被调用, 创建一个新的StreamingContext对象并定义好DStream数据流,完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。
除了使用getOrCreate之外, 开发者还需要确保驱动器进程能在故障之后重启, 这一点只能由应用部署环境基础设施来保证, 进一步讨论见部署(Deployment)这一节
另外需要注意的是, RDD检查点会增加额外的保存数据的开销, 这可能会导致数据流的处理事件变长, 因此, 你必须仔细的调整检查点时间间隔, 如果批次间隔太小, (如:1秒) 那么对每个批次保存检查点数据将大大减小吞吐量, 另一方面, 检查点保存过于频道会导致系统信息和任务个数的增加, 同样会影响系统性能, 对于需要RDD检查点的有状态转换算子, 默认的时间间隔是批次间隔的整数倍, 且最小10秒,开发人员可以自定义这个时间间隔:
dstream.checkpoint(checkpoinginterval), 一般推荐设置为批次间隔时间的5到10倍
3. 累加器和广播变量
首先需要注意的是, 累加器(Accumulators) 和广播变量(Broadcast variables) 是无法从Spark Streaming的检查点中恢复回来的, 所以如果你开启了检查点功能,同时在使用累加器和广播变量, 那么你最好是使用懒惰模式实例化的单例模式, 因为这样 累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化 代码如下:
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long] = null
def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// 获取现有或注册新的blacklist广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取现有或注册新的 droppedWordsCounter 累加器
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
false
} else {
true
}
}.collect()
val output = "Counts at time " + time + " " + counts
})