Spark Streaming 2. DStream 介绍


1. DStreams(离散数据流)

离散数据流(DStream) 是spark Streaming最基本的抽象,它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD 组成的,每个RDD都是不可变、分布式的数据集,(详见spark编程指南 Spark Programming Guide )。每个RDD都包含了特定的时间间隔内的一批数据,如下如所示:

任何作用于DStream的算子,其内部都会被转化为对内部RDD 的操作。例如,在前面的例子中,我们将lines 这个DStream转化成words DSsream对象,其实作用与lines 上的flatMap 算子,会施加于lines 中的每个RDD 上面,并生成新的对应的RDD,而这些新城省的RDD 对象就组成了words 这个DStream对象,底层的 RDD 转换仍是由spark引擎计算的, DStream的算子将这些细节隐藏了起来,并未开发者提供了更方便的高级API。过程如下图:

2. DStream Receivers

Spark Streaming主要提供了两种内建流式数据源

  • 基础数据源(basic sources): 在StreamingContext API中可以直接使用,如: file system、 socket 或者 AKKA actor。
  • 高级数据源(Advanced sources): 需要额外工具类的源,如:kafka、flume、kinesis、Twitter等,这些数据源都需要增加额外的依赖

如果你需要同时从过个数据源拉取数据,那么你就需要创建多个DStream对象。 多个DStream对象其实也就同时创建了多个数据流接收器,但是需要注意的是Spark的worker/executor都是长期运行的,因此他们都会各自占用一个分配给SparkStreaming应用的CPU,所以在运行SparkStreaming应用的时候,需要注意分片足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPUcore来运行这些Receivers。

如果本地运行spark Streaming应用,记得不能将master设置为”local” 或者 “local[1]” ,这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:socket,kafka,flume等) 的输入 Dstream ,那么这个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了,因此,本地运行时,一定要将master设置为”local[n]”, 其中 n > 接收器的个数(有关master的详情请参考 Spark Properties)

将Spark Streaming 应用置于集群中运行时,同样分配给应用的CPU core数必须大于接收器的总数,否则该应用就只会接收数据,而不会处理数据

2.1 基础数据源

在前面的例子中我们已经看到,使用ssc.socketTextStream(…) 可以从TCP连接重接收文本数据,而除了TCP socket外,StreamingContext API 还支持从文件或者AKKA actor中拉去数据

  • 文件数据流(File Streams)

可以从任何可兼容的HDFS API (hdfs,s3,nfs …) 的文件系统,创建方式如下

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

SparkStreaming 将监视该dataDirectory目录,并处理该目录下任何新建的文件(目前还不支持嵌套目录),注意:

  • 各个文件数据格式必须一致
  • dataDirectory 中的文件必须通过moving 或者renaming 来创建
  • 一旦文件move 进dataDirectory 之后,就不能再改动,所以如果这个文件后续还有写入,这些新写入的数据不会被读取.

对于简单的文本文件,更简单的方式是调用 StreamingContext.textFileStream(dataDirectory)

另外,文件数据流不是基于接收器的,所以不需要为其单独分片一个CPU core

Python API fileStream 目前暂时不可用,Python目前只支持testFileStream

  • RDD队列数据流(Queue of RDDs as a Stream)

如果需要测试SparkStreaming应用,你可以创建一个基于一批RDD的DStream对象,只需要调用streamingContext.queueStream(queueOfRdds),RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据

关于套接字(socket),文件 以及akka actor 数据流更详细信息,请参考文档:StreamingContext for Scala,JavaStreamingContext for Java,and StreamingContext for Python。

2.2 高级数据源

自 Spark1.6.1 起 Python API 将支持 kafka,kinesis,flume,mqtt 这些数据源。

使用这类数据源需要以来一些额外的代码库,有些依赖还挺复杂(如:kafka,flume),因此为了减少依赖项版本冲突,各个数据源DStream的相关功能被分割到不同的代码包中,只有用到的时候才需要链接打包进来,例如你需要使用Twitter的tweets 作为数据源,可以参考一下步骤:

  1. linking : 将spark-streaming-twitter_2.10 依赖加入到项目依赖中
  2. Programming: 导入TwitterUtils 类,然后调用TwitterUtils.createStream 创建一个Dstream
import org.apache.spark.streaming.twitter._

TwitterUtils.createStream(ssc, None)
  1. deploying: 生成一个uber jar包,并包含其所有依赖项(包括spark-streaming-twitter_2.10及其自身依赖树),再部署这个jar包,部署详见(Deploying section)

高级数据源在spark-shell 中不可用,因此不能用spark-shell来测试基于高级数据源的应用,如果真的需要的话,你需要自行下载对应的数据源的maven工具及其依赖,并将这个jar部署到spark-shell 的classpath中

下面列举了一些高级数据源

2.2.1 Spark-streaming kafka 代码示例

  • 添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
  • 代码
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by hnbia on 2017/4/4.
  * spark streaming -kafka
  *
  */
object SparkStreaming_Kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreaming_Kafka").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))
    //制定kafka的broker 信息以及消费者信息
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master1:9092,master2:9092,slave1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mygroupkafka",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //topic 集合 可以指定多个topic
    val topics = Array("test")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

2.2.2 spark-streaming flume 代码示例

  • Maven 依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
  • 配置flume 使用avro sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 定义sink

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9999

# 绑定
a1.sources.r1.channels = c1
a1.sinks.channel = c1
  • 代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume._

/**
  * Created by hnbia on 2017/4/4.
  */
object SparkStreaming_Flume {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreaming_Flume").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val flumeStream = FlumeUtils.createStream(ssc, "master1", 9999)
    flumeStream.map(e => e.event.getBody.toString)
    println(flumeStream)
  }
}

2.2.3 Spark-streaming kafka flume 整合

  • 配置 flume
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

#定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#定义sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = slave1:9092

#绑定
a1.sources.r1.channels = c1
a1.sinks.channel = c
  • Spark-Streaming 代码示例

Window length:跨多少Dstream (必须是batch interval倍数)

Window slide:滑动长度(必须是batch interval倍数,多久进行窗口操作)

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * Created by hnbia on 2017/4/4.
  */
object SparkStreaming_Window {
    def main(args: Array[String]): Unit = {
      println("开始 流处理 。 。 。")
      val conf = new SparkConf().setAppName("SparkStreamingDemo1").setMaster("local[2]")
      val ssc = new StreamingContext(conf,Seconds(5))

      //设置保存点 用于故障恢复
      //ssc.checkpoint("/tmp/spark/checkpoint")
      ssc.checkpoint("D:\\spark")

      //创建离散流,
      val streamLines = ssc.socketTextStream("master1", 9999,StorageLevel.MEMORY_AND_DISK_SER)
      // 按照窗口进行消息计数 以及打印
      streamLines
        .flatMap(_.split(" "))
        .map((_,1))
        .reduceByKeyAndWindow((a:Int,b:Int)=>{a+b},Seconds(15),Seconds(10))
        .print()
      //streamLines.reduceByWindow((_+_),Seconds(15),Seconds(10))
      // 启动流计算
      ssc.start()
      // 等待结束
      ssc.awaitTermination()
    }
}

2.3 接收器的可靠性

从可靠性角度划分,大致有两种数据源,其中kafka,flume这样的数据源,他们支持对所传输的数据进行确认,系统受到这类可靠数据源过来的数据,然后发出确认信息,这样就能够保证任何失败的情况下都不会丢失数据,因此我们可以将接收器相应地分为两类:

1.可靠接收器(Reliable Receiver) - 可靠接收器会在成功接收并保存好Spark数副本后,向可靠数据源发送确认信息

2.不可靠接收器(Unreliable Receiver) - 不可靠接收器不会发送任何确认信息,不过这种接收器常用于不支持确认的数据源,或者不想引入确认数据的复杂性的数据源

3. DStream 支持的算子

3.1 转换算子

和RDD类似,DStream也支持从输入DStream经过各种transformation 算子映射成新的Dstream,DStream支持很多RDD 常见的transformation算子,常用的见下表

Transformation算子 用途
map(func) 将源DStream中每个元素通过func映射为新的元素 ,返回会一个新的DStream
flatMap(func) 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出
filter(func) 过滤经过 func 函数计算之后返回 true 的元素,返回一个新的DStream
repartition(numPartitions) 更改DStream的并行度,增加或减少分区数
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的并集
count() 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数
reduce(func) 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。
countByValue() 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。
reduceByKey(func, [numTasks]) 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数。
join(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。
cogroup(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。
updateStateByKey(func) 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。
  • UpdateStateByKey 算子

updateStateByKey算子支持维护一个任意的状态,要实现这一点,只需要两步:

1.定义状态 - 状态数据可以是任意类型

2.定义状态更新函数 - 定义好一个函数,其输入为数据流之前的状态和新的数据流数据,且可更新步骤1 中定义的输入数据流的状态.

在每一批次数据到达后,spark都会调用状态更新函数来更新所有已有key 的状态.(不管key 是否存在本批次中) 如果状态更新函数返回None,则对应的键值对会被删除

举例如下,假设你需要维护一个流式应用,统计数据流中每个单词出现的次数,这里将各个单词出现的次数这个整数定义为状态,我们接下来定义状态更新函数

该状态更新函数会为每个单词调用一次,且响应的newValues 是一个包含很多个 “1” 的数组(这些1 来自于(word,1) 键值对),而runningCount包含之前该单词的计数,

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[2]")
    // 创建streamingContext 每5秒一个批次
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("d:/data")

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

transform 算子(及其变体transformWith) 可以支持任意RDF到RDD的映射操作,也就是说,你可以用transform算子来包装任何Dstream 不支持的RDD算子,例如,将DStream每个批次中的RDD和另一个DataSet 进行关联(join) 操作,这个功能Dstream API并没有直接支持,不过你可以用transform来实现这个功能,可见 transform其实为Dstream 提供了非常强大的功能支持,比如说你可以用事先算好的信息,对DStream进行实时过滤


val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾信息的RDD
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // 将DStream中的RDD和spamInfoRDD关联,并实时过滤垃圾数据
  ...
})

注意,这里transform包含的算子,其调用的时间间隔和批次间隔是相同的,所以你可以基于时间改变对RDD的操作,如: 在不同批次,调用不同的RDD算子,设置不同的RDD分区或者广播变量等

3.2 窗口算子

spark streaming 同样也提供基于时间窗口的计算,也就是说,你可以对某一个滑动时间窗口内的数据施加特定的transformation算子。

每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并成新的windowed Dstream ,在上图的例子中,这个操作会施加于三个RDD单元,而滑动距离是两个RDD 单元,由此可以得出任何窗口的相关操作都需要执行以下两个参数:

  1. (窗口长度) window length - 窗口覆盖的时间长度,多长时间进行一次计算(上图中为3)

  2. (滑动距离) sliding interval - 窗口启动的时间间隔(上图中为2)

注意这两个参数都必须是DStream批次间隔的整数倍数

举例,加入,你需要扩展前面的代码,需要每个十秒同意下前30秒内的单词计数,为此我们需要在包含(word,1) 键值对的DStream上,对最近30秒的数据调用reduceByKey算子,不过这些都是可以简单的用一个reduceBykeyAndWindow 来搞定

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
  • 以下列出了常用的窗口算子,这些算子都有前面提到的两个参数,- 窗口长度和滑动距离.
Transformation窗口算子 用途
window(windowLength, slideInterval) 将源DStream窗口化,并返回转化后的DStream
countByWindow(windowLength,slideInterval) 返回数据流在一个滑动窗口内的元素个数
reduceByWindow(func, windowLength,slideInterval) 基于数据流在一个滑动窗口内的元素,用func做聚合,返回一个单元素数据流。func必须满足结合律,以便支持并行计算。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) 基于(K, V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每个value都是各个key经过func聚合后的结果。 注意:如果不指定numTasks,其值将使用Spark的默认并行任务数(本地模式下为2,集群模式下由 spark.default.parallelism决定)。当然,你也可以通过numTasks来指定任务个数。
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) 和前面的reduceByKeyAndWindow() 类似,只是这个版本会用之前滑动窗口计算结果,递增地计算每个窗口的归约结果。当新的数据进入窗口时,这些values会被输入func做归约计算,而这些数据离开窗口时,对应的这些values又会被输入 invFunc 做”反归约”计算。举个简单的例子,就是把新进入窗口数据中各个单词个数“增加”到各个单词统计结果上,同时把离开窗口数据中各个单词的统计个数从相应的统计结果中“减掉”。不过,你的自己定义好”反归约”函数,即:该算子不仅有归约函数(见参数func),还得有一个对应的”反归约”函数(见参数中的 invFunc)。和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。注意,这个算子需要配置好检查点(checkpointing)才能用。
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于包含(K, V)键值对的DStream,返回新的包含(K, Long)键值对的DStream。其中的Long value都是滑动窗口内key出现次数的计数。 和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。
  • 代码示例
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * Created by hnbia on 2017/4/4.
  */
object SparkStreaming_Window {
    def main(args: Array[String]): Unit = {
      println("开始 流处理 。 。 。")
      val conf = new SparkConf().setAppName("SparkStreamingDemo1").setMaster("local[2]")
      val ssc = new StreamingContext(conf,Seconds(5))

      //设置保存点 用于故障恢复
      //ssc.checkpoint("/tmp/spark/checkpoint")
      ssc.checkpoint("D:\\spark")

      //创建离散流,
      val streamLines = ssc.socketTextStream("master1", 9999,StorageLevel.MEMORY_AND_DISK_SER)
      // 按照窗口进行消息计数 以及打印
      streamLines
        .flatMap(_.split(" "))
        .map((_,1))
        .reduceByKeyAndWindow((a:Int,b:Int)=>{a+b},Seconds(15),Seconds(10))
        .print()
      //streamLines.reduceByWindow((_+_),Seconds(15),Seconds(10))
      // 启动流计算
      ssc.start()
      // 等待结束
      ssc.awaitTermination()
    }
}

3.3 Join 算子

3.3.1 数据流与数据流做关联

在spark streaming中做各种关联(join) 操作非常简单,一个数据流可以和另一个数据流直接关联。如:

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2) 

上面代码中 stream1 的每个批次中的RDD 会和stream2 相应批次中的RDD 进行join,同样,你也可以类似的使用 leftOuterJoin,rightOuterJoin,fullOuterJoin 等,此外,你还可以基于窗口来join 不同的数据流,其实实现也很简单,例如

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

3.3.2 数据流与数据集做关联

其实这种情况已经在前面的Dstream.transform算子中介绍过来,这里再举个基于滑动窗口的例子.

val dataset: RDD[String, String] = ...

val windowedStream = stream.window(Seconds(20))...

val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

实际上,上面代码里,你可以动态地该表join 的数据集(dataset),传给tranform 算子的操作函数会在每个批次重新求值,所以每次该函数都会用最新的dataset值,所以不同批次间 你可以改变dataset的值

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by hnbia on 2017/4/5.
  */
object SparkStreaming_Window_DataFrame {
  def main(args: Array[String]): Unit = {
    def main(args: Array[String]): Unit = {
      println("开始 流处理 。 。 。")
      val conf = new SparkConf().setAppName("SparkStreamingDemo1").setMaster("local[2]")
      val ssc = new StreamingContext(conf,Seconds(5))

      //设置保存点 用于故障恢复
      //ssc.checkpoint("/tmp/spark/checkpoint")
      ssc.checkpoint("D:\\spark")

      //创建离散流,
      val streamLines = ssc.socketTextStream("master1", 9999,StorageLevel.MEMORY_AND_DISK_SER)
      streamLines
        .window(Seconds(6000),Seconds(10))
        .flatMap(_.split(" "))
        .foreachRDD(rdd=>{
          val sess = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
          import sess.implicits._
          val df = rdd.toDF("word")
          df.createGlobalTempView("words")
          sess.sql("select word,count(word) as c from words group by word order by c desc").show()
        })
      // 启动流计算
      ssc.start()
      // 等待结束
      ssc.awaitTermination()
    }
  }
}

3.4 输出算子

输出算子可以将DStream的数据推送到外部系统,如: 数据库或者文件系统。因为输出算子将最终完成转换的数据输出到外部系统,因此只有输出算组调用时,才会真正出发Dstream transformation 算子的真正执行,这一点类似于RDD的action 算子。目前支持的输出算子如下

输出算子 用途
print() 在 driver 节点上打印DStream每个批次中的头十个元素
saveAsTextFiles(prefix, [suffix]) 将DStream的内容保存到文本文件。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”
saveAsObjectFiles(prefix, [suffix]) 将DStream内容以序列化Java对象的形式保存到顺序文件中。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python
saveAsHadoopFiles(prefix, [suffix]) 将DStream内容保存到Hadoop文件中。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python
foreachRDD(func) 这是最通用的输出算子了,该算子接收一个函数func,func将作用于DStream的每个RDD上。 func应该实现将每个RDD的数据推到外部系统中,比如:保存到文件或者写到数据库中。 注意,func函数是在streaming应用的驱动器进程中执行的,所以如果其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。

Dstream.foreachRDD是一个非常强大的原生工具函数,用户可以基于此算子将Dstream 数据推送到外部系统中,不过用户需要了解如何正确而高效的使用这个工具,一下举了些常见的错误,

通常,对外部系统写入数据需要一些链接对象(如: 远程server的TCP链接等),以便发送数据给远程系统,因此,开发人员可能会不经意地在Spark驱动器(driver)进程中创建一个链接对象,然后又视图在Spark worker节点上使用这个链接,如下所示

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 这行在驱动器(driver)进程执行
  rdd.foreach { record =>
    connection.send(record) // 而这行将在worker节点上执行
  }
}

这段代码是错误的,因为它需要把链接对象序列化,再动驱动器节点发送到worker节点,而这些链接对象通常都是不能夸节点(机器)传递的,比如,链接对象通常都不能序列化,或者在另一个进程中反序列化后再次初始化等.(连接对象通常需要初始化,因此从驱动节点发到worker节点可能需要重新初始化) 解决此类错误的办法就是在worker节点上创建连接对象,

然而有些开发人员可能会走另一个极端,为每一条记录都创建一个连接对象,例如:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

一般来说,连接对象是有时间和资源开销限制的,因此,对每条记录都进行一次连接对象的创建和销毁会增加很多不必要的开销,同事也大大减小了系统的吞吐量,而一个比较好的解决方案就是使用rdd.foreachPartition - 为RDD的每个分区创建一个单独的连接对象 ,示例如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

这样一来,连接对象的创建开销就摊到很多条记录上了.

最后还有一个更优的办法,就是在很多个RDD批次之间复用连接对象,开发者可以维护一个静态连接池来保存对象,以便在不同批次的多个RDD之前共享同一组连接对象,示例如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool 是一个静态的、懒惰初始化的连接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 将连接返还给连接池,以便后续复用之
  }
}

注意连接池中的连接应该是懒惰创建的,并且有确定的超时时间,超时后自动销毁,这个实现应该是目前发送数最高的实现方式.

其他要点:

  • DStream的转化执行也是懒惰的,需要输出算子来出发,这一点和RDD的懒惰执行由action算子触发很类似,特别的,DStream输出算子中包含RDD的action算子会强制触发对所有接收数据的处理。因此,如果你的Streaming应用中没有输出算子,或者你用了dstream.foreachRdd(func) 缺没有在func中调用RDD的action 算子,那么这个应用只会接收数据,而不会处理数据,接收都的数据最后只是被简单的丢弃掉了,
  • 默认的,输出算子只能一次执行一次,且按照他们在应用程序中定义的顺序执行

3.5 DataFrame和SQL相关算子

在Streaming 应用中可以调用 DataFrames and SQL 来处理流式数据。开发者可以通过StreamingContext 中的SparkContext对象来创建SQLContext ,并且,开发者需要确保一旦驱动器(driver) 故障回复后,该SQLContext对象能重新创建出来,同样,你还是可以使用懒惰创建的单利模式来实例化SQLContext,如下面代码所示,我们将最开始那个例子多了一些修改,使用DataFeame 和SQL来统计单词数,其实就是将每个RDD都转化成一个RDDFrame,然后注册成临时表,再用SQL查询这些临时表

/** streaming应用中调用DataFrame算子 */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
  // 获得SQLContext单例
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  // 将RDD[String] 转为 DataFrame
  val wordsDataFrame = rdd.toDF("word")
  // DataFrame注册为临时表
  wordsDataFrame.registerTempTable("words")
  // 再用SQL语句查询,并打印出来
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

也可以在其他线程里执行SQL查询(异步查询,即: 执行SQL查询的线程和运行StreamingContext的线程不同)。不过这种情况下,你需要确保查询的时候StreamingContext没有把所需的数据丢弃掉,否则StreamingContext有可能已经将老的RDD数据丢弃掉,那么异步查询的sql语句也可能不发得到查询结果,


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark Streaming 3. 数据广播与检查点 Spark Streaming 3. 数据广播与检查点
1. 缓存/持久化和RDD类似, DStream也支持将数据持久化到内存中, 只需要调用DStream的persist()方法, 该方法内部会调用DStream中每个RDD的persist()方法, 进而将数据持久化到内存中, 这对于可能需
2018-03-04
下一篇 
Spark Streaming 1. 介绍 Spark Streaming 1. 介绍
1. 介绍Spark Streamingspark streaming 是 spark 核心 api 的扩展,支持可扩展,高吞吐量,实时数据流的容错处理,数据可以从 kafka,flume,Kinesis 或者 TCP socket 中获取
2018-02-24
  目录