Spark Streaming 5. 容错语义


1. 容错语义的背景

要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。

  1. RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的依赖信息,其中每个算子都以可容错的数据集作为输入数据。
  2. 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的依赖信息以及相应的原始输入数据集重新计算出来。
  3. 假定所有RDD transformation算子计算过程都是确定性的,那么通过这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关。

Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据,这会导致Streaming的数据源并不可靠(尤其是对于使用了fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不同worker节点上的Spark执行器来实现(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据:

  • 接收并保存了副本的数据 – 数据不会因为单个worker节点故障而丢失,因为有副本!
  • 接收但尚未保存副本数据 – 因为数据并没有副本,所以一旦故障,只能从数据源重新获取。

此外,还有两种可能的故障类型需要考虑:

  • Worker节点故障 – 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。
  • Driver节点故障 – 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失。

有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。

2. 容错语义的定义

流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障):

  • 至多一次(At most once): 每条记录要么被处理一次,要么就没有处理。
  • 至少一次(At least once): 每条记录至少被处理过一次(一次或多次)。这种保证能确保没有数据丢失,比“至多一次”要强。但有可能出现数据重复。
  • 精确一次(Exactly once): 每条记录都精确地只被处理一次 – 也就是说,既没有数据丢失,也不会出现数据重复。这是三种保证中最强的一种。

3. 基础语义

任何流式处理系统一般都会包含以下三个数据处理步骤:

  1. 数据接收(Receiving the data): 从数据源拉取数据。
  2. 数据转换(Transforming the data): 将接收到的数据进行转换(使用DStream和RDD transformation算子)。
  3. 数据推送(Pushing out the data): 将转换后最终数据推送到外部文件系统,数据库或其他展示系统。

如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:

  1. 数据接收: 不同数据源提供的保证不同,下一节再详细讨论。
  2. 数据转换: 所有的数据都会被“精确一次”处理,这要归功于RDD提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。
  3. 数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。

4. 接收数据语义

不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。

4.1 从文件接收数据

如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次,不多也不少。

4.2 基于接收器接收数据

对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:

  1. 可靠接收器 – 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据。
  2. 不可靠接收器 – 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。

对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来说,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。

为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。

下表总结了故障情况下的各种语义:

部署场景 Worker 故障 Driver 故障
Spark 1.1及以前版本 或者 Spark 1.2及以后版本,且未开启WAL 若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据; 若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 若使用不可靠接收器,则缓存数据和已保存数据都可能丢失; 若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证
Spark 1.2及以后版本,并启用WAL 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证

5. 从Kafka Direct API接收数据

从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。(改功能截止Spark 1.6.1还是实验性的)更详细的说明见:Kafka Integration Guide

6. 输出算子的语义

输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式:

  • 幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。

  • 事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式:

    • 用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。
    • 基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交
  }
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
机器学习中的数据清洗与特征处理综述 机器学习中的数据清洗与特征处理综述
转载美团文档,原文地址为: https://tech.meituan.com/machinelearning-data-feature-process.html 1. 背景随着美团交易规模的逐步增大,积累下来的业务数据和交易数据越来越多,这
2018-03-21
下一篇 
Spark Streaming 4. 程序部署监控调优 Spark Streaming 4. 程序部署监控调优
1. 部署应用要运行一个Spark Streaming 应用,你首先需要具备以下条件: 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见 deployment guide。 给Spark应用打个JAR包 –
2018-03-09
  目录