Flink系列 7. 介绍Flink中的时间语义与WaterMark


1. 时间语义和 water mark

1.1 时间语义介绍

  • EventTime (事件时间) : 数据产生时的时间,一般都包含在数据内,由用户指定数据中的时间戳。

  • Ingestion Time (摄取时间) : 数据进入 Flink 的时间,数据被 source 算子获取时的本地系统时间

  • Processing Time (处理时间) : 数据被算子处理时的系统时间,与服务器设置的时间相关。

Flink 中默认使用 Process Time,但是大多数情况下都会使用 EventTime,一般只有在 EventTime 无法使用时才会使用 Process Time 或者 Ingestion Time。

Flink 中的时间概念

1.2 指定时间语义

  • Process Time 是 flink 中的默认时间语义,无需特殊设置,但要使用 Ingestion Time 或者 Event Time 则需要手动指定。
  • 可以使用 env.setStreamTimeCharacteristic 方法,设置流的时间特性
  • 设置 Event Time 时间语义时,具体的 Event Time 还需要从数据中提取相关的时间戳(timestamp)
1
2
3
4
5
6
7
8
9
10
11
12
13

// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// env 创建个每个 stream 设置时间语义为 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// env 创建个每个 stream 设置时间语义为 IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

// env 创建个每个 stream 设置时间语义为 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

1.3 从数据中提取的具体时间戳

  • 当确定时间是顺序数据,无需指定 WaterMark 可以使用:DataStream.assignAscendingTimestamps(_.timeStamp)

  • 乱序数据,需要指定 WaterMark 可以使用:DataStream.assignTimestampsAndWatermarks(new Assigner()) 。在下面 WaterMark 中介绍该方法的使用

1.4 EventTime 代码示例

1.4.1 assignAscendingTimestamps

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic

object EventTimeAssignAscendingTimestamps extends App{

// 创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

private val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
stream.map(data=>{
val dataArray: Array[String] = data.split(",")
Obj1(dataArray(0),dataArray(1),dataArray(2).toLong)
}).assignAscendingTimestamps(data=>{data.time}) // 设置时间语义中的时间戳字段(毫秒值)

env.execute()
}
case class Obj1(id:String,deviceName:String,time:Long)

1.4.2 assignTimestampsAndWatermarks

DataStream.assignTimestampsAndWatermarks() 在设置事件时间时间戳字段的同时也会设置 WaterMark 相关内容,在下面设置 WaterMark 的方法时讲解这种设置方式。

2. WaterMark 介绍

2.1 什么是 WaterMark ?

  • WaterMark 是一种衡量EventTime 进展的机制,可以设定延迟触发。
  • 通常会使用 WaterMark 加上 window 操作来处理乱序数据。
  • 数据流中的 WaterMark 用来表示 timestamp 小于 WaterMark 的数据都已经到达了,所以 window 操作的执行也是有WaterMark 触发的。
  • WaterMark 用来让程序自己平衡延迟和结果的正确性。

2.2 为什么要有 WaterMark?

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

顺序与乱序的数据

那么怎么避免乱序数据带来不正确的计算呢?

通常的解决办法是在遇到时间戳到达了窗口关闭的时间时,不会立即触发窗口计算,而是等待一段时间,等迟到的数据来了在关闭窗口。

2.3 WaterMark 的特点

  • WaterMark 是被插入到数据流中的一条特殊的数据记录
  • WaterMark 必须单调递增,以确保任务的事件时间的时钟是向前推进而不是后退的
  • WaterMark 与数据的时间戳相关,也就是说当我们要使用 WaterMark 时一定要使用 Event Time 时间语义

2.4 WaterMark 的传递

WaterMark 表示当前事件进展到的时间点, 以最小的 WaterMark 为准。

WaterMark 传递示意图

3. WaterMark 的设定

当我们要设定 WaterMark 时一定要先设置好 EventTime,可以使用 DataStream.assignTimestampsAndWatermarks() 方法设置 WaterMark。

设置 WaterMark 有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks两种方式

Flink 中实现的设定 WaterMark 相关的类继承图

3.1 AssignerWithPeriodicWatermarks

  • 周期性的生成 WaterMark,系统将周期性的将 WaterMark 插入到流中,默认周期是 200ms
  • 可以使用ExecutionConfig.setAutoWatermarkInterval() 方法进行设置。例如修改WaterMark 生成时间是 5 秒 env.getConfig.setAutoWatermarkInterval(5000)
1
2
3
4
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] = {
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}

Flink 中提供了几个实现了AssignerWithPeriodicWatermarks的抽象类

  • AscendingTimestampExtractor
  • BoundedOutOfOrdernessTimestampExtractor
  • IngestionTimeExtractor

3.1.1 AscendingTimestampExtractor

升序时间戳分配器,用于设置单调递增的数据的WaterMark 时间戳。

  • 源码设置WaterMark时间戳的实现方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
 //源码  
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
// 当 当前时间戳小于等于新时间戳(WaterMark) 时返回新的时间戳
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
//
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
  • API调用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

object TestAscendingTimestampExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

val stream2: DataStream[Obj2] = stream1.map(data => {
val arr = data.split(",")
Obj2(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj2] {
override def extractAscendingTimestamp(element: Obj2) = {
// 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
element.time + 2000
}
})
}

case class Obj2(id:String,time:Long)

3.1.2 BoundedOutOfOrdernessTimestampExtractor

固定延迟的时间戳分配器,适用于有乱序的数据流中,但是最大的延迟时间已经基本了解的情况。

  • 源码
1
2
3
4
5
6
7
8
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
  • API调用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

object TestBoundedOutOfOrdernessTimestampExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

val stream2: DataStream[Obj3] = stream1.map(data => {
val arr = data.split(",")
Obj3(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Obj3](Time.seconds(3)
) {
override def extractTimestamp(element: Obj3) = element.time * 1000
})
env.execute()
}

case class Obj3(id:String,time:Long)

3.1.3 IngestionTimeExtractor

IngestionTimeExtractor用于指定时间特性为IngestionTime时,直接生成时间戳和获取水印。

  • 源码
1
2
3
4
5
6
7
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
// 确保时间戳单调增加,即使系统时钟重新同步也是如此
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
  • API调用方式
    • 需要注意将时间语义修改为 IngestionTime
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor

object TestIngestionTimeExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用 IngestionTime 时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

val stream2: DataStream[Obj4] = stream1.map(data => {
val arr = data.split(",")
Obj4(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor[Obj4])
env.execute()
}

case class Obj4(id:String,time:Long)

3.1.4 自定义 WaterMark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

object TestCustomAssignerWithPeriodicWatermarks {
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

val stream2: DataStream[Obj5] = stream1.map(data => {
val arr = data.split(",")
Obj5(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new CustomPeriodicAssiner)
env.execute()
}

case class Obj5(id:String,time:Long)

// 周期性生成 WaterMark
class CustomPeriodicAssiner extends AssignerWithPeriodicWatermarks[Obj5] {

/** 延迟时间为 1 分钟 */
val bound: Long = 60 * 1000L

/** 观察到的最大时间戳 */
var maxTs: Long = Long.MinValue

/** 生成当前的 WaterMark */
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}

/**
* 抽取时间戳的方法
* @param timeStamp 数据
* @param previousElementTimestamp
* @return
*/
def extractTimestamp(timeStamp: Obj5, previousElementTimestamp: Long): Long = {
maxTs = previousElementTimestamp.max(timeStamp.time)
timeStamp.time
}
}

3.2 AssignerWithPunctuatedWatermarks

没有时间规律,可以按照自己定义的规则生成 WaterMark。

1
2
3
4
5
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks

def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T] = {
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}

3.2.1 AssignerWithPunctuatedWatermarks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks

/**
* @Author haonan.bian
* @Description //TODO
* @Date 2021/1/2 18:11
**/
object TestCustomAssignerWithPunctuatedWatermarks extends App {

import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

val stream2: DataStream[Obj6] = stream1.map(data => {
val arr = data.split(",")
Obj6(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new CustomPunctuatedAssigner[Obj6])
env.execute()
}
// 按照自己的规则生成 WaterMark
class CustomPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Obj6] {

/** 观察到的最大时间戳 */
val bound: Long = 60 * 1000

/**根据数据生成 WaterMark*/
override def checkAndGetNextWatermark(r: Obj6, extractedTS: Long): Watermark = {
if (r != null) {
new Watermark(extractedTS - bound)
} else {
null
}
}

// 从数据中抽取时间戳的方式
def extractTimestamp(r: Obj6, previousTS: Long): Long = {
r.time
}
}

case class Obj6(id:String,time:Long)

4. WaterMark 的问题

Flink 中WaterMark 由程序开发人员设定,通常需要对业务数据有一定的了解。

如果 WaterMark 设置的太久,收到过的速度就会慢很多,解决办法是在到达 WaterMark 之前输出一个近似结果。

如果 WaterMark 输出的太早,则可能出现错误的结果,不过 Flink 处理延迟数据的机制可以解决这个问题。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录