Flink系列 7. 介绍Flink中的时间语义与WaterMark


1. 时间语义和 water mark

1.1 时间语义介绍

  • EventTime (事件时间) : 数据产生时的时间,一般都包含在数据内,由用户指定数据中的时间戳。

  • Ingestion Time (摄取时间) : 数据进入 Flink 的时间,数据被 source 算子获取时的本地系统时间

  • Processing Time (处理时间) : 数据被算子处理时的系统时间,与服务器设置的时间相关。

Flink 中默认使用 Process Time,但是大多数情况下都会使用 EventTime,一般只有在 EventTime 无法使用时才会使用 Process Time 或者 Ingestion Time。

Flink 中的时间概念

1.2 指定时间语义

  • Process Time 是 flink 中的默认时间语义,无需特殊设置,但要使用 Ingestion Time 或者 Event Time 则需要手动指定。
  • 可以使用 env.setStreamTimeCharacteristic 方法,设置流的时间特性
  • 设置 Event Time 时间语义时,具体的 Event Time 还需要从数据中提取相关的时间戳(timestamp)

// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//  env 创建个每个 stream 设置时间语义为 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//  env 创建个每个 stream 设置时间语义为 IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

//  env 创建个每个 stream 设置时间语义为 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

1.3 从数据中提取的具体时间戳

  • 当确定时间是顺序数据,无需指定 WaterMark 可以使用:DataStream.assignAscendingTimestamps(_.timeStamp)

  • 乱序数据,需要指定 WaterMark 可以使用:DataStream.assignTimestampsAndWatermarks(new Assigner()) 。在下面 WaterMark 中介绍该方法的使用

1.4 EventTime 代码示例

1.4.1 assignAscendingTimestamps

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic

object EventTimeAssignAscendingTimestamps extends App{

  // 创建执行环境
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  private val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
  stream.map(data=>{
    val dataArray: Array[String] = data.split(",")
    Obj1(dataArray(0),dataArray(1),dataArray(2).toLong)
  }).assignAscendingTimestamps(data=>{data.time}) // 设置时间语义中的时间戳字段(毫秒值)

  env.execute()
}
case class Obj1(id:String,deviceName:String,time:Long)

1.4.2 assignTimestampsAndWatermarks

DataStream.assignTimestampsAndWatermarks() 在设置事件时间时间戳字段的同时也会设置 WaterMark 相关内容,在下面设置 WaterMark 的方法时讲解这种设置方式。

2. WaterMark 介绍

2.1 什么是 WaterMark ?

  • WaterMark 是一种衡量EventTime 进展的机制,可以设定延迟触发。
  • 通常会使用 WaterMark 加上 window 操作来处理乱序数据。
  • 数据流中的 WaterMark 用来表示 timestamp 小于 WaterMark 的数据都已经到达了,所以 window 操作的执行也是有WaterMark 触发的。
  • WaterMark 用来让程序自己平衡延迟和结果的正确性。

2.2 为什么要有 WaterMark?

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

顺序与乱序的数据

那么怎么避免乱序数据带来不正确的计算呢?

通常的解决办法是在遇到时间戳到达了窗口关闭的时间时,不会立即触发窗口计算,而是等待一段时间,等迟到的数据来了在关闭窗口。

2.3 WaterMark 的特点

  • WaterMark 是被插入到数据流中的一条特殊的数据记录
  • WaterMark 必须单调递增,以确保任务的事件时间的时钟是向前推进而不是后退的
  • WaterMark 与数据的时间戳相关,也就是说当我们要使用 WaterMark 时一定要使用 Event Time 时间语义

2.4 WaterMark 的传递

WaterMark 表示当前事件进展到的时间点, 以最小的 WaterMark 为准。

WaterMark 传递示意图

3. WaterMark 的设定

当我们要设定 WaterMark 时一定要先设置好 EventTime,可以使用 DataStream.assignTimestampsAndWatermarks() 方法设置 WaterMark。

设置 WaterMark 有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks两种方式

Flink 中实现的设定 WaterMark 相关的类继承图

3.1 AssignerWithPeriodicWatermarks

  • 周期性的生成 WaterMark,系统将周期性的将 WaterMark 插入到流中,默认周期是 200ms
  • 可以使用ExecutionConfig.setAutoWatermarkInterval() 方法进行设置。例如修改WaterMark 生成时间是 5 秒 env.getConfig.setAutoWatermarkInterval(5000)
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] = {
    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
  }

Flink 中提供了几个实现了AssignerWithPeriodicWatermarks的抽象类

  • AscendingTimestampExtractor
  • BoundedOutOfOrdernessTimestampExtractor
  • IngestionTimeExtractor

3.1.1 AscendingTimestampExtractor

升序时间戳分配器,用于设置单调递增的数据的WaterMark 时间戳。

  • 源码设置WaterMark时间戳的实现方法
  //源码  
  @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) { 
      // 当 当前时间戳小于等于新时间戳(WaterMark) 时返回新的时间戳
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
      // 
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }
  • API调用方式
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

object TestAscendingTimestampExtractor extends App{
  import org.apache.flink.streaming.api.scala._
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置使用事件时间
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj2] = stream1.map(data => {
    val arr = data.split(",")
    Obj2(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj2] {
    override def extractAscendingTimestamp(element: Obj2) = {
      // 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
      element.time + 2000
    }
  })
}

case class Obj2(id:String,time:Long)

3.1.2 BoundedOutOfOrdernessTimestampExtractor

固定延迟的时间戳分配器,适用于有乱序的数据流中,但是最大的延迟时间已经基本了解的情况。

  • 源码
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
  • API调用方式
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

object TestBoundedOutOfOrdernessTimestampExtractor extends App{
  import org.apache.flink.streaming.api.scala._
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置使用事件时间
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj3] = stream1.map(data => {
    val arr = data.split(",")
    Obj3(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[Obj3](Time.seconds(3)
  ) {
    override def extractTimestamp(element: Obj3) = element.time * 1000
  })
  env.execute()
}

case class Obj3(id:String,time:Long)

3.1.3 IngestionTimeExtractor

IngestionTimeExtractor用于指定时间特性为IngestionTime时,直接生成时间戳和获取水印。

  • 源码
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // 确保时间戳单调增加,即使系统时钟重新同步也是如此
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }
  • API调用方式
    • 需要注意将时间语义修改为 IngestionTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor

object TestIngestionTimeExtractor extends App{
  import org.apache.flink.streaming.api.scala._
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置使用 IngestionTime 时间语义
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj4] = stream1.map(data => {
    val arr = data.split(",")
    Obj4(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new IngestionTimeExtractor[Obj4])
  env.execute()
}

case class Obj4(id:String,time:Long)

3.1.4 自定义 WaterMark

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

object TestCustomAssignerWithPeriodicWatermarks {
  import org.apache.flink.streaming.api.scala._
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置使用事件时间
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj5] = stream1.map(data => {
    val arr = data.split(",")
    Obj5(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new CustomPeriodicAssiner)
  env.execute()
}

case class Obj5(id:String,time:Long)

// 周期性生成 WaterMark
class CustomPeriodicAssiner extends AssignerWithPeriodicWatermarks[Obj5] {

  /** 延迟时间为 1 分钟 */
  val bound: Long = 60 * 1000L

  /** 观察到的最大时间戳 */
  var maxTs: Long = Long.MinValue

  /** 生成当前的 WaterMark */
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  /**
    * 抽取时间戳的方法
    * @param timeStamp 数据
    * @param previousElementTimestamp
    * @return
    */
   def extractTimestamp(timeStamp: Obj5, previousElementTimestamp: Long): Long = {
    maxTs = previousElementTimestamp.max(timeStamp.time)
    timeStamp.time
  }
}

3.2 AssignerWithPunctuatedWatermarks

没有时间规律,可以按照自己定义的规则生成 WaterMark。

org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks

def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T] = {
    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
  }

3.2.1 AssignerWithPunctuatedWatermarks

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks

/**
  * @Author haonan.bian
  * @Description //TODO
  * @Date 2021/1/2 18:11 
  **/
object TestCustomAssignerWithPunctuatedWatermarks extends App {

  import org.apache.flink.streaming.api.scala._
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置使用事件时间
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj6] = stream1.map(data => {
    val arr = data.split(",")
    Obj6(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new CustomPunctuatedAssigner[Obj6])
  env.execute()
}
// 按照自己的规则生成 WaterMark
class CustomPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Obj6] {

  /** 观察到的最大时间戳 */
  val bound: Long = 60 * 1000

  /**根据数据生成 WaterMark*/
  override def checkAndGetNextWatermark(r: Obj6, extractedTS: Long): Watermark = {
    if (r != null) {
      new Watermark(extractedTS - bound)
    } else {
      null
    }
  }

  // 从数据中抽取时间戳的方式
  def extractTimestamp(r: Obj6, previousTS: Long): Long = {
        r.time
  }
}

case class Obj6(id:String,time:Long)

4. WaterMark 的问题

Flink 中WaterMark 由程序开发人员设定,通常需要对业务数据有一定的了解。

如果 WaterMark 设置的太久,收到过的速度就会慢很多,解决办法是在到达 WaterMark 之前输出一个近似结果。

如果 WaterMark 输出的太早,则可能出现错误的结果,不过 Flink 处理延迟数据的机制可以解决这个问题。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink系列 8. 介绍Flink中的窗口类型与相关操作 Flink系列 8. 介绍Flink中的窗口类型与相关操作
1. window 的概念 一般真实的流都是无界的,怎样处理无界的数据? 可以把无界的流进行切分,得到有限的数据集进行处理,也就是得到有界流。 window 就是将无界流切割成有界流的一种方式,它会将流分发到有限大小的桶(bucket)中
2020-07-20
下一篇 
GreenPlum 6.0 版本升级内容与新功能介绍 GreenPlum 6.0 版本升级内容与新功能介绍
1. postgresql 升级gpdb 5版本是将 postgresql 从 8.2 升级到 8.3 ,gpdb 6 有 6 个 postgresql 大版本升级: v8.4 - 2314 commits v9.0 - 1859 com
2020-07-03
  目录