Spark Streaming 6. Exactly-Once解决方案


1 Exactly-Once事务处理

1.1 什么是Exactly-Once事务?

数据仅处理一次并且仅输出一次,这样才是完整的事务处理。

以银行转帐为例,A用户转账给B用户,B用户可能收到多笔钱,保证事务的一致性,也就是说事务输出,能够输出且只会输出一次,即A只转一次,B只收一次。

1.2 从事务视角解密Spark Streaming架构

如下图,Spark Streaming应用程序启动,会分配资源,除非整个集群硬件资源崩溃,一般情况下都不会有问题。Spark Streaming程序分成两部分,一部分是Driver,另外一部分是Executor。Receiver接收到数据后不断发送元数据给Driver,Driver接收到元数据信息后进行CheckPoint处理。其中CheckPoint的元数据包括:Configuration(含有Spark Conf、Spark Streaming等配置信息)、Block MetaData、DStreamGraph、未处理完和等待中的Job。当然Receiver可以在多个Executor节点的上执行Job,Job的执行完全基于SparkCore的调度模式进行的。

Executor只有函数处理逻辑和数据,外部InputStream流入到Receiver中通过BlockManager写入磁盘、内存、WAL进行容错。WAL先写入磁盘然后写入Executor中,失败可能性不大。如果1G数据要处理,Executor一条一条接收,Receiver接收数据是积累到一定记录后才会写入WAL,如果Receiver线程失败时,数据有可能会丢失。

Driver处理元数据前会进行CheckPoint,Spark Streaming获取数据、产生作业,但没有解决执行的问题,执行一定要经过SparkContext。Driver级别的数据修复从Driver CheckPoint中需要把数据读入,在其内部会重新构建SparkContext、StreamingContext、SparkJob,再提交Spark集群运行。Receiver的重新恢复时会通过磁盘的WAL从磁盘恢复过来。

1.3 数据可能丢失的情况及通常的解决方式

​ 在Receiver收到数据且通过Driver的调度Executor开始计算数据的时候,如果Driver突然奔溃,则此时Executor会被杀死,那么Executor中的数据就会丢失(如果没有进行WAL的操作)。

​ 此时就必须通过例如WAL的方式,让所有的数据都通过例如HDFS的方式首先进行安全性容错处理。此时如果Executor中的数据丢失的话,就可以通过WAL恢复回来。

​ 注意:这种方式的弊端是通过WAL的方式会极大额损伤SparkStreaming中Receivers接收数据的性能。

1.4 Exactly-Once事务处理如何解决数据零丢失?

解决数据零丢失,必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全。

​ 我们以数据来自Kafka为例,运行在Executor上的Receiver在接收到来自Kafka的数据时会向Kafka发送ACK确认收到信息并读取下一条信息,Kafka会通过updateOffset来记录Receiver接收到的偏移,这种方式保证了在Executor数据零丢失。在Driver端,定期进行checkpoint操作,出错时从Checkpoint的文件系统中把数据读取进来进行恢复,内部会重新构建StreamingContext(也就是构建SparkContext)并启动,恢复出元数据metedata,再次产生RDD,恢复的是上次的Job,然后再次提交到集群执行。

1.5 Exactly-Once事务处理如何解决数据重复读取?

​ 在Receiver收到数据保存到HDFS等持久化引擎但是没有来得及进行updateOffsets(以Kafka为例),此时Receiver崩溃后重新启动就会通过管理Kafka的Zookeeper中元数据再次重复读取数据,但是此时Spark Streaming认为是成功的,但是Kafka认为是失败的(因为没有更新offset到ZooKeeper中),此时就会导致数据重新消费的情况。

​ 以Receiver基于ZooKeeper的方式,当读取数据时去访问Kafka的元数据信息,在处理代码中例如foreachRDD或transform时,将信息写入到内存数据库中(memorySet),在计算时读取内存数据库信息,判断是否已处理过,如果以处理过则跳过计算。这些元数据信息可以保存到内存数据结构或者memsql,sqllite中。

​ 注意:如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接收的时候又会有数据副本,这个时候其实是存储资源的浪费。Spark在1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统。此时兼具有流的优势和文件系统的优势,至此Spark Streaming+Kafka就构建了完美的流处理世界(1,数据不需要拷贝副本;2,不需要WAL对性能的损耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通过Kafka API直接消息数据,直接管理Offset,所以也不会重复消费数据。

2 输出不重复的解决办法

2.1 为什么会有数据输出多次重写这个问题?

因为Spark Streaming在计算的时候基于Spark Core天生会做以下事情导致Spark Streaming的结果(部分)重复输出:

  • Task重试
  • 慢任务推测
  • Stage重试
  • Job重试

2.2 具体解决方案?

  • 设置spark.task.maxFailures次数为1,这样就不会有Task重试了。
  • 设置spark.speculation为关闭状态,就不会有慢任务推测了,因为慢任务推测非常消耗性能,所以关闭后可以显著提高Spark Streaming处理性能。
  • Spark Streaming On Kafka的话,Job失败后可以设置Kafka的参数auto.offset.reset为largest方式。

2.3 kafka auto.offset.reset 值含义解释

  • 当各分区下有已提交的offset时(默认使用latest)
参数 说明
earliest 从提交的offset开始消费;无提交的offset时,从头开始消费
latest 从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none 从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  • 测试某一个分区有提交offset
参数 结果
none 当该topic下所有分区中存在未提交的offset时,抛出异常。
earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

3. 使用示例

在实时流式计算中, 最重要的是在任何情况下, 消息不重复、比丢失, 即Exactly-once。 下面以 kafka -> Spark Streaming -> Redis 为例, 一方面说明一下如何做到 Exactly-once, 另一方面说明一下计算实时如何做去重指标

3.1 关于数据源

数据源是文本格式的日志, 由Nginx 产生, 存放在日志服务器上。 在日志服务器上部署flume Agent, 使用 TAILDIR Source 和kafka sink, 将日志采集到kafka进行临时存储, 日志格式如下:

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=DEIBAH&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3
2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3
2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

日志是由测试程序模拟产生的,字段之间由|~|分隔。

3.2 实时计算需求

分天、分小时PV;

分天、分小时、分网站(siteid)PV;

分天 UV;

3.3 消费Kafka数据

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

在Spark Streaming 中消费kafka数据, 保证Exactly-once 的核心有三点:

  1. 使用Direct 方式连接kafka

  2. 自己保存和维护Offset

  3. 更新Offset和计算在同一事务中完成

后面的Spark Streaming 程序, 主要有一下步骤:

  1. 启动后, 先从 Redis 中获取上次保存的 Offset, Redis 中的可以 为 topic_partition , 即每个分区维护一个Offset

  2. 使用获取到的 Offset, 创建 DirectStream

  3. 在处理每批次的消息时, 利用 Redis 的事物机制, 确保在 Redis 中指标的计算和 Offset 的更新维护在同一事物中完成。 只有这两者同步, 才能真正保证消息的 Exactly-once

  • 执行脚本
./spark-submit \
--class com.hnbian.spark.TestSparkStreaming \
--master local[2] \
--conf spark.streaming.kafka.maxRatePerPartition=20000 \
--jars /opt/hnbian/realtime/commons-pool2-2.3.jar,\
/opt/hnbian/realtime/jedis-2.9.0.jar,\
/opt/hnbian/realtime/kafka-clients-0.11.0.1.jar,\
/opt/hnbian/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
/opt/hnbian/realtime/testsparkstreaming.jar \
--executor-memory 4G \
--num-executors 1

在启动Spark Streaming 程序的时候, 有个参数最好指定:

spark.streaming.kafka.maxRatePerPartition=20000 ( 每秒钟从topic 的每个partition 最多消费的消息数)

如果程序第一次运行, 或者因为某种原因暂停了很久, 重新启动的时候, 会积累很多消息, 如果这些消息同事被消费, 很可能会因为内存不够而挂掉, 因为需要根据实际的数据量大小, 以及批次的间隔时间来说设置该参数, 以限定批次的消息量。

如果该参数设置20000, 而批次间隔时间为10s, 那么每个批次最多从kafka中消费20w消息

3.4 Redis中的数据模型

  • 分小时、分网站PV

普通K-V结构,计算时候使用incr命令递增,

Key为 site_pv_网站ID_小时

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

该数据模型用于计算分网站的按小时及按天PV。

  • 分小时PV

普通K-V结构,计算时候使用incr命令递增,

Key为 pv_小时,如:pv_2018-02-21-14、pv_2018-02-22-03

该数据模型用于计算按小时及按天总PV。

  • 分天UV

Set结构,计算时候使用sadd命令添加,

Key为 uv_天,如:uv_2018-02-21、uv_2018-02-20

该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)

注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。

3.5 故障恢复

如果 Spark Streaming 程序因为停电、网络等意外情况而终止需要恢复, 则直接重启即可;

如果因为其他原因需要重新计算某一时间段的消息, 可以先删除Redis中对应时间段内的key, 然后从原始日志中截取该段时间内的消息, 当做新的消息添加至kafka, 由Spark streaming 程序重新消费并进行计算;

3.6 附程序

依赖jar包:

commons-pool2-2.3.jar
jedis-2.9.0.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka-0-10_2.11-2.2.1.jar

3.7 代码示例

  • Redis 连接池
import redis.clients.jedis.JedisPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig

/**
  * Redis连接池
  */
object InternalRedisClient extends Serializable {

  // JedisPool对象使用transient关键字标记为不可序列化
  @transient private var pool: JedisPool = _

  /**
    * 创建一个Redis连接池
    *
    * @param host     Redis主机名或IP地址
    * @param port     Redis端口号
    * @param timeout  连接超时时间(毫秒)
    * @param maxTotal 最大连接数
    * @param maxIdle  最大空闲连接数
    * @param minIdle  最小空闲连接数
    */
  def makePool(host: String, port: Int, timeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    // 调用第二个重载方法,使用默认值设置testOnBorrow、testOnReturn和maxWaitMillis
    makePool(host, port, timeout, maxTotal, maxIdle, minIdle, testOnBorrow = true, testOnReturn = false, maxWaitMillis = 10000)
  }

  /**
    * 创建一个Redis连接池
    *
    * @param host            Redis主机名或IP地址
    * @param port            Redis端口号
    * @param timeout         连接超时时间(毫秒)
    * @param maxTotal        最大连接数
    * @param maxIdle         最大空闲连接数
    * @param minIdle         最小空闲连接数
    * @param testOnBorrow    是否在获取连接时测试连接的可用性(默认为true)
    * @param testOnReturn    是否在归还连接时测试连接的可用性(默认为false)
    * @param maxWaitMillis   获取连接的最大等待时间(毫秒,默认为-1,表示一直等待)
    */
  def makePool(host: String, port: Int, timeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
               testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
    if (pool == null) { // 如果连接池未被创建,则创建连接池
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal) // 设置连接池的最大连接数
      poolConfig.setMaxIdle(maxIdle) // 设置连接池的最大空闲连接数
      poolConfig.setMinIdle(minIdle) // 设置连接池的最小空闲连接数
      poolConfig.setTestOnBorrow(testOnBorrow) // 是否在获取连接时测试连接的可用性
      poolConfig.setTestOnReturn(testOnReturn) // 是否在归还连接时测试连接的可用性
      poolConfig.setMaxWaitMillis(maxWaitMillis) // 获取连接的最大等待时间
      pool = new JedisPool(poolConfig, host, port, timeout) // 创建一个Jedis连接池
      val hook = new Thread {
        override def run(): Unit = pool.destroy() // 定义一个线程,在JVM停止时销毁连接池
      }
      sys.addShutdownHook(hook.run()) // 添加JVM关闭钩子
    }
  }

  /**
    * 获取Redis连接池对象
    *
    * @return Redis连接池对象
    */
  def getPool: JedisPool = {
    assert(pool != null, "Redis连接池未被初始化!") // 断言连接池已被创建
    pool
  }
}
  • TestSparkStreaming
    import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import redis.clients.jedis.Pipeline

object TestSparkStreaming {

  def main(args: Array[String]): Unit = {

    //Kafka参数
    val brokers = "datadev1:9092"
    val topic = "exactly-once"
    val partition: Int = 0 //测试topic只有一个分区
    val start_offset: Long = 0L
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "exactly-once",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "none"
    )

    // Redis 配置
    val maxTotal = 10
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "172.16.213.79"
    val redisPort = 6379
    val redisTimeout = 30000
    //默认db,用户存放Offset和pv数据
    val dbDefaultIndex = 8
    //创建连接池
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

    //创建 StreamingContext
    val conf = new SparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    //从Redis获取上一次存的Offset
    val jedis = InternalRedisClient.getPool.getResource
    try {
      jedis.select(dbDefaultIndex)
      val topic_partition_key = topic + "_" + partition
      var lastOffset: Long = 0L
      val lastSavedOffset = jedis.get(topic_partition_key)
      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case ex: Exception =>
            println(ex.getMessage)
            println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "] ")
            System.exit(1)
        }
      }
      println("lastOffset from redis -> " + lastOffset)
    } finally {
      InternalRedisClient.getPool.returnResource(jedis)
    }

    //设置每个分区起始的Offset
    val fromOffsets = Map(new TopicPartition(topic, partition) -> lastOffset)

    //使用 Direct API 创建 stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](
        fromOffsets.keys.toList,
        kafkaParams,
        fromOffsets
      )
    )

    //开始处理批次消息
    stream.foreachRDD { rdd =>

      //获取 offsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      //将日志转换成 MyRecord 对象数组 rdd
      val result = processLogs(rdd)

      println("=============== Total " + result.length + " events in this batch ..")

      val jedis = InternalRedisClient.getPool.getResource
      val p1: Pipeline = jedis.pipelined()
      try {
        //选择默认数据库
        p1.select(dbDefaultIndex)
        //开启事务
        p1.multi()

        //逐条处理消息
        result.foreach { record =>
          //增加小时总 pv
          val pv_by_hour_key = "pv_" + record.hour
          p1.incr(pv_by_hour_key)
          //增加网站小时 pv
          val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
          p1.incr(site_pv_by_hour_key)
          //使用 Set 保存当天的 uv
          val uv_by_day_key = "uv_" + record.hour.substring(0, 10)
          p1.sadd(uv_by_day_key, record.user_id)
        }

        //更新 Offset
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + 
                  " fromOffset:  " + offsetRange.fromOffset + 
                  " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p1.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        //提交事务
        p1.exec()
      } catch {
        case ex:Exception =>println(ex.getMessage)
      } finally {
          //关闭 pipeline
          p1.sync()
          InternalRedisClient.getPool.returnResource(jedis)
      }
    }
      //定义 MyRecord 类
      case class MyRecord(hour: String, user_id: String, site_id: String)
      //处理消息的函数,将 ConsumerRecord 转换为 MyRecord
      def processLogs(messages: RDD[ConsumerRecord[String, String]]): Array[MyRecord] = {
          messages.map(_.value()).flatMap(parseLog).collect()
      }
      //解析每条日志,生成 MyRecord
      def parseLog(line: String): Option[MyRecord] = {
          val ary: Array[String] = line.split("\\|~\\|", -1)
          try {
              val hour = ary(0).substring(0, 13).replace("T", "-")
              val uri = ary(2).split("[=|&]", -1)
              val user_id = uri(1)
              val site_id = uri(3)
              return Some(MyRecord(hour, user_id, site_id))
          } catch {
              case ex: Exception =>
              println(ex.getMessage)
          }
          return None
      }
      ssc.start()
      ssc.awaitTermination()
  }
}
    JedisPoolConfig config = new JedisPoolConfig();

    //连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
    config.setBlockWhenExhausted(true);

    //设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
    config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy");

    //是否启用pool的jmx管理功能, 默认true
    config.setJmxEnabled(true);

    //MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好.
    config.setJmxNamePrefix("pool");

    //是否启用后进先出, 默认true
    config.setLifo(true);

    //最大空闲连接数, 默认8个
    config.setMaxIdle(8);

    //最大连接数, 默认8个
    config.setMaxTotal(8);

    //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
    config.setMaxWaitMillis(-1);

    //逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
    config.setMinEvictableIdleTimeMillis(1800000);

    //最小空闲连接数, 默认0
    config.setMinIdle(0);

    //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
    config.setNumTestsPerEvictionRun(3);

    //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断  (默认逐出策略)   
    config.setSoftMinEvictableIdleTimeMillis(1800000);

    //在获取连接的时候检查有效性, 默认false
    config.setTestOnBorrow(false);

    //在空闲时检查有效性, 默认false
    config.setTestWhileIdle(false);

    //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
    config.setTimeBetweenEvictionRunsMillis(-1);

    JedisPool pool = new JedisPool(config, "localhost",);

    int timeout=3000;
     new JedisSentinelPool(master, sentinels, poolConfig,timeout);//timeout 读取超时

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark Streaming 7. Structured Streaming 入门 Spark Streaming 7. Structured Streaming 入门
本文转自:Spark结构式流编程指南 1. 概览Structured Streaming 是一个可拓展,容错的,基于 Spark SQL 执行引擎的流处理引擎。使用小量的静态数据模拟流处理。伴随流数据的到来,SparkSQL 引擎会逐渐连续
2018-04-15
下一篇 
实例详解机器学习如何解决问题 实例详解机器学习如何解决问题
转载美团文档,原文地址为:https://tech.meituan.com/mt-mlinaction-how-to-ml.html 1. 前言随着大数据时代的到来,机器学习成为解决问题的一种重要且关键的工具。不管是工业界还是学术界,机器学
2018-03-30
  目录