1. 时间语义和 water mark
1.1 时间语义介绍
EventTime (事件时间) : 数据产生时的时间,一般都包含在数据内,由用户指定数据中的时间戳。
Ingestion Time (摄取时间) : 数据进入 Flink 的时间,数据被 source 算子获取时的本地系统时间
Processing Time (处理时间) : 数据被算子处理时的系统时间,与机器相关。
flink 中默认使用Process Time,但是大多数情况下都会使用 EventTime,一般只有在 EventTime 无法使用时才会使用 Process Time 或者 Ingestion Time。
1.2 指定时间语义
- Process Time 是 flink 中的默认时间语义,无需特殊设置,但要使用Ingestion Time 或者 Event Time 则需要手动指定。
- 可以使用 env.setStreamTimeCharacteristic 方法,设置流的时间特性
- 设置 Event Time 时间语义时,具体的 Event Time 还需要从数据中提取相关的时间戳(timestamp)
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env 创建个每个 stream 设置时间语义为 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// env 创建个每个 stream 设置时间语义为 IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env 创建个每个 stream 设置时间语义为 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
1.3 从数据中提取的具体时间戳
当确定时间是顺序数据,无需指定 WaterMark 可以使用:DataStream.assignAscendingTimestamps(_.timeStamp)
乱序数据,需要指定 WaterMark 可以使用:DataStream.assignTimestampsAndWatermarks(new Assigner()) 。在下面 WaterMark 中介绍该方法的使用
1.4 EventTime 代码示例
1.4.1 assignAscendingTimestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
object EventTimeAssignAscendingTimestamps extends App{
// 创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
private val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
stream.map(data=>{
val dataArray: Array[String] = data.split(",")
Obj1(dataArray(0),dataArray(1),dataArray(2).toLong)
}).assignAscendingTimestamps(data=>{data.time}) // 设置时间语义中的时间戳字段(毫秒值)
env.execute()
}
case class Obj1(id:String,deviceName:String,time:Long)
1.4.2 assignTimestampsAndWatermarks
DataStream.assignTimestampsAndWatermarks() 在设置事件时间时间戳字段的同时也会设置 WaterMark 相关内容,在下面设置 WaterMark 的方法时讲解这种设置方式。
2. WaterMark 介绍
2.1 什么是 WaterMark ?
- WaterMark 是一种衡量EventTime 进展的机制,可以设定延迟触发。
- 通常会使用 WaterMark 加上 window 操作来处理乱序数据。
- 数据流中的 WaterMark 用来表示 timestamp 小于 WaterMark 的数据都已经到达了,所以 window 操作的执行也是有WaterMark 触发的。
- WaterMark 用来让程序自己平衡延迟和结果的正确性。
2.2 为什么要有 WaterMark?
当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
那么怎么避免乱序数据带来不正确的计算呢?
通常的解决办法是在遇到时间戳到达了窗口关闭的时间时,不会立即触发窗口计算,而是等待一段时间,等迟到的数据来了在关闭窗口。
2.3 WaterMark 的特点
- WaterMark 是被插入到数据流中的一条特殊的数据记录
- WaterMark 必须单调递增,以确保任务的事件时间的时钟是向前推进而不是后退的
- WaterMark 与数据的时间戳相关,也就是说当我们要使用 WaterMark 时一定要使用 Event Time 时间语义
2.4 WaterMark 的传递
3. WaterMark 的设定
当我们要设定 WaterMark 时一定要先设置好 EventTime,可以使用 DataStream.assignTimestampsAndWatermarks() 方法设置 WaterMark。
设置 WaterMark 有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks两种方式
3.1 AssignerWithPeriodicWatermarks
- 周期性的生成 WaterMark,系统将周期性的将 WaterMark 插入到流中,默认周期是 200ms
- 可以使用ExecutionConfig.setAutoWatermarkInterval() 方法进行设置。例如修改WaterMark 生成时间是 5 秒 env.getConfig.setAutoWatermarkInterval(5000)
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] = {
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
Flink 中提供了几个实现了AssignerWithPeriodicWatermarks的抽象类
- AscendingTimestampExtractor
- BoundedOutOfOrdernessTimestampExtractor
- IngestionTimeExtractor
3.1.1 AscendingTimestampExtractor
升序时间戳分配器,用于设置单调递增的数据的WaterMark 时间戳。
- 源码设置WaterMark时间戳的实现方法
//源码
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
// 当当前时间戳小于等于新时间戳(WaterMark) 时返回新的时间戳
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
//
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
- API调用方式
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
object TestAscendingTimestampExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj2] = stream1.map(data => {
val arr = data.split(",")
Obj2(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj2] {
override def extractAscendingTimestamp(element: Obj2) = {
// 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
element.time + 2000
}
})
}
case class Obj2(id:String,time:Long)
3.1.2 BoundedOutOfOrdernessTimestampExtractor
固定延迟的时间戳分配器,适用于有乱序的数据流中,但是最大的延迟时间已经基本了解的情况。
- 源码
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
- API调用方式
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
object TestBoundedOutOfOrdernessTimestampExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj3] = stream1.map(data => {
val arr = data.split(",")
Obj3(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Obj3](Time.seconds(3)
) {
override def extractTimestamp(element: Obj3) = element.time * 1000
})
env.execute()
}
case class Obj3(id:String,time:Long)
3.1.3 IngestionTimeExtractor
IngestionTimeExtractor用于指定时间特性为IngestionTime时,直接生成时间戳和获取水印。
- 源码
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
// 确保时间戳单调增加,即使系统时钟重新同步也是如此
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
- API调用方式
- 需要注意将时间语义修改为 IngestionTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
object TestIngestionTimeExtractor extends App{
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用 IngestionTime 时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj4] = stream1.map(data => {
val arr = data.split(",")
Obj4(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor[Obj4])
env.execute()
}
case class Obj4(id:String,time:Long)
3.1.4 自定义 WaterMark
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
object TestCustomAssignerWithPeriodicWatermarks {
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj5] = stream1.map(data => {
val arr = data.split(",")
Obj5(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new CustomPeriodicAssiner)
env.execute()
}
case class Obj5(id:String,time:Long)
// 周期性生成 WaterMark
class CustomPeriodicAssiner extends AssignerWithPeriodicWatermarks[Obj5] {
/** 延迟时间为 1 分钟 */
val bound: Long = 60 * 1000L
/** 观察到的最大时间戳 */
var maxTs: Long = Long.MinValue
/** 生成当前的 WaterMark */
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
/**
* 抽取时间戳的方法
* @param timeStamp 数据
* @param previousElementTimestamp
* @return
*/
def extractTimestamp(timeStamp: Obj5, previousElementTimestamp: Long): Long = {
maxTs = previousElementTimestamp.max(timeStamp.time)
timeStamp.time
}
}
3.2 AssignerWithPunctuatedWatermarks
没有时间规律,可以按照自己定义的规则生成 WaterMark。
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T] = {
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
3.2.1 AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
/**
* @Author haonan.bian
* @Description //TODO
* @Date 2021/1/2 18:11
**/
object TestCustomAssignerWithPunctuatedWatermarks extends App {
import org.apache.flink.streaming.api.scala._
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj6] = stream1.map(data => {
val arr = data.split(",")
Obj6(arr(0), arr(1).toLong)
}).assignTimestampsAndWatermarks(new CustomPunctuatedAssigner[Obj6])
env.execute()
}
// 按照自己的规则生成 WaterMark
class CustomPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Obj6] {
/** 观察到的最大时间戳 */
val bound: Long = 60 * 1000
/**根据数据生成 WaterMark*/
override def checkAndGetNextWatermark(r: Obj6, extractedTS: Long): Watermark = {
if (r != null) {
new Watermark(extractedTS - bound)
} else {
null
}
}
// 从数据中抽取时间戳的方式
def extractTimestamp(r: Obj6, previousTS: Long): Long = {
r.time
}
}
case class Obj6(id:String,time:Long)
4. WaterMark 的问题
Flink 中WaterMark 由程序开发人员设定,通常需要对业务数据有一定的了解。
如果 WaterMark 设置的太久,收到过的速度就会慢很多,解决办法是在到达 WaterMark 之前输出一个近似结果。
如果 WaterMark 输出的太早,则可能出现错误的结果,不过 Flink 处理延迟数据的机制可以解决这个问题。