1 Exactly-Once事务处理
1.1 什么是Exactly-Once事务?
数据仅处理一次并且仅输出一次,这样才是完整的事务处理。
以银行转帐为例,A用户转账给B用户,B用户可能收到多笔钱,保证事务的一致性,也就是说事务输出,能够输出且只会输出一次,即A只转一次,B只收一次。
1.2 从事务视角解密Spark Streaming架构
如下图,Spark Streaming应用程序启动,会分配资源,除非整个集群硬件资源崩溃,一般情况下都不会有问题。Spark Streaming程序分成两部分,一部分是Driver,另外一部分是Executor。Receiver接收到数据后不断发送元数据给Driver,Driver接收到元数据信息后进行CheckPoint处理。其中CheckPoint的元数据包括:Configuration(含有Spark Conf、Spark Streaming等配置信息)、Block MetaData、DStreamGraph、未处理完和等待中的Job。当然Receiver可以在多个Executor节点的上执行Job,Job的执行完全基于SparkCore的调度模式进行的。
Executor只有函数处理逻辑和数据,外部InputStream流入到Receiver中通过BlockManager写入磁盘、内存、WAL进行容错。WAL先写入磁盘然后写入Executor中,失败可能性不大。如果1G数据要处理,Executor一条一条接收,Receiver接收数据是积累到一定记录后才会写入WAL,如果Receiver线程失败时,数据有可能会丢失。
Driver处理元数据前会进行CheckPoint,Spark Streaming获取数据、产生作业,但没有解决执行的问题,执行一定要经过SparkContext。Driver级别的数据修复从Driver CheckPoint中需要把数据读入,在其内部会重新构建SparkContext、StreamingContext、SparkJob,再提交Spark集群运行。Receiver的重新恢复时会通过磁盘的WAL从磁盘恢复过来。
1.3 数据可能丢失的情况及通常的解决方式
在Receiver收到数据且通过Driver的调度Executor开始计算数据的时候,如果Driver突然奔溃,则此时Executor会被杀死,那么Executor中的数据就会丢失(如果没有进行WAL的操作)。
此时就必须通过例如WAL的方式,让所有的数据都通过例如HDFS的方式首先进行安全性容错处理。此时如果Executor中的数据丢失的话,就可以通过WAL恢复回来。
注意:这种方式的弊端是通过WAL的方式会极大额损伤SparkStreaming中Receivers接收数据的性能。
1.4 Exactly-Once事务处理如何解决数据零丢失?
解决数据零丢失,必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全。
我们以数据来自Kafka为例,运行在Executor上的Receiver在接收到来自Kafka的数据时会向Kafka发送ACK确认收到信息并读取下一条信息,Kafka会通过updateOffset来记录Receiver接收到的偏移,这种方式保证了在Executor数据零丢失。在Driver端,定期进行checkpoint操作,出错时从Checkpoint的文件系统中把数据读取进来进行恢复,内部会重新构建StreamingContext(也就是构建SparkContext)并启动,恢复出元数据metedata,再次产生RDD,恢复的是上次的Job,然后再次提交到集群执行。
1.5 Exactly-Once事务处理如何解决数据重复读取?
在Receiver收到数据保存到HDFS等持久化引擎但是没有来得及进行updateOffsets(以Kafka为例),此时Receiver崩溃后重新启动就会通过管理Kafka的Zookeeper中元数据再次重复读取数据,但是此时Spark Streaming认为是成功的,但是Kafka认为是失败的(因为没有更新offset到ZooKeeper中),此时就会导致数据重新消费的情况。
以Receiver基于ZooKeeper的方式,当读取数据时去访问Kafka的元数据信息,在处理代码中例如foreachRDD或transform时,将信息写入到内存数据库中(memorySet),在计算时读取内存数据库信息,判断是否已处理过,如果以处理过则跳过计算。这些元数据信息可以保存到内存数据结构或者memsql,sqllite中。
注意:如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接收的时候又会有数据副本,这个时候其实是存储资源的浪费。Spark在1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统。此时兼具有流的优势和文件系统的优势,至此Spark Streaming+Kafka就构建了完美的流处理世界(1,数据不需要拷贝副本;2,不需要WAL对性能的损耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通过Kafka API直接消息数据,直接管理Offset,所以也不会重复消费数据。
2 输出不重复的解决办法
2.1 为什么会有数据输出多次重写这个问题?
因为Spark Streaming在计算的时候基于Spark Core天生会做以下事情导致Spark Streaming的结果(部分)重复输出:
- Task重试
- 慢任务推测
- Stage重试
- Job重试
2.2 具体解决方案?
- 设置spark.task.maxFailures次数为1,这样就不会有Task重试了。
- 设置spark.speculation为关闭状态,就不会有慢任务推测了,因为慢任务推测非常消耗性能,所以关闭后可以显著提高Spark Streaming处理性能。
- Spark Streaming On Kafka的话,Job失败后可以设置Kafka的参数auto.offset.reset为largest方式。
2.3 kafka auto.offset.reset 值含义解释
- 当各分区下有已提交的offset时(默认使用latest)
参数 | 说明 |
---|---|
earliest | 从提交的offset开始消费;无提交的offset时,从头开始消费 |
latest | 从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 |
none | 从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 |
- 测试某一个分区有提交offset
参数 | 结果 |
---|---|
none | 当该topic下所有分区中存在未提交的offset时,抛出异常。 |
earliest | 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 |
latest | 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 |
3. 使用示例
在实时流式计算中, 最重要的是在任何情况下, 消息不重复、比丢失, 即Exactly-once。 下面以 kafka -> Spark Streaming -> Redis 为例, 一方面说明一下如何做到 Exactly-once, 另一方面说明一下计算实时如何做去重指标
3.1 关于数据源
数据源是文本格式的日志, 由Nginx 产生, 存放在日志服务器上。 在日志服务器上部署flume Agent, 使用 TAILDIR Source 和kafka sink, 将日志采集到kafka进行临时存储, 日志格式如下:
2018-02-22T00:00:00+08:00||200||/test?pcid=DEIBAH&siteid=3
2018-02-22T00:00:00+08:00||200||/test?pcid=GLLIEG&siteid=3
2018-02-22T00:00:00+08:00||200||/test?pcid=HIJMEC&siteid=8
2018-02-22T00:00:00+08:00||200||/test?pcid=HMGBDE&siteid=3
2018-02-22T00:00:00+08:00||200||/test?pcid=HIJFLA&siteid=4
2018-02-22T00:00:01+08:00||200||/test?pcid=JCEBBC&siteid=9
2018-02-22T00:00:01+08:00||200||/test?pcid=KJLAKG&siteid=8
2018-02-22T00:00:01+08:00||200||/test?pcid=FHEIKI&siteid=3
2018-02-22T00:00:01+08:00||200||/test?pcid=IGIDLB&siteid=3
2018-02-22T00:00:01+08:00||200||/test?pcid=IIIJCD&siteid=5
日志是由测试程序模拟产生的,字段之间由|~|分隔。
3.2 实时计算需求
分天、分小时PV;
分天、分小时、分网站(siteid)PV;
分天 UV;
3.3 消费Kafka数据
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
在Spark Streaming 中消费kafka数据, 保证Exactly-once 的核心有三点:
使用Direct 方式连接kafka
自己保存和维护Offset
更新Offset和计算在同一事务中完成
后面的Spark Streaming 程序, 主要有一下步骤:
启动后, 先从 Redis 中获取上次保存的 Offset, Redis 中的可以 为
topic_partition
, 即每个分区维护一个Offset使用获取到的 Offset, 创建 DirectStream
在处理每批次的消息时, 利用 Redis 的事物机制, 确保在 Redis 中指标的计算和 Offset 的更新维护在同一事物中完成。 只有这两者同步, 才能真正保证消息的 Exactly-once
- 执行脚本
./spark-submit \
--class com.hnbian.spark.TestSparkStreaming \
--master local[2] \
--conf spark.streaming.kafka.maxRatePerPartition=20000 \
--jars /opt/hnbian/realtime/commons-pool2-2.3.jar,\
/opt/hnbian/realtime/jedis-2.9.0.jar,\
/opt/hnbian/realtime/kafka-clients-0.11.0.1.jar,\
/opt/hnbian/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
/opt/hnbian/realtime/testsparkstreaming.jar \
--executor-memory 4G \
--num-executors 1
在启动Spark Streaming 程序的时候, 有个参数最好指定:
spark.streaming.kafka.maxRatePerPartition=20000
( 每秒钟从topic 的每个partition 最多消费的消息数)
如果程序第一次运行, 或者因为某种原因暂停了很久, 重新启动的时候, 会积累很多消息, 如果这些消息同事被消费, 很可能会因为内存不够而挂掉, 因为需要根据实际的数据量大小, 以及批次的间隔时间来说设置该参数, 以限定批次的消息量。
如果该参数设置20000, 而批次间隔时间为10s, 那么每个批次最多从kafka中消费20w消息
3.4 Redis中的数据模型
- 分小时、分网站PV
普通K-V结构,计算时候使用incr命令递增,
Key为 site_pv_网站ID_小时
,
如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01
该数据模型用于计算分网站的按小时及按天PV。
- 分小时PV
普通K-V结构,计算时候使用incr命令递增,
Key为 pv_小时
,如:pv_2018-02-21-14、pv_2018-02-22-03
该数据模型用于计算按小时及按天总PV。
- 分天UV
Set结构,计算时候使用sadd命令添加,
Key为 uv_天
,如:uv_2018-02-21、uv_2018-02-20
该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)
注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。
3.5 故障恢复
如果 Spark Streaming 程序因为停电、网络等意外情况而终止需要恢复, 则直接重启即可;
如果因为其他原因需要重新计算某一时间段的消息, 可以先删除Redis中对应时间段内的key, 然后从原始日志中截取该段时间内的消息, 当做新的消息添加至kafka, 由Spark streaming 程序重新消费并进行计算;
3.6 附程序
依赖jar包:
commons-pool2-2.3.jar
jedis-2.9.0.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka-0-10_2.11-2.2.1.jar
3.7 代码示例
- Redis 连接池
import redis.clients.jedis.JedisPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
/**
* Redis连接池
*/
object InternalRedisClient extends Serializable {
// JedisPool对象使用transient关键字标记为不可序列化
@transient private var pool: JedisPool = _
/**
* 创建一个Redis连接池
*
* @param host Redis主机名或IP地址
* @param port Redis端口号
* @param timeout 连接超时时间(毫秒)
* @param maxTotal 最大连接数
* @param maxIdle 最大空闲连接数
* @param minIdle 最小空闲连接数
*/
def makePool(host: String, port: Int, timeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
// 调用第二个重载方法,使用默认值设置testOnBorrow、testOnReturn和maxWaitMillis
makePool(host, port, timeout, maxTotal, maxIdle, minIdle, testOnBorrow = true, testOnReturn = false, maxWaitMillis = 10000)
}
/**
* 创建一个Redis连接池
*
* @param host Redis主机名或IP地址
* @param port Redis端口号
* @param timeout 连接超时时间(毫秒)
* @param maxTotal 最大连接数
* @param maxIdle 最大空闲连接数
* @param minIdle 最小空闲连接数
* @param testOnBorrow 是否在获取连接时测试连接的可用性(默认为true)
* @param testOnReturn 是否在归还连接时测试连接的可用性(默认为false)
* @param maxWaitMillis 获取连接的最大等待时间(毫秒,默认为-1,表示一直等待)
*/
def makePool(host: String, port: Int, timeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if (pool == null) { // 如果连接池未被创建,则创建连接池
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal) // 设置连接池的最大连接数
poolConfig.setMaxIdle(maxIdle) // 设置连接池的最大空闲连接数
poolConfig.setMinIdle(minIdle) // 设置连接池的最小空闲连接数
poolConfig.setTestOnBorrow(testOnBorrow) // 是否在获取连接时测试连接的可用性
poolConfig.setTestOnReturn(testOnReturn) // 是否在归还连接时测试连接的可用性
poolConfig.setMaxWaitMillis(maxWaitMillis) // 获取连接的最大等待时间
pool = new JedisPool(poolConfig, host, port, timeout) // 创建一个Jedis连接池
val hook = new Thread {
override def run(): Unit = pool.destroy() // 定义一个线程,在JVM停止时销毁连接池
}
sys.addShutdownHook(hook.run()) // 添加JVM关闭钩子
}
}
/**
* 获取Redis连接池对象
*
* @return Redis连接池对象
*/
def getPool: JedisPool = {
assert(pool != null, "Redis连接池未被初始化!") // 断言连接池已被创建
pool
}
}
- TestSparkStreaming
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import redis.clients.jedis.Pipeline
object TestSparkStreaming {
def main(args: Array[String]): Unit = {
//Kafka参数
val brokers = "datadev1:9092"
val topic = "exactly-once"
val partition: Int = 0 //测试topic只有一个分区
val start_offset: Long = 0L
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "exactly-once",
"enable.auto.commit" -> (false: java.lang.Boolean),
"auto.offset.reset" -> "none"
)
// Redis 配置
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "172.16.213.79"
val redisPort = 6379
val redisTimeout = 30000
//默认db,用户存放Offset和pv数据
val dbDefaultIndex = 8
//创建连接池
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
//创建 StreamingContext
val conf = new SparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
//从Redis获取上一次存的Offset
val jedis = InternalRedisClient.getPool.getResource
try {
jedis.select(dbDefaultIndex)
val topic_partition_key = topic + "_" + partition
var lastOffset: Long = 0L
val lastSavedOffset = jedis.get(topic_partition_key)
if (null != lastSavedOffset) {
try {
lastOffset = lastSavedOffset.toLong
} catch {
case ex: Exception =>
println(ex.getMessage)
println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "] ")
System.exit(1)
}
}
println("lastOffset from redis -> " + lastOffset)
} finally {
InternalRedisClient.getPool.returnResource(jedis)
}
//设置每个分区起始的Offset
val fromOffsets = Map(new TopicPartition(topic, partition) -> lastOffset)
//使用 Direct API 创建 stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
fromOffsets.keys.toList,
kafkaParams,
fromOffsets
)
)
//开始处理批次消息
stream.foreachRDD { rdd =>
//获取 offsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//将日志转换成 MyRecord 对象数组 rdd
val result = processLogs(rdd)
println("=============== Total " + result.length + " events in this batch ..")
val jedis = InternalRedisClient.getPool.getResource
val p1: Pipeline = jedis.pipelined()
try {
//选择默认数据库
p1.select(dbDefaultIndex)
//开启事务
p1.multi()
//逐条处理消息
result.foreach { record =>
//增加小时总 pv
val pv_by_hour_key = "pv_" + record.hour
p1.incr(pv_by_hour_key)
//增加网站小时 pv
val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
p1.incr(site_pv_by_hour_key)
//使用 Set 保存当天的 uv
val uv_by_day_key = "uv_" + record.hour.substring(0, 10)
p1.sadd(uv_by_day_key, record.user_id)
}
//更新 Offset
offsetRanges.foreach { offsetRange =>
println("partition : " + offsetRange.partition +
" fromOffset: " + offsetRange.fromOffset +
" untilOffset: " + offsetRange.untilOffset)
val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
p1.set(topic_partition_key, offsetRange.untilOffset + "")
}
//提交事务
p1.exec()
} catch {
case ex:Exception =>println(ex.getMessage)
} finally {
//关闭 pipeline
p1.sync()
InternalRedisClient.getPool.returnResource(jedis)
}
}
//定义 MyRecord 类
case class MyRecord(hour: String, user_id: String, site_id: String)
//处理消息的函数,将 ConsumerRecord 转换为 MyRecord
def processLogs(messages: RDD[ConsumerRecord[String, String]]): Array[MyRecord] = {
messages.map(_.value()).flatMap(parseLog).collect()
}
//解析每条日志,生成 MyRecord
def parseLog(line: String): Option[MyRecord] = {
val ary: Array[String] = line.split("\\|~\\|", -1)
try {
val hour = ary(0).substring(0, 13).replace("T", "-")
val uri = ary(2).split("[=|&]", -1)
val user_id = uri(1)
val site_id = uri(3)
return Some(MyRecord(hour, user_id, site_id))
} catch {
case ex: Exception =>
println(ex.getMessage)
}
return None
}
ssc.start()
ssc.awaitTermination()
}
}
JedisPoolConfig config = new JedisPoolConfig();
//连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
config.setBlockWhenExhausted(true);
//设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy");
//是否启用pool的jmx管理功能, 默认true
config.setJmxEnabled(true);
//MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好.
config.setJmxNamePrefix("pool");
//是否启用后进先出, 默认true
config.setLifo(true);
//最大空闲连接数, 默认8个
config.setMaxIdle(8);
//最大连接数, 默认8个
config.setMaxTotal(8);
//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
config.setMaxWaitMillis(-1);
//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
config.setMinEvictableIdleTimeMillis(1800000);
//最小空闲连接数, 默认0
config.setMinIdle(0);
//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
config.setNumTestsPerEvictionRun(3);
//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
config.setSoftMinEvictableIdleTimeMillis(1800000);
//在获取连接的时候检查有效性, 默认false
config.setTestOnBorrow(false);
//在空闲时检查有效性, 默认false
config.setTestWhileIdle(false);
//逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
config.setTimeBetweenEvictionRunsMillis(-1);
JedisPool pool = new JedisPool(config, "localhost",);
int timeout=3000;
new JedisSentinelPool(master, sentinels, poolConfig,timeout);//timeout 读取超时