Spark Streaming 6. Exactly-Once解决方案


1 Exactly-Once事务处理

1.1 什么是Exactly-Once事务?

数据仅处理一次并且仅输出一次,这样才是完整的事务处理。

以银行转帐为例,A用户转账给B用户,B用户可能收到多笔钱,保证事务的一致性,也就是说事务输出,能够输出且只会输出一次,即A只转一次,B只收一次。

1.2 从事务视角解密Spark Streaming架构

如下图,Spark Streaming应用程序启动,会分配资源,除非整个集群硬件资源崩溃,一般情况下都不会有问题。Spark Streaming程序分成两部分,一部分是Driver,另外一部分是Executor。Receiver接收到数据后不断发送元数据给Driver,Driver接收到元数据信息后进行CheckPoint处理。其中CheckPoint的元数据包括:Configuration(含有Spark Conf、Spark Streaming等配置信息)、Block MetaData、DStreamGraph、未处理完和等待中的Job。当然Receiver可以在多个Executor节点的上执行Job,Job的执行完全基于SparkCore的调度模式进行的。

Executor只有函数处理逻辑和数据,外部InputStream流入到Receiver中通过BlockManager写入磁盘、内存、WAL进行容错。WAL先写入磁盘然后写入Executor中,失败可能性不大。如果1G数据要处理,Executor一条一条接收,Receiver接收数据是积累到一定记录后才会写入WAL,如果Receiver线程失败时,数据有可能会丢失。

Driver处理元数据前会进行CheckPoint,Spark Streaming获取数据、产生作业,但没有解决执行的问题,执行一定要经过SparkContext。Driver级别的数据修复从Driver CheckPoint中需要把数据读入,在其内部会重新构建SparkContext、StreamingContext、SparkJob,再提交Spark集群运行。Receiver的重新恢复时会通过磁盘的WAL从磁盘恢复过来。

1.3 数据可能丢失的情况及通常的解决方式

​ 在Receiver收到数据且通过Driver的调度Executor开始计算数据的时候,如果Driver突然奔溃,则此时Executor会被杀死,那么Executor中的数据就会丢失(如果没有进行WAL的操作)。

​ 此时就必须通过例如WAL的方式,让所有的数据都通过例如HDFS的方式首先进行安全性容错处理。此时如果Executor中的数据丢失的话,就可以通过WAL恢复回来。

​ 注意:这种方式的弊端是通过WAL的方式会极大额损伤SparkStreaming中Receivers接收数据的性能。

1.4 Exactly-Once事务处理如何解决数据零丢失?

解决数据零丢失,必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全。

​ 我们以数据来自Kafka为例,运行在Executor上的Receiver在接收到来自Kafka的数据时会向Kafka发送ACK确认收到信息并读取下一条信息,Kafka会通过updateOffset来记录Receiver接收到的偏移,这种方式保证了在Executor数据零丢失。在Driver端,定期进行checkpoint操作,出错时从Checkpoint的文件系统中把数据读取进来进行恢复,内部会重新构建StreamingContext(也就是构建SparkContext)并启动,恢复出元数据metedata,再次产生RDD,恢复的是上次的Job,然后再次提交到集群执行。

1.5 Exactly-Once事务处理如何解决数据重复读取?

​ 在Receiver收到数据保存到HDFS等持久化引擎但是没有来得及进行updateOffsets(以Kafka为例),此时Receiver崩溃后重新启动就会通过管理Kafka的Zookeeper中元数据再次重复读取数据,但是此时Spark Streaming认为是成功的,但是Kafka认为是失败的(因为没有更新offset到ZooKeeper中),此时就会导致数据重新消费的情况。

​ 以Receiver基于ZooKeeper的方式,当读取数据时去访问Kafka的元数据信息,在处理代码中例如foreachRDD或transform时,将信息写入到内存数据库中(memorySet),在计算时读取内存数据库信息,判断是否已处理过,如果以处理过则跳过计算。这些元数据信息可以保存到内存数据结构或者memsql,sqllite中。

​ 注意:如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接收的时候又会有数据副本,这个时候其实是存储资源的浪费。Spark在1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统。此时兼具有流的优势和文件系统的优势,至此Spark Streaming+Kafka就构建了完美的流处理世界(1,数据不需要拷贝副本;2,不需要WAL对性能的损耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通过Kafka API直接消息数据,直接管理Offset,所以也不会重复消费数据。

2 输出不重复的解决办法

2.1 为什么会有数据输出多次重写这个问题?

因为Spark Streaming在计算的时候基于Spark Core天生会做以下事情导致Spark Streaming的结果(部分)重复输出:

  • Task重试
  • 慢任务推测
  • Stage重试
  • Job重试

2.2 具体解决方案?

  • 设置spark.task.maxFailures次数为1,这样就不会有Task重试了。
  • 设置spark.speculation为关闭状态,就不会有慢任务推测了,因为慢任务推测非常消耗性能,所以关闭后可以显著提高Spark Streaming处理性能。
  • Spark Streaming On Kafka的话,Job失败后可以设置Kafka的参数auto.offset.reset为largest方式。

2.3 kafka auto.offset.reset 值含义解释

  • 当各分区下有已提交的offset时(默认使用latest)
参数 说明
earliest 从提交的offset开始消费;无提交的offset时,从头开始消费
latest 从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none 从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  • 测试某一个分区有提交offset
参数 结果
none 当该topic下所有分区中存在未提交的offset时,抛出异常。
earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

3. 使用示例

在实时流式计算中, 最重要的是在任何情况下, 消息不重复、比丢失, 即Exactly-once。 下面以 kafka -> Spark Streaming -> Redis 为例, 一方面说明一下如何做到 Exactly-once, 另一方面说明一下计算实时如何做去重指标

3.1 关于数据源

数据源是文本格式的日志, 由Nginx 产生, 存放在日志服务器上。 在日志服务器上部署flume Agent, 使用 TAILDIR Source 和kafka sink, 将日志采集到kafka进行临时存储, 日志格式如下:

1
2
3
4
5
6
7
8
9
10
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=DEIBAH&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

日志是由测试程序模拟产生的,字段之间由|~|分隔。

3.2 实时计算需求

分天、分小时PV;

分天、分小时、分网站(siteid)PV;

分天 UV;

3.3 消费Kafka数据

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

在Spark Streaming 中消费kafka数据, 保证Exactly-once 的核心有三点:

  1. 使用Direct 方式连接kafka

  2. 自己保存和维护Offset

  3. 更新Offset和计算在同一事务中完成

后面的Spark Streaming 程序, 主要有一下步骤:

  1. 启动后, 先从 Redis 中获取上次保存的 Offset, Redis 中的可以 为 topic_partition , 即每个分区维护一个Offset

  2. 使用获取到的 Offset, 创建 DirectStream

  3. 在处理每批次的消息时, 利用 Redis 的事物机制, 确保在 Redis 中指标的计算和 Offset 的更新维护在同一事物中完成。 只有这两者同步, 才能真正保证消息的 Exactly-once

  • 执行脚本
1
2
3
4
5
6
7
8
9
10
11
./spark-submit \
--class com.hnbian.spark.TestSparkStreaming \
--master local[2] \
--conf spark.streaming.kafka.maxRatePerPartition=20000 \
--jars /opt/hnbian/realtime/commons-pool2-2.3.jar,\
/opt/hnbian/realtime/jedis-2.9.0.jar,\
/opt/hnbian/realtime/kafka-clients-0.11.0.1.jar,\
/opt/hnbian/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
/opt/hnbian/realtime/testsparkstreaming.jar \
--executor-memory 4G \
--num-executors 1

在启动Spark Streaming 程序的时候, 有个参数最好指定:

spark.streaming.kafka.maxRatePerPartition=20000 ( 每秒钟从topic 的每个partition 最多消费的消息数)

如果程序第一次运行, 或者因为某种原因暂停了很久, 重新启动的时候, 会积累很多消息, 如果这些消息同事被消费, 很可能会因为内存不够而挂掉, 因为需要根据实际的数据量大小, 以及批次的间隔时间来说设置该参数, 以限定批次的消息量。

如果该参数设置20000, 而批次间隔时间为10s, 那么每个批次最多从kafka中消费20w消息

3.4 Redis中的数据模型

  • 分小时、分网站PV

普通K-V结构,计算时候使用incr命令递增,

Key为 site_pv_网站ID_小时

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

该数据模型用于计算分网站的按小时及按天PV。

  • 分小时PV

普通K-V结构,计算时候使用incr命令递增,

Key为 pv_小时,如:pv_2018-02-21-14、pv_2018-02-22-03

该数据模型用于计算按小时及按天总PV。

  • 分天UV

Set结构,计算时候使用sadd命令添加,

Key为 uv_天,如:uv_2018-02-21、uv_2018-02-20

该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)

注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。

3.5 故障恢复

如果 Spark Streaming 程序因为停电、网络等意外情况而终止需要恢复, 则直接重启即可;

如果因为其他原因需要重新计算某一时间段的消息, 可以先删除Redis中对应时间段内的key, 然后从原始日志中截取该段时间内的消息, 当做新的消息添加至kafka, 由Spark streaming 程序重新消费并进行计算;

3.6 附程序

依赖jar包:

1
2
3
4
commons-pool2-2.3.jar
jedis-2.9.0.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka-0-10_2.11-2.2.1.jar

3.7 代码示例

  • Redis 连接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import redis.clients.jedis.JedisPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig

/**
* Redis连接池
*/
object InternalRedisClient extends Serializable {

// JedisPool对象使用transient关键字标记为不可序列化
@transient private var pool: JedisPool = _

/**
* 创建一个Redis连接池
*
* @param host Redis主机名或IP地址
* @param port Redis端口号
* @param timeout 连接超时时间(毫秒)
* @param maxTotal 最大连接数
* @param maxIdle 最大空闲连接数
* @param minIdle 最小空闲连接数
*/
def makePool(host: String, port: Int, timeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
// 调用第二个重载方法,使用默认值设置testOnBorrow、testOnReturn和maxWaitMillis
makePool(host, port, timeout, maxTotal, maxIdle, minIdle, testOnBorrow = true, testOnReturn = false, maxWaitMillis = 10000)
}

/**
* 创建一个Redis连接池
*
* @param host Redis主机名或IP地址
* @param port Redis端口号
* @param timeout 连接超时时间(毫秒)
* @param maxTotal 最大连接数
* @param maxIdle 最大空闲连接数
* @param minIdle 最小空闲连接数
* @param testOnBorrow 是否在获取连接时测试连接的可用性(默认为true)
* @param testOnReturn 是否在归还连接时测试连接的可用性(默认为false)
* @param maxWaitMillis 获取连接的最大等待时间(毫秒,默认为-1,表示一直等待)
*/
def makePool(host: String, port: Int, timeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if (pool == null) { // 如果连接池未被创建,则创建连接池
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal) // 设置连接池的最大连接数
poolConfig.setMaxIdle(maxIdle) // 设置连接池的最大空闲连接数
poolConfig.setMinIdle(minIdle) // 设置连接池的最小空闲连接数
poolConfig.setTestOnBorrow(testOnBorrow) // 是否在获取连接时测试连接的可用性
poolConfig.setTestOnReturn(testOnReturn) // 是否在归还连接时测试连接的可用性
poolConfig.setMaxWaitMillis(maxWaitMillis) // 获取连接的最大等待时间
pool = new JedisPool(poolConfig, host, port, timeout) // 创建一个Jedis连接池
val hook = new Thread {
override def run(): Unit = pool.destroy() // 定义一个线程,在JVM停止时销毁连接池
}
sys.addShutdownHook(hook.run()) // 添加JVM关闭钩子
}
}

/**
* 获取Redis连接池对象
*
* @return Redis连接池对象
*/
def getPool: JedisPool = {
assert(pool != null, "Redis连接池未被初始化!") // 断言连接池已被创建
pool
}
}
  • TestSparkStreaming
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
	import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import redis.clients.jedis.Pipeline

object TestSparkStreaming {

def main(args: Array[String]): Unit = {

//Kafka参数
val brokers = "datadev1:9092"
val topic = "exactly-once"
val partition: Int = 0 //测试topic只有一个分区
val start_offset: Long = 0L
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "exactly-once",
"enable.auto.commit" -> (false: java.lang.Boolean),
"auto.offset.reset" -> "none"
)

// Redis 配置
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "172.16.213.79"
val redisPort = 6379
val redisTimeout = 30000
//默认db,用户存放Offset和pv数据
val dbDefaultIndex = 8
//创建连接池
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

//创建 StreamingContext
val conf = new SparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

//从Redis获取上一次存的Offset
val jedis = InternalRedisClient.getPool.getResource
try {
jedis.select(dbDefaultIndex)
val topic_partition_key = topic + "_" + partition
var lastOffset: Long = 0L
val lastSavedOffset = jedis.get(topic_partition_key)
if (null != lastSavedOffset) {
try {
lastOffset = lastSavedOffset.toLong
} catch {
case ex: Exception =>
println(ex.getMessage)
println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "] ")
System.exit(1)
}
}
println("lastOffset from redis -> " + lastOffset)
} finally {
InternalRedisClient.getPool.returnResource(jedis)
}

//设置每个分区起始的Offset
val fromOffsets = Map(new TopicPartition(topic, partition) -> lastOffset)

//使用 Direct API 创建 stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
fromOffsets.keys.toList,
kafkaParams,
fromOffsets
)
)

//开始处理批次消息
stream.foreachRDD { rdd =>

//获取 offsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

//将日志转换成 MyRecord 对象数组 rdd
val result = processLogs(rdd)

println("=============== Total " + result.length + " events in this batch ..")

val jedis = InternalRedisClient.getPool.getResource
val p1: Pipeline = jedis.pipelined()
try {
//选择默认数据库
p1.select(dbDefaultIndex)
//开启事务
p1.multi()

//逐条处理消息
result.foreach { record =>
//增加小时总 pv
val pv_by_hour_key = "pv_" + record.hour
p1.incr(pv_by_hour_key)
//增加网站小时 pv
val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
p1.incr(site_pv_by_hour_key)
//使用 Set 保存当天的 uv
val uv_by_day_key = "uv_" + record.hour.substring(0, 10)
p1.sadd(uv_by_day_key, record.user_id)
}

//更新 Offset
offsetRanges.foreach { offsetRange =>
println("partition : " + offsetRange.partition +
" fromOffset: " + offsetRange.fromOffset +
" untilOffset: " + offsetRange.untilOffset)
val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
p1.set(topic_partition_key, offsetRange.untilOffset + "")
}

//提交事务
p1.exec()
} catch {
case ex:Exception =>println(ex.getMessage)
} finally {
//关闭 pipeline
p1.sync()
InternalRedisClient.getPool.returnResource(jedis)
}
}
//定义 MyRecord 类
case class MyRecord(hour: String, user_id: String, site_id: String)
//处理消息的函数,将 ConsumerRecord 转换为 MyRecord
def processLogs(messages: RDD[ConsumerRecord[String, String]]): Array[MyRecord] = {
messages.map(_.value()).flatMap(parseLog).collect()
}
//解析每条日志,生成 MyRecord
def parseLog(line: String): Option[MyRecord] = {
val ary: Array[String] = line.split("\\|~\\|", -1)
try {
val hour = ary(0).substring(0, 13).replace("T", "-")
val uri = ary(2).split("[=|&]", -1)
val user_id = uri(1)
val site_id = uri(3)
return Some(MyRecord(hour, user_id, site_id))
} catch {
case ex: Exception =>
println(ex.getMessage)
}
return None
}
ssc.start()
ssc.awaitTermination()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
JedisPoolConfig config = new JedisPoolConfig();

//连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
config.setBlockWhenExhausted(true);

//设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy");

//是否启用pool的jmx管理功能, 默认true
config.setJmxEnabled(true);

//MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好.
config.setJmxNamePrefix("pool");

//是否启用后进先出, 默认true
config.setLifo(true);

//最大空闲连接数, 默认8个
config.setMaxIdle(8);

//最大连接数, 默认8个
config.setMaxTotal(8);

//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
config.setMaxWaitMillis(-1);

//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
config.setMinEvictableIdleTimeMillis(1800000);

//最小空闲连接数, 默认0
config.setMinIdle(0);

//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
config.setNumTestsPerEvictionRun(3);

//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
config.setSoftMinEvictableIdleTimeMillis(1800000);

//在获取连接的时候检查有效性, 默认false
config.setTestOnBorrow(false);

//在空闲时检查有效性, 默认false
config.setTestWhileIdle(false);

//逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
config.setTimeBetweenEvictionRunsMillis(-1);

JedisPool pool = new JedisPool(config, "localhost",);

int timeout=3000;
new JedisSentinelPool(master, sentinels, poolConfig,timeout);//timeout 读取超时

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录