1. 介绍Spark Streaming
spark streaming 是 spark 核心 api 的扩展,支持可扩展,高吞吐量,实时数据流的容错处理,数据可以从 kafka,flume,Kinesis 或者 TCP socket 中获取数据,并且可以使用高级复杂的算法来处理获取到的数据,像 map 、reduce、join、window 等,处理的数据可以存储到hdfs,关系型数据库或者实时展示,还可以将 spark mllib 和 图计算 算法应用于spark streaming。
如下图所示,它的内部工作原理是 spark streaming 接收实时输入数据流并将数据分批,然后由spark引擎处理,以批量生成结果流。
Spark streaming 提供了一个高度抽象的概念 称之为 discretized stream 或者 Dstream,Dstream 可以通过kafka,flume 或者Kinesis等来源的出入数据流创建,也可以通过在其他DStream上的高级操作来创建,在内部一个Dstream 被标识为一系列的RDD。
2. Spark Stream 代码示例
2.1 初始化 Streaming
要初始化SparkStreaming程序,必须创建一个StreamingContext 对象,这是所有SparkStreaming功能的主要入口
streamingContext可以通过SparkConf创建,也可以通过现在的SparkContext对象创建
1 | import org.apache.spark._ |
当程序运行在集群中时,当不希望在程序中硬编码master,而是希望用sparksubmit启动该应用程序,并从spark-submit中得到master的值. 对于本地测试或者单元测试,你可以传递 “local” 字符串在同一个进程内运行spark Streaming. 需要注意的是,它在内部创建了一个SparkContext对象,你可以通过ssc.sparkContext访问这个SparkContext对象
StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定,详见Spark性能调优(Performance Tuning)
- 创建 streamingContext之后,需要进行以下操作
定义输入源,通过创建输入Dstream
定义流计算,通过应用 transformation 和 output 操作得到Dstreams
使用streamingContext.start() 开启接收数据和处理数据
使用streamingContext.awaitTermination()等待进程停止(手动或由于错误)
手动停止可以使用streamingContext.stop()
需要注意的几点
StreamingContext一旦启动不能针对其计算逻辑进行修改或添加
streamingContext一旦被stop掉就不能restart
单个jvm虚拟机同一时间只能包含一个active的streamingContext
streamingContext.stop() 会把关联的SparkContext对象stop掉,如果不想stop SparkContext 可以将StreamingContext.stop() 的可选参数 stopSparkContext 设置为false
一个SparkContext对象可以和多个StreamingContext 对象关联,只要先对前一个创建StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可
Spark 支持的数据源版本
| 数据源 | Maven工件 |
|---|---|
| Kafka | spark-streaming-kafka_2.10 |
| Flume | spark-streaming-flume_2.10 |
| Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
| spark-streaming-twitter_2.10 | |
| ZeroMQ | spark-streaming-zeromq_2.10 |
| MQTT | spark-streaming-mqtt_2.10 |
2.2 添加依赖
1 | <dependency> |
2.3 安装 netcat
1 | yum install netcat |
2.4 spark streaming 代码
1 | import org.apache.spark.SparkConf |