1. 介绍Spark Streaming
spark streaming 是 spark 核心 api 的扩展,支持可扩展,高吞吐量,实时数据流的容错处理,数据可以从 kafka,flume,Kinesis 或者 TCP socket 中获取数据,并且可以使用高级复杂的算法来处理获取到的数据,像 map 、reduce、join、window 等,处理的数据可以存储到hdfs,关系型数据库或者实时展示,还可以将 spark mllib 和 图计算 算法应用于spark streaming。
如下图所示,它的内部工作原理是 spark streaming 接收实时输入数据流并将数据分批,然后由spark引擎处理,以批量生成结果流。
Spark streaming 提供了一个高度抽象的概念 称之为 discretized stream
或者 Dstream
,Dstream
可以通过kafka,flume 或者Kinesis等来源的出入数据流创建,也可以通过在其他DStream上的高级操作来创建,在内部一个Dstream 被标识为一系列的RDD。
2. Spark Stream 代码示例
2.1 初始化 Streaming
要初始化SparkStreaming程序,必须创建一个StreamingContext 对象,这是所有SparkStreaming功能的主要入口
streamingContext可以通过SparkConf创建,也可以通过现在的SparkContext对象创建
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
// appName : 在集群ui上面显示的名称
// master: spark 集群、Mesos 、yarn 集群的url, 或者以本地模式运行( local[4] )
val ssc = new StreamingContext(conf,Seconds(1))
当程序运行在集群中时,当不希望在程序中硬编码master,而是希望用sparksubmit启动该应用程序,并从spark-submit中得到master的值. 对于本地测试或者单元测试,你可以传递 “local” 字符串在同一个进程内运行spark Streaming. 需要注意的是,它在内部创建了一个SparkContext对象,你可以通过ssc.sparkContext访问这个SparkContext对象
StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定,详见Spark性能调优(Performance Tuning)
- 创建 streamingContext之后,需要进行以下操作
定义输入源,通过创建输入Dstream
定义流计算,通过应用 transformation 和 output 操作得到Dstreams
使用streamingContext.start() 开启接收数据和处理数据
使用streamingContext.awaitTermination()等待进程停止(手动或由于错误)
手动停止可以使用streamingContext.stop()
需要注意的几点
StreamingContext一旦启动不能针对其计算逻辑进行修改或添加
streamingContext一旦被stop掉就不能restart
单个jvm虚拟机同一时间只能包含一个active的streamingContext
streamingContext.stop() 会把关联的SparkContext对象stop掉,如果不想stop SparkContext 可以将StreamingContext.stop() 的可选参数 stopSparkContext 设置为false
一个SparkContext对象可以和多个StreamingContext 对象关联,只要先对前一个创建StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可
Spark 支持的数据源版本
数据源 | Maven工件 |
---|---|
Kafka | spark-streaming-kafka_2.10 |
Flume | spark-streaming-flume_2.10 |
Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
spark-streaming-twitter_2.10 | |
ZeroMQ | spark-streaming-zeromq_2.10 |
MQTT | spark-streaming-mqtt_2.10 |
2.2 添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
</dependency>
2.3 安装 netcat
yum install netcat
2.4 spark streaming 代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by hnbia on 2017/4/4.
* windows nc ==>start==>nc -l -L -P 8888
* linux nc -lp 9999
* spark-submit --master spark://master1:7077 --class com.sbz.stream..SparkStreamingDemo2 shibz_spark-1.0-SNAPSHOT.jar
*/
object SparkStreamingDemo2 {
def main(args: Array[String]): Unit = {
// 创建一个 local StreamingContext , 包含两个工作线程,并将批次间隔设为3秒
// master至少需要两个CPU核,以避免出现任务饿死的情况
// 本地运行代码
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
// 利用这个上下文对象(StreamingContext),
// 我们可以创建一个Dstream, 该DStream代表从前面TCP 数据源流入的数据流, 同时TCP数据源是由主机名(hostname) 和端口号来描述的(9999)
val ssc = new StreamingContext(conf,Seconds(3))
// 创建一个连接到hostname:port的DStream,如:localhost:9999
// 这里的lines 就是从数据源 server 接收到的数据流。
// 其中每一条记录都是一行文本. 接下来, 我们就需要把这些文本按照空格分割成单次
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master1",9999)
// 将每一行分割成多个单词
// flatMap 是一种 "一到多" (one-to-many) 的映射算子,
// 它可以将源DStream中的每一条记录映射成多条记录, 从而产生一个新的DStream对象.
// 在本例中, lines中的每一行都会被flatMap映射为多个单次,从而生成新的words DStream对象, 然后我们就能对这些单次进行计数了
val words: DStream[String] = lines.flatMap(_.split(" "))
// 对每一批次中的单词进行计数
// words 这个DStream对象经过map 算子转换为键值对的DStream对象pairs, 再对pairs 使用reduce算子,得到各个批次中单词的出现频率, 最后 wordCounts.print() 将会每秒(前面设定的批次间隔) 打印单词计数到控制台
val pairs: DStream[(String,Int)] = words.map((_,1))
val wordCounts: DStream[(String,Int)] = pairs.reduceByKey(_+_)
// 将该DStream产生的RDD的头十个元素打印到控制台上
wordCounts.print()
// 启动流式计算
ssc.start()
// 等待直到计算终止
ssc.awaitTermination()
}
}