Spark Streaming 7. Structured Streaming 入门


本文转自:Spark结构式流编程指南

1. 概览

Structured Streaming 是一个可拓展,容错的,基于 Spark SQL 执行引擎的流处理引擎。使用小量的静态数据模拟流处理。伴随流数据的到来,SparkSQL 引擎会逐渐连续处理数据并且更新结果到最终的 Table 中。你可以在 Spark SQL 上引擎上使用 DataSet/DataFrame API 处理流数据的聚集,事件窗口,和流与批次的连接操作等。最后 Structured Streaming 系统快速,稳定,端到端的恰好一次保证,支持容错的处理

1.1 基本代码示例

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 把每行文本按空格分割成单词
val words = lines.as[String].flatMap(_.split(" "))

// 对单词进行计数
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()

在 Spark Streaming 中读取网络流数据通常需要三个步骤:

  1. 首先使用 spark.readStream 方法创建一个流式 DataFrame。
  2. 然后使用 format 方法指定输入格式,这里使用 “socket” 表示从网络套接字中读取数据。
  3. 最后使用 option 方法指定主机名和端口号以及其他必要参数。

接下来,我们将读取到的数据按空格分割成单词,并使用 groupBycount 方法对单词进行计数。最后,使用 writeStream 方法将结果输出到控制台,并设置输出模式为 “complete”,表示在更新整个结果时输出所有记录。最后,调用 awaitTermination 方法等待作业完成。

总之,这段代码演示了如何使用 Spark Structured Streaming 来读取网络数据,并对其进行简单的处理和统计。

编程模型

结构化流的关键思想是将实时数据流视为一个连续附加的表

基本概念

将输入的数据当成一个输入的表格,每一个数据当成输入表的一个新行,如下图所示:

structured-streaming-model

Output 是写入到外部存储的写方式,写入方式有不同的模式:

  • Complete 模式: 将整个更新表写入到外部存储,写入整个表的方式由存储连接器决定。
  • Append 模式:只有自上次触发后在结果表中附加的新行将被写入外部存储器。这仅适用于结果表中的现有行不会更改的查询。
  • Update 模式:只有自上次触发后在结果表中更新的行将被写入外部存储器( 在Spark 2.0中尚不可用)。注意,这与完全模式不同,因为此模式不输出未更改的行。

处理事件时间和延迟数据

事件时间 是嵌入在数据本身中的时间。对于许多应用程序,您可能希望在此事件时间操作。例如,如果要获取 IoT 设备每分钟生成的事件数,则可能需要使用生成数据的时间(即数据中的事件时间),而不是 Spark 接收的时间他们。此事件时间在此模型中非常自然地表示 - 来自设备的每个事件都是表中的一行,事件时间是该行中的一个列值。这允许基于窗口的聚合(例如每分钟的事件数)仅仅是偶数时间列上的特殊类型的分组和聚合 - 每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,可以在静态数据集(例如 来自收集的设备事件日志)以及数据流上一致地定义这种基于事件时间窗的聚合查询,使得用户的生活更容易。

此外,该模型自然地处理基于其事件时间比预期到达的数据。由于 Spark 正在更新结果表,因此当存在延迟数据时,它可以完全控制更新旧聚合,以及清除旧聚合以限制中间状态数据的大小。由于 Spark 2.1 支持水印,允许用户指定后期数据的阈值,并允许引擎相应地清除旧的状态。稍后将在 “ 窗口操作 ” 部分中对此进行详细说明。

容错语义

提供端到端的一次性语义是结构化流的设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源,接收器和执行引擎,以可靠地跟踪处理的确切进展,以便它可以通过重新启动 和 重新处理来处理任何类型的故障。假定每个流源具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)以跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流接收器被设计为用于处理再处理的幂等。结合使用可重放源和幂等宿,结构化流可以确保在任何故障下的端到端的一次性语义。

2. 使用DataFrame和DataSet API

从 Spark 2.0 开始,DataFrames 和 Datasets 可以表示静态,有界数据,以及流式,无界数据。与静态 DataSets / DataFrames 类似,您可以使用公共入口点 SparkSession从流源创建流 DataFrames / DataSets,并对它们应用与静态DataFrames / Datasets 相同的操作。如果您不熟悉 Datasets / DataFrames,强烈建议您使用 DataFrame / Dataset 编程指南熟悉它们。

2.1 创建数据框流和数据集流

Streaming DataFrames 可以通过 SparkSession.readStream() 返回的 DataStreamReader 接口(Scala / Java / Python docs)创建。类似于用于创建静态 DataFrame 的读取接口,您可以指定源 - 数据格式,模式,选项等的详细信息。

2.1.1 数据源

在Spark 2.0,有几个内置的数据源:

  • 文件源:将写入目录中的文件读取为数据流。支持的文件格式有text,csv,json,parquet。请参阅 DataStreamReader 界面的文档以获取更新的列表,以及每种文件格式支持的选项。注意,文件必须原子地放置在给定目录中,在大多数文件系统中,可以通过文件移动操作来实现。
  • Kafka 源:从kafka拉取数据,支持kafka broker versions 0.10.0 or higher.从kafka集成指南获取更多信息。
  • Socket 源(测试用):从套接字连接读取 UTF8 文本数据。监听服务器套接字在驱动程序。注意,这应该仅用于测试,因为这不提供端到端容错保证

这些示例生成无类型的流式 DataFrames,这意味着在编译时不检查 DataFrame 的模式,仅在提交查询时在运行时检查。一些操作,如map,flatMap 等,需要在编译时知道类型。要做到这些,你可以使用与静态 DataFrame 相同的方法将这些无类型的流 DataFrames 转换为类型化流数据集。有关更多详细信息,请参阅 SQL 编程指南。此外,有关支持的流媒体源的更多详细信息将在文档中稍后讨论。

2.1.2 数据框/数据集流的模式推理和分区

默认情况下,基于文件的源的结构化流要求您指定模式,而不是依靠 Spark 自动推断。此限制确保即使在发生故障的情况下,一致的模式也将用于流式查询。对于临时用例,可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。

当名为 key = value 的子目录存在时,发生分区发现,并且列表将自动递归到这些目录中。如果这些列出现在用户提供的模式中,它们将由 Spark 根据正在读取的文件的路径填充。当查询开始时,组成分区方案的目录必须存在,并且必须保持静态。例如,可以添加/ data / year = 2016 / when / data / year = 2015 /存在,但是更改分区列是无效的(即通过创建目录/ data / date = 2016-04-17 /)。

2.2 流式DataFrames/Datasets上的操作

您可以对流式 DataFrames 数据集应用各种操作 - 从无类型,类似SQL的操作(例如 select,where,groupBy )到类型化的 RDD 类操作(例如map,filter,flatMap)。有关更多详细信息,请参阅SQL编程指南。让我们来看看一些你可以使用的示例操作。

2.2.1 基本操作 - 选择,投影,聚合

case class DeviceData(device: String, type: String, signal: Double, time: DateTime)

val df: DataFrame = ... // 包含IOT设备数据的流式DataFrame,其结构为 { device: string, type: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // 包含IOT设备数据的流式Dataset

// 选择信号强度大于10的设备
df.select("device").where("signal > 10")      // 使用非类型化 API   
ds.filter(_.signal > 10).map(_.device)         // 使用类型化 API

// 按设备类型统计更新数量
df.groupBy("type").count()                          // 使用非类型化 API

// 按设备类型统计信号强度平均值
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(_.type).agg(typed.avg(_.signal))    // 使用类型化 API

代码中注释的第一行定义了一个 case class,用于描述 IOT 设备数据的结构。接下来,我们使用 Spark 读取相关的数据源,创建了一个包含 IOT 设备数据的流式 DataFrame,并将其转换成了流式 Dataset,以便后续进行类型化的操作。接下来的注释分别解释了两种不同的方式处理数据:

第一种方式是通过选择信号强度大于 10 的设备来过滤数据,代码中展示了如何使用非类型化 API 和类型化 API 分别实现该功能。

第二种方式是对数据进行聚合操作,代码中展示了如何使用非类型化 API 和类型化 API 分别计算更新数量和信号强度平均值。可以看到,使用类型化 API 可以更加清晰地表达数据处理的逻辑,代码可读性更高。

2.2.2 事件时间上的窗口操作

滑动事件时间窗口上的聚合通过结构化流直接进行。理解基于窗口的聚合的关键思想与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合的情况下,对于行的事件时间落入的每个窗口维持聚合值。让我们用插图来理解这一点。

想象一下,我们的快速示例被修改,流现在包含行以及生成行的时间。我们不想运行字数,而是计算10分钟内的字数,每5分钟更新一次。也就是说,在10 分钟窗口 12:00 - 12:10 , 12:05 - 12:15 , 12:10-12:20 等之间接收的词中的字数。注意,12:00 -12:10 意味着数据在12:00 之后但在 12:10 之前到达。现在,考虑在12:07 收到的一个字。这个单词应该增加对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数。因此,计数将通过分组键(即字)和窗口(可以从事件时间计算)来索引。结果表将如下所示:

由于此窗口类似于分组,因此在代码中,可以使用 groupBy() 和 window() 操作来表示窗口化聚合。您可以在 Scala / Java / Python中查看以下示例的完整代码。

2.2.3 处理延迟数据和水位线

现在考虑如果事件中的一个到达应用程序的迟到会发生什么。例如,例如,在12:04(即事件时间)生成的词可以由应用在12:11 接收到。应用程序应使用时间 12:04 而不是 12:11 来更新窗口 12:00 - 12:10 的旧计数。这在我们的基于窗口的分组中自然地发生 - 结构化流可以长时间地保持部分聚合的中间状态,使得晚期数据可以正确地更新旧窗口的聚集,如下所示:

但是,要运行此查询的天数,系统必须绑定其累积的中间内存中状态的数量。这意味着系统需要知道何时可以从内存中状态删除旧聚合,因为应用程序将不再接收该聚合的延迟数据。为了实现这一点,在 Spark 2.1 中,我们引入了水印,让我们的引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧的状态。您可以通过指定事件时间列和根据事件时间预计数据延迟的阈值来定义查询的水印。对于在时间T 开始的特定窗口,引擎将保持状态并允许后期数据更新状态,直到(由引擎看到的最大事件时间 - 后期阈值 > T)。换句话说,阈值内的晚数据将被聚合,但晚于阈值的数据将被丢弃。让我们用一个例子来理解这个。我们可以使用 withWatermark() 在上面的例子中轻松定义水印,如下所示。

import spark.implicits._

val words = ... // 流式 DataFrame,包含 { timestamp: Timestamp, word: String } 的结构

// 按窗口和单词对数据进行分组,并计算每个分组的数量
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")  // 在时间戳列上设置 watermark 为 10 分钟
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),  // 设置窗口大小为 10 分钟,滑动间隔为 5 分钟
        $"word")
    .count()

在这个例子中,我们定义查询的水印对列 “ timestamp ” 的值,并且还定义 “ 10分钟 ” 作为允许数据超时的阈值。如果此查询在 Append输出模式(稍后在“输出模式”部分中讨论)中运行,则引擎将从列 “ timestamp ” 跟踪当前事件时间,并在最终确定窗口计数和添加之前等待事件时间的额外 “ 10分钟 ” 他们到结果表。这是一个例证。

如图所示,由引擎跟踪的最大事件时间是蓝色虚线,并且在每个触发的开始处设置为( 最大事件时间 - ‘10分钟’ )的水印是红色线。例如,当引擎观察数据(12:14,狗),它将下一个触发器的水印设置为12:04。对于窗口 12:00 - 12:10,部分计数保持为内部状态,而系统正在等待延迟数据。在系统发现数据(即(12:21,owl))使得水印超过12:10之后,部分计数被最终确定并附加到表。此计数将不会进一步更改,因为所有超过 12:10 的 “太晚” 数据将被忽略。

请注意,在追加输出模式下,系统必须等待 “延迟阈值” 时间才能输出窗口的聚合。如果数据可能很晚,(例如1天),并且您希望部分计数而不等待一天,这可能不是理想的。将来,我们将添加更新输出模式,这将允许每次更新聚合写入到每个触发器。

用于清除聚合状态的水印的条件重要的是要注意,水印应当满足以下条件以清除聚合查询中的状态( 从Spark 2.1开始,将来会改变)。

  • 输出模式必须为追加。完成模式要求保留所有聚合数据,因此不能使用水印来删除中间状态。有关每种输出模式的语义的详细说明,请参见 “ 输出模式 ” 部分。

  • 聚合必须具有事件时列,或事件时列上的窗口。

  • withWatermark 必须在与聚合中使用的时间戳列相同的列上调用。

    例如:df.withWatermark("time", "1 min").groupBy("time2").count() 在Append输出模式下无效,因为水印是在与聚合列不同的列上定义的。

  • 其中在要使用水印细节的聚合之前必须调用withWatermark。

    例如:df.groupBy("time").count().withWatermark("time", "1 min") 在Append输出模式中无效。

2.2.4 Join操作

流 DataFrames 可以与静态 DataFrames 连接以创建新的流 DataFrames。这里有几个例子。

// 这段代码使用 Spark 读取数据,并对两个 DataFrame 进行 Join 操作。
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // 使用静态 DataFrame 进行内连接
streamingDf.join(staticDf, "type", "right_join")  // 使用静态 DataFrame 进行右外连接

其中 staticDfstreamingDf 分别是静态 DataFrame 和流式 DataFrame,可以通过 Spark 读取相关的数据源来创建它们。这里使用了 join 方法进行 Join 操作,第一个参数是要 Join 的 DataFrame,第二个参数是 Join 的列名,在这里是 "type"。第三个参数为可选参数,指定 Join 类型,这里用了 "right_join" 表示右外连接。

2.2.5 不支持的操作

但是,请注意,所有适用于静态 DataFrames 数据集的操作在流式 DataFrames 数据集中不受支持。虽然这些不支持的操作中的一些将在未来的Spark版本中得到支持,但还有一些基本上难以有效地在流数据上实现。例如,输入流数据集不支持排序,因为它需要跟踪流中接收的所有数据。因此,这在根本上难以有效地执行。从Spark 2.0开始,一些不受支持的操作如下:

  • 在流数据集上还不支持多个流聚集(即,流DF上的聚合链)。
  • 在流数据集上不支持限制和获取前N行。
  • 不支持对流数据集进行不同操作。
  • 排序操作仅在聚合后在完整输出模式下支持流数据集。
  • 条件支持流式传输和静态数据集之间的外连接。
  • 不支持带有流数据集的完全外连接
  • 不支持左外部连接与右侧的流数据集
  • 不支持左侧的流数据集的右外部联接
  • 尚不支持两个流数据集之间的任何类型的连接。

此外,还有一些Dataset方法不能用于流数据集。它们是将立即运行查询并返回结果的操作,这对流数据集没有意义。相反,这些功能可以通过显式地启动流查询来完成(参见下一部分)。

  • count() - 无法从流数据集返回单个计数。
          相反,使用 ds.groupBy.count() 返回包含运行计数的流数据集。
  • foreach() - 而是使用 ds.writeStream.foreach(…)(参见下一节)。
  • show() - 而是使用控制台接收器(请参阅下一节)。

如果您尝试任何这些操作,您将看到一个 AnalysisException 如 “ 操作XYZ不支持与流DataFrames 数据集 ”。

2.3 启动流式查询

一旦定义了最终结果 DataFrame / Dataset,剩下的就是启动流计算。为此,您必须使用通过 Dataset.writeStream() 返回的DataStreamWriter。您必须在此界面中指定以下一个或多个。

  • 输出接收器的详细信息:数据格式,位置等
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:(可选)指定查询的唯一名称以进行标识。
  • 触发间隔:可选择指定触发间隔。如果未指定,系统将在上一个处理完成后立即检查新数据的可用性。如果由于先前处理尚未完成而错过触发时间,则系统将尝试在下一触发点处触发,而不是在处理完成之后立即触发。
  • 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统将写入所有检查点信息的位置。这应该是HDFS兼容的容错文件系统中的目录。检查点的语义将在下一节中更详细地讨论。

2.3.1 输出模式

有几种类型的输出模式:

  • 附加模式(默认) - 这是默认模式,其中只有自上次触发后添加到结果表中的新行将输出到接收器。这仅支持那些添加到结果表中的行从不会更改的查询。因此,该模式保证每行只输出一次(假设容错宿)。例如,只有select,where,map,flatMap,filter,join等的查询将支持 Append 模式。
  • 完成模式 - 每次触发后,整个结果表将输出到接收器。聚合查询支持此选项。
  • 更新模式 - (在Spark 2.1中不可用)只有结果表中自上次触发后更新的行才会输出到接收器。更多信息将在未来版本中添加。

不同类型的流查询支持不同的输出模式。这里是兼容性矩阵:

查询类型 支持的输出模式
无聚合的查询 Append 支持完整模式,因为不可能将所有数据保存在结果表中。
带有聚合的聚合 在带有水印的event-time上聚合 Append, Complete 追加模式使用水印来删除旧的聚合状态。但是,窗口聚合的输出会延迟到在withWatermark()中指定的延迟阈值时间之后。因为根据模式的语义,只有在行数据最终确定后(即越过水印之后),才能将它们添加到结果表中。有关更多详细信息,请参见“延迟数据”部分。完整模式不会删除旧的聚合状态,因为按定义该模式会保留结果表中的所有数据。
其他聚合 Complete 不支持追加模式,因为聚合可以更新,从而违反了此模式的语义。完整模式不会删除旧的聚合状态,因为按定义该模式会保留结果表中的所有数据。

2.3.2 输出接收器

有几种类型的内置输出接收器:

  • 文件接收器 - 将输出存储到目录。
  • Foreach sink - 对输出中的记录运行任意计算。有关详细信息,请参阅后面的部分。
  • 控制台接收器(用于调试) - 每次有触发器时将输出打印到控制台 stdout。这应该用于低数据量上的调试目的,因为每次触发后,整个输出被收集并存储在驱动程序的内存中。
  • 内存接收器(用于调试) - 输出作为内存表存储在内存中。支持附加和完成输出模式。这应该用于低数据量上的调试目的,因为每次触发后,整个输出被收集并存储在驱动程序的内存中。

下面是所有接收器的表格和相应的设置:

接收器 支持的输出模式 用法 容错 备注
文件接收器 Append writeStream.format(“parquet”).start() Yes 支持对分区表的写入。按时间分区可能有用。
Foreach 接收器 Complete writeStream.foreach(…).start() 取决于
ForeachWriter
实现
更多细节在下一节
控制台接收器 Append, Complete writeStream.format(“console”).start() No
内存接收器 Append, Complete writeStream.format(“memory”).queryName(“table”).start() No 将输出数据保存为表,用于交互式查询。表名是查询名称。

最后,你必须调用start()才能真正开始执行查询。这返回一个 StreamingQuery 对象,它是连续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个例子来理解这一切。

// ========== 没有聚合的数据框 ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// 将新数据打印到控制台
noAggDF
.writeStream()
.format("console")
.start();

// 将新数据写入 Parquet 文件
noAggDF
.writeStream()
.parquet("path/to/destination/directory")
.start();

// ========== 聚合后的数据框 ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// 将更新后的聚合结果打印到控制台
aggDF
.writeStream()
.outputMode("complete")
.format("console")
.start();

// 将所有聚合结果存储在内存表中
aggDF
.writeStream()
.queryName("aggregates")    // 此查询名称将成为表名
.outputMode("complete")
.format("memory")
.start();

spark.sql("select * from aggregates").show();   // 在内存表中进行交互式查询

2.3.3 使用foreach

foreach 操作允许对输出数据计算任意操作。从 Spark 2.1开始,这只适用于 Scala 和 Java。要使用这个,你必须实现接口 ForeachWriter(Scala / Java docs),它有一个方法,当触发后产生一系列行作为输出时被调用。请注意以下要点。

编写器必须是可序列化的,因- 为它将被序列化并发送到执行器以供执行。

  • 所有三个方法,打开,处理和关闭将被调用的执行者。
  • 只有当调用 open 方法时,写程序必须执行所有的初始化(例如打开连接,启动事务等)。请注意,如果在创建对象时在类中有任何初始化,那么该初始化将在驱动程序中进行(因为这是创建实例的地方),这可能不是您想要的。
  • 版本和分区是 open 中的两个参数,它们唯一地表示需要被推出的一组行。版本是一个单调增加的 id,随着每个触发器增加。partition 是表示输出的分区的 id,因为输出是分布式的,并且将在多个执行器上处理。
  • open 可以使用版本和分区来选择是否需要写行序列。因此,它可以返回 true(继续写入)或 false(不需要写入)。如果返回 false,那么将不会在任何行上调用进程。例如,在部分故障之后,失败触发器的一些输出分区可能已经被提交到数据库。基于存储在数据库中的元数据,写者可以识别已经提交的分区,因此返回false以跳过再次提交它们。
  • 每当调用 open 时,也将调用 close(除非JVM由于某些错误而退出)。即使 open 返回 false,也是如此。如果在处理和写入数据时出现任何错误,将使用错误调用 close。您有责任清除在开放中创建的状态(例如连接,事务等),以便没有资源泄漏。

2.4 管理流式查询

启动查询时创建的 StreamingQuery 对象可用于监视和管理查询。

StreamingQuery query = df.writeStream().format("console").start();   // 获取查询对象

query.id();          // 获取正在运行的查询的唯一标识符

query.name();        // 获取自动生成的或用户指定的名称

query.explain();   // 打印有关查询的详细说明

query.stop();      // 停止查询

query.awaitTermination();   // 阻塞,直到查询终止,或者出现错误

query.exception();    // 如果查询已通过错误终止,则会抛出异常

query.sourceStatus();  // 关于从输入源读取的数据的进度信息

query.sinkStatus();   // 关于写入输出接收器的数据的进度信息

您可以在单个 SparkSession 中启动任意数量的查询。他们将同时运行共享集群资源。您可以使用 sparkSession.streams() 获取可用于管理当前活动查询的 StreamingQueryManager。

SparkSession spark = ...

spark.streams().active();    // 获取当前活动的流查询列表

spark.streams().get(id);   // 根据其唯一标识符获取查询对象

spark.streams().awaitAnyTermination();   // 阻塞,直到其中任何一个终止

2.5 监视流查询

有两个API用于以交互式和异步方式监视和调试活动的查询。

2.5.1 交互式API

您可以使用 streamingQuery.lastProgress() 和 streamingQuery.status() 直接获取活动查询的当前状态和指标。 lastProgress() 在 Scala和 Java 中返回一个 StreamingQueryProgress 对象,在 Python 中返回一个具有相同字段的字典。它具有关于在流的最后触发中所进行的进展的所有信息 - 什么数据被处理,什么是处理速率,等待时间等。还有 streamingQuery.recentProgress ,它返回最后几个进度的数组。

此外,streamingQuery.status() 在 Scala 和 Java 中返回 StreamingQueryStatus 对象,在 Python 中返回具有相同字段的字典。它提供有关查询立即执行的操作的信息 - 是触发器活动,正在处理数据等。这里有几个例子。

StreamingQuery query = ...

System.out.println(query.lastProgress());
/* 将打印类似于以下内容的信息。

{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/

System.out.println(query.status());
/* 将打印类似于以下内容的信息。
{
"message" : "等待数据到达",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/

2.5.2 异步API

您还可以通过附加 StreamingQueryListener(Scala / Java docs)异步监视与 SparkSession 相关联的所有查询。使用sparkSession.streams.attachListener() 附加自定义 StreamingQueryListener 对象后,当查询启动和停止以及活动查询中有进度时,您将获得回调。这里是一个例子

SparkSession spark = ...

// 添加流查询监听器
spark.streams.addListener(new StreamingQueryListener() {
// 查询开始时触发此方法
@Override
void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("查询已启动:" + queryStarted.id());
}

// 查询结束时触发此方法
@Override
void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
    System.out.println("查询已终止:" + queryTerminated.id());
}

// 查询进度更新时触发此方法
@Override
void onQueryProgress(QueryProgressEvent queryProgress) {
    System.out.println("查询已更新进度:" + queryProgress.progress());
}
});

2.6 使用检查点从故障中恢复

在故障或故意关闭的情况下,您可以恢复先前查询的先前进度和状态,并继续在其停止的地方。这是通过使用检查点和预写日志来完成的。您可以配置具有检查点位置的查询,并且查询将保存所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(例如快速示例中的字计数)到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时在DataStreamWriter中设置为选项。

aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();</row>

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
构建数据仓库的分层设计 构建数据仓库的分层设计
1. 前言数据仓库代表的是一套全面的数据管理和使用策略,包含了诸如ETL、调度和建模等完整的理论体系。而现在的“大数据”更多的是指数据量的增加和工具的更新。这两者并没有冲突,实际上,它们可以更好地结合起来。单纯使用Hadoop、Spark、
2018-05-03
下一篇 
Spark Streaming 6. Exactly-Once解决方案 Spark Streaming 6. Exactly-Once解决方案
1 Exactly-Once事务处理1.1 什么是Exactly-Once事务?数据仅处理一次并且仅输出一次,这样才是完整的事务处理。 以银行转帐为例,A用户转账给B用户,B用户可能收到多笔钱,保证事务的一致性,也就是说事务输出,能够输出且
2018-04-10
  目录