1. Time Semantics and WaterMark
1.1 Introduction to Time Semantics
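EventTime: the time at which the data was produced. It is normally contained in the data itself, and the user specifies which field holds the timestamp.
Ingestion Time: the time at which the data enters Flink, i.e. the local system time at the moment the source operator receives the record.
Processing Time: the system time at which an operator processes the data; it depends on the clock of the machine running the operator.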
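Flink uses Processing Time by default (in releases before Flink 1.12; from 1.12 on the default is EventTime). In most cases, however, EventTime is used, and Processing Time or Ingestion Time is normally chosen only when EventTime is not available.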
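1.2 Specifying the Time Semantics
Processing Time is Flink's default time semantics and needs no special configuration; Ingestion Time and Event Time must be specified explicitly.
Use env.setStreamTimeCharacteristic to set the time characteristic of the stream.
When the Event Time semantics is used, the concrete event time must additionally be extracted from the data as a timestamp.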
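```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Choose exactly one time characteristic; a later call overrides an earlier one.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
```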
1.3 Extracting the Concrete Timestamp from the Data
1.4 EventTime Code Examples
1.4.1 assignAscendingTimestamps
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic

object EventTimeAssignAscendingTimestamps extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Use event time: timestamps are taken from the records themselves
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Each input line is expected to look like "id,deviceName,timestampMillis"
  val stream: DataStream[String] = env.socketTextStream("localhost", 9999)

  stream
    .map(data => {
      val dataArray: Array[String] = data.split(",")
      Obj1(dataArray(0), dataArray(1), dataArray(2).toLong)
    })
    // Timestamps are assumed to be ascending, so the watermark simply follows them
    .assignAscendingTimestamps(data => data.time)
    .print()

  env.execute()
}

case class Obj1(id: String, deviceName: String, time: Long)
```
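1.4.2 assignTimestampsAndWatermarks
DataStream.assignTimestampsAndWatermarks() sets the event-time timestamp field and, at the same time, configures the WaterMark. This approach is explained below, together with the ways of setting WaterMarks.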
2. Introduction to WaterMark
2.1 What Is a WaterMark?
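A WaterMark is a mechanism for measuring the progress of EventTime, and it can be used to delay triggering.
WaterMarks are usually combined with window operations to handle out-of-order data.
A WaterMark in the data stream indicates that all data with timestamps less than or equal to the WaterMark should already have arrived, which is why window evaluation is triggered by the WaterMark.
WaterMarks let the program itself balance latency against the correctness of the results.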
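The rule that a WaterMark triggers window evaluation can be illustrated with a small plain-Scala sketch (no Flink dependency; `WindowFiring` and its methods are hypothetical names). It assumes tumbling windows with zero offset and non-negative millisecond timestamps. As in Flink's event-time trigger, a window [start, start + size) is treated as complete once the watermark reaches its last timestamp, start + size - 1.

```scala
// Sketch only: models when a tumbling event-time window may fire.
object WindowFiring {
  // Start of the tumbling window containing ts (zero offset, ts >= 0 assumed).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // The window [start, start + size) covers timestamps up to start + size - 1;
  // it may fire once the watermark has reached that last timestamp.
  def canFire(start: Long, size: Long, watermark: Long): Boolean =
    watermark >= start + size - 1
}
```

For example, a record with timestamp 7500 falls into the 5-second window [5000, 10000), which can fire only once the watermark reaches 9999.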
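2.2 Why Do We Need WaterMarks?
When Flink processes stream data in EventTime mode, it evaluates time-based operators according to the timestamps in the data. However, network delays, distributed processing and similar causes can make the data arrive out of order, as shown in the figure below:
How can we avoid incorrect results caused by out-of-order data?
The usual solution is this: when a timestamp that reaches the window's end time is seen, the window computation is not triggered immediately. Instead, the system waits for a while so that late data can still arrive, and only then closes the window.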
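The "wait a while" idea can be sketched in plain Scala (no Flink dependency; `DelayedTrigger` and its method are hypothetical names). The watermark is modeled as the largest timestamp seen so far minus a fixed delay, and the window [0, windowEnd) is evaluated once the watermark reaches windowEnd - 1. In this simplified model, records that belong to the window but arrive after that point are simply lost.

```scala
// Sketch: how delaying the watermark lets out-of-order records reach their window.
object DelayedTrigger {
  // Returns the timestamps in [0, windowEnd) that arrive before the window fires.
  def collectedBeforeFiring(arrivals: Seq[Long], windowEnd: Long, delay: Long): Seq[Long] = {
    val collected = scala.collection.mutable.ArrayBuffer.empty[Long]
    var maxTs = Long.MinValue
    var fired = false
    for (ts <- arrivals) {
      if (!fired) {
        if (ts >= 0 && ts < windowEnd) collected += ts // record belongs to the window
        maxTs = math.max(maxTs, ts)
        val watermark = maxTs - delay                  // watermark trails the max timestamp
        if (watermark >= windowEnd - 1) fired = true   // window [0, windowEnd) is evaluated now
      }
    }
    collected.toSeq
  }
}
```

With the arrival order 1000, 4000, 6000, 3000, 9000, 2000 and windowEnd = 5000, a delay of 0 fires the window as soon as 6000 arrives and keeps only 1000 and 4000; a delay of 2000 also captures 3000, while 2000 still arrives too late.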
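2.3 Characteristics of WaterMark
A WaterMark is a special record inserted into the data stream.
WaterMarks must increase monotonically, so that a task's event-time clock moves forward and never backward.
A WaterMark is tied to the timestamps of the data, which means that whenever we use WaterMarks we must use the Event Time semantics.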
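The monotonicity requirement can be pictured as a clamp: whatever candidate value a generator computes, the value that is actually emitted never drops below the previously emitted one. A minimal plain-Scala sketch, with a hypothetical helper name:

```scala
// Sketch: turns candidate watermarks into the sequence actually emitted,
// never letting event time move backwards.
object MonotonicWatermarks {
  // Emitted value = max(previously emitted value, new candidate).
  def emitted(candidates: Seq[Long]): Seq[Long] =
    candidates.scanLeft(Long.MinValue)((last, candidate) => math.max(last, candidate)).tail
}
```

A regression from 3000 to 2000 is absorbed: the emitted WaterMark stays at 3000 until a larger candidate appears.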
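2.4 WaterMark Propagation
A WaterMark marks the point that event time has progressed to. When a task receives WaterMarks from several upstream input partitions, the smallest of them determines the task's event time.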
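The "smallest WaterMark wins" rule can be sketched for a single task with several input channels (plain Scala, no Flink dependency; `InputWatermarks` is a hypothetical name). Each channel remembers the largest WaterMark it has delivered, and the task's event-time clock, which is also what it forwards downstream, is the minimum over all channels.

```scala
// Sketch: one task with several input channels.
final class InputWatermarks(numChannels: Int) {
  require(numChannels > 0, "a task needs at least one input channel")

  // Latest watermark seen on each channel; nothing received yet = Long.MinValue.
  private val perChannel = Array.fill(numChannels)(Long.MinValue)

  // Called when a watermark arrives on one channel; returns the task's event-time clock.
  def onWatermark(channel: Int, ts: Long): Long = {
    perChannel(channel) = math.max(perChannel(channel), ts)
    perChannel.min
  }
}
```

Until every channel has delivered a WaterMark, the task's clock stays at Long.MinValue, which is why a single idle input can hold back event time for the whole task.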
3. Setting the WaterMark
Before setting a WaterMark, EventTime must be configured first; the WaterMark is then set with the DataStream.assignTimestampsAndWatermarks() method.
There are two ways to generate WaterMarks: AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks.
3.1 AssignerWithPeriodicWatermarks
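WaterMarks are generated periodically: the system inserts a WaterMark into the stream at a fixed interval, 200 ms by default.
The interval can be changed with ExecutionConfig.setAutoWatermarkInterval(). For example, to generate a WaterMark every 5 seconds:
env.getConfig.setAutoWatermarkInterval(5000)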
```scala
// Overload in Flink's Scala DataStream API that accepts an
// org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] = {
  asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
```
Flink provides several ready-made implementations of AssignerWithPeriodicWatermarks (the first two are abstract classes):
AscendingTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor
IngestionTimeExtractor
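3.1.1 AscendingTimestampExtractor
An ascending-timestamp assigner, used to assign timestamps and WaterMarks to data whose timestamps increase monotonically. Its extractTimestamp method in the Flink source: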
```java
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
    final long newTimestamp = extractAscendingTimestamp(element);
    if (newTimestamp >= this.currentTimestamp) {
        this.currentTimestamp = newTimestamp;
        return newTimestamp;
    } else {
        violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
        return newTimestamp;
    }
}
```
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._

object TestAscendingTimestampExtractor extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Each input line is expected to look like "id,timestampMillis"
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj2] = stream1.map(data => {
    val arr = data.split(",")
    Obj2(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj2] {
    // Adding 2000 shifts every event timestamp (and therefore the watermark) by 2 s;
    // it does not make the extractor tolerate out-of-order records.
    override def extractAscendingTimestamp(element: Obj2): Long = element.time + 2000
  })

  stream2.print()
  env.execute()
}

case class Obj2(id: String, time: Long)
```
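To see how this extractor turns timestamps into a watermark, here is a plain-Scala model of its behaviour (a sketch with hypothetical names, no Flink dependency). It assumes, as in Flink's implementation, that the reported watermark is the latest timestamp minus 1 ms, because further records with exactly the latest timestamp may still arrive. Instead of calling a violation handler, the model just counts out-of-order records.

```scala
// Sketch of the ascending-timestamp rule.
final class AscendingModel {
  private var currentTimestamp = Long.MinValue
  private var violationCount = 0

  // Number of records whose timestamp went backwards.
  def violations: Int = violationCount

  def extract(ts: Long): Long = {
    if (ts >= currentTimestamp) currentTimestamp = ts
    else violationCount += 1 // Flink passes this case to its violation handler
    ts
  }

  // Everything up to the latest timestamp minus 1 ms is considered complete.
  def currentWatermark: Long =
    if (currentTimestamp == Long.MinValue) Long.MinValue else currentTimestamp - 1
}
```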
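3.1.2 BoundedOutOfOrdernessTimestampExtractor
A timestamp assigner with a fixed delay, suitable for streams that contain out-of-order data when the maximum delay is roughly known in advance. Its extractTimestamp method in the Flink source: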
```java
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
    long timestamp = extractTimestamp(element);
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}
```
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._

object TestBoundedOutOfOrdernessTimestampExtractor extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj3] = stream1.map(data => {
    val arr = data.split(",")
    Obj3(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(
    // Tolerate records that arrive up to 3 seconds out of order
    new BoundedOutOfOrdernessTimestampExtractor[Obj3](Time.seconds(3)) {
      // The input time is assumed to be in seconds; Flink expects milliseconds
      override def extractTimestamp(element: Obj3): Long = element.time * 1000
    })

  stream2.print()
  env.execute()
}

case class Obj3(id: String, time: Long)
```
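BoundedOutOfOrdernessTimestampExtractor derives its watermark from the largest timestamp seen so far minus the configured maximum out-of-orderness. The following plain-Scala model (hypothetical names, no Flink dependency) sketches that rule, including starting the maximum at Long.MinValue + bound so that the subtraction cannot overflow before the first record arrives.

```scala
// Sketch of the bounded-out-of-orderness rule; all values in milliseconds.
final class BoundedOutOfOrdernessModel(maxOutOfOrderness: Long) {
  // Start at MinValue + bound so that currentMaxTimestamp - bound cannot overflow.
  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrderness

  // Track the largest timestamp seen; return the record's own timestamp.
  def extract(ts: Long): Long = {
    if (ts > currentMaxTimestamp) currentMaxTimestamp = ts
    ts
  }

  // Watermark = largest timestamp so far minus the tolerated delay.
  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrderness
}
```

With a 3-second bound, after the timestamps 5000, 8000 and 6000 the watermark is 8000 - 3000 = 5000, so a window ending at 5000 (last timestamp 4999) can fire while the record at 6000 is still accepted.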
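3.1.3 IngestionTimeExtractor
IngestionTimeExtractor is used when the time characteristic is IngestionTime; it generates timestamps and watermarks directly from the system clock.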
```java
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
    final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
    maxTimestamp = now;
    return now;
}
```
API usage:
Note that the time characteristic must be changed to IngestionTime.
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.scala._

object TestIngestionTimeExtractor extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  // Timestamps and watermarks come from the wall clock, not from the record content
  val stream2: DataStream[Obj4] = stream1.map(data => {
    val arr = data.split(",")
    Obj4(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new IngestionTimeExtractor[Obj4])

  stream2.print()
  env.execute()
}

case class Obj4(id: String, time: Long)
```
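IngestionTimeExtractor's watermark follows the wall clock. The sketch below models it in plain Scala (hypothetical names); the clock is passed in as a function so the behaviour can be checked deterministically, whereas Flink's class reads System.currentTimeMillis() directly. Like the extractTimestamp source above, it never lets time go backwards, and it assumes the watermark is reported as the current time minus 1 ms.

```scala
// Sketch of ingestion-time assignment with an injectable clock.
final class IngestionTimeModel(clock: () => Long) {
  private var maxTimestamp = Long.MinValue

  // Never return a time earlier than one already handed out, even if the clock jumps back.
  private def monotonicNow(): Long = {
    val now = math.max(clock(), maxTimestamp)
    maxTimestamp = now
    now
  }

  // Timestamp for a newly ingested record.
  def extract(): Long = monotonicNow()

  // Everything before "now" is considered complete.
  def currentWatermark: Long = monotonicNow() - 1
}
```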
3.1.4 Custom WaterMark
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark

object TestCustomAssignerWithPeriodicWatermarks extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj5] = stream1.map(data => {
    val arr = data.split(",")
    Obj5(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new CustomPeriodicAssigner)

  stream2.print()
  env.execute()
}

case class Obj5(id: String, time: Long)

class CustomPeriodicAssigner extends AssignerWithPeriodicWatermarks[Obj5] {
  // Maximum expected out-of-orderness: 60 seconds
  val bound: Long = 60 * 1000L
  // Largest timestamp seen so far; starting at MinValue + bound keeps
  // maxTs - bound from overflowing before the first record arrives
  var maxTs: Long = Long.MinValue + bound

  // Called periodically (every autoWatermarkInterval milliseconds)
  override def getCurrentWatermark: Watermark = new Watermark(maxTs - bound)

  // Called for every record: remember the largest timestamp, return the record's own timestamp
  override def extractTimestamp(element: Obj5, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.time)
    element.time
  }
}
```
3.2 AssignerWithPunctuatedWatermarks
WaterMarks are not generated on a fixed time schedule; instead they are generated according to rules you define yourself.
```scala
// Overload in Flink's Scala DataStream API that accepts an
// org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T] = {
  asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
```
3.2.1 AssignerWithPunctuatedWatermarks
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark

object TestCustomAssignerWithPunctuatedWatermarks extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj6] = stream1.map(data => {
    val arr = data.split(",")
    Obj6(arr(0), arr(1).toLong)
  }).assignTimestampsAndWatermarks(new CustomPunctuatedAssigner)

  stream2.print()
  env.execute()
}

class CustomPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Obj6] {
  // Maximum expected out-of-orderness: 60 seconds
  val bound: Long = 60 * 1000L

  // Called after extractTimestamp for every record; returning null means "no watermark here".
  // Emitting a watermark for every record, as done here, works but can add noticeable overhead.
  override def checkAndGetNextWatermark(r: Obj6, extractedTS: Long): Watermark = {
    if (r != null) new Watermark(extractedTS - bound) else null
  }

  override def extractTimestamp(r: Obj6, previousTS: Long): Long = r.time
}

case class Obj6(id: String, time: Long)
```
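Punctuated generation is easiest to see with records that explicitly mark where a watermark should be emitted. In the sketch below (plain Scala; the `Event` type and its `isMarker` flag are hypothetical), a watermark is produced only for marker records, mirroring how checkAndGetNextWatermark can return null for ordinary records. Option is used here in place of null.

```scala
// Sketch of punctuated watermark generation driven by record content.
object PunctuatedModel {
  // Hypothetical record type: isMarker flags records that carry a watermark.
  final case class Event(id: String, time: Long, isMarker: Boolean)

  // Checked after every record; Some(watermark) only for marker records.
  def checkAndGetNextWatermark(e: Event, extractedTs: Long): Option[Long] =
    if (e.isMarker) Some(extractedTs) else None

  // Watermarks produced for a whole sequence of records, in arrival order.
  def watermarks(events: Seq[Event]): Seq[Long] =
    events.flatMap(e => checkAndGetNextWatermark(e, e.time))
}
```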
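4. Problems with WaterMark
In Flink, WaterMarks are set by the developer, which usually requires a certain understanding of the business data.
If the WaterMark delay is set too long, results arrive much more slowly. A workaround is to output an approximate result before the WaterMark arrives.
If WaterMarks are emitted too early, results may be incorrect, but Flink's mechanisms for handling late data (such as allowed lateness on windows and side outputs for late records) can address this.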
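Flink's late-data handling mentioned above usually means allowed lateness on a window (`allowedLateness(...)` on a windowed stream), optionally combined with `sideOutputLateData(...)` to collect records that are still too late. The rule can be sketched in plain Scala for a record belonging to a window that ends at windowEnd (hypothetical names, no Flink dependency): the window first fires when the watermark reaches windowEnd - 1; later records still update the result while windowEnd - 1 + allowedLateness is greater than the watermark; after that they are dropped or sent to the side output.

```scala
// Sketch: what happens to a record of window [.., windowEnd), given the watermark at arrival.
object LateDataRule {
  sealed trait Outcome
  case object OnTime extends Outcome     // window has not fired yet
  case object LateFiring extends Outcome // window already fired; record triggers an updated result
  case object Dropped extends Outcome    // beyond allowed lateness: dropped or sent to a side output

  def classify(windowEnd: Long, allowedLateness: Long, watermark: Long): Outcome = {
    val maxTs = windowEnd - 1 // last timestamp covered by the window
    if (watermark < maxTs) OnTime
    else if (maxTs + allowedLateness > watermark) LateFiring
    else Dropped
  }
}
```

With windowEnd = 5000 and 1 second of allowed lateness, a record arriving at watermark 5500 still updates the result, while one arriving at watermark 5999 is treated as too late.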