1. Timer 介绍
Timer(定时器)是 Flink 提供的用于 Processing Time 或 Event Time 变化的机制。
Timer是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。
Timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的
Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。
2.TimeService 接口
/**
* 用于处理时间和计时器的接口
*/
public interface TimerService {
/** Error string for {@link UnsupportedOperationException} on registering timers. */
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
/** Error string for {@link UnsupportedOperationException} on deleting timers. */
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
/** 返回当前的 processing time */
long currentProcessingTime();
/** 返回当前的 event-time watermark. */
long currentWatermark();
/**
* 注册一个 processing time 的定时器。当 processing time 到达设定的时间,触发timer
* 当定时器 timer 触发时, 会执行回调函数 onTimer()
* 定时器可以限定在 keyed 或者窗口内部
* 当设置一个定时器在 keyed 上下文中,如KeyedStream, 如果 收到计时器通知时,该上下文也将处于活动状态
*/
void registerProcessingTimeTimer(long time);
/**
* 注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数
* 当定时器 timer 触发时, 会执行回调函数 onTimer()
* 定时器可以限定在 keyed 或者窗口内部
* 当设置一个定时器在 keyed 上下文中,如KeyedStream, 如果 收到计时器通知时,该上下文也将处于活动状态
*/
void registerEventTimeTimer(long time);
/**
* 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
* 定时器可以限定在 keyed 或者窗口内部
* 删除计时器时,会将其从当前键控上下文中删除
*/
void deleteProcessingTimeTimer(long time);
/**
* 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
* 删除具有给定触发时间的事件时间计时器。 仅当此类计时器先前已注册并且尚未过期时,此方法才有效
* 计时器可以在内部限定于键和/或窗口。 删除计时器时,会将其从当前键控上下文中删除
*/
void deleteEventTimeTimer(long time);
}
3.Timer 使用场景
3.1 KeyedProcessFunction中使用 Timer
在大多数情况下,使用 Timer 的方式就是在 KeyedProcessFunction 的 processElement() 方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。当到达触发条件时触发 onTimer() 方法,对于时间语义不同的场景,注册 Timer 的方法会稍有不同。
- Processing Time:调用Context.timerService().registerProcessingTimeTimer()注册 Timer。onTimer()在系统时间戳达到Timer设定的时间戳时触发。
- Event Time:调用Context.timerService().registerEventTimeTimer()注册。onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestTimers extends App {
// 创建执行环境
private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
private val value: DataStream[Obj1] = stream1
.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
private val value1: KeyedStream[Obj1, String] = value.keyBy(_.id)
value1.process(new CustomKeyedProcessFunction)
.print("TestKeyedProcessFunction")
env.execute()
}
/**
* KeyedProcessFunction
* String, 输入的 key 的数据类型
* Obj1, 输入的数据类型
* String 输出的数据类型
*/
class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String]{
override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
println(s"当前 key:${ctx.getCurrentKey}")
println(s"当前 ProcessingTime:${ctx.timerService().currentProcessingTime()}")
if (value.id == "1") {
val timestamp = System.currentTimeMillis() + 10000
println(s"设置定时器,触发时间为:$timestamp")
ctx.timerService().registerProcessingTimeTimer(timestamp);
}
out.collect(value.name)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Obj1, String]#OnTimerContext, out: Collector[String]): Unit = {
println(s"定时器触发,时间为:$timestamp")
out.collect(s"$timestamp")
}
}
3.2 Window trigger 中使用 Timer
Timer 在窗口操作中也有着很多的使用,例如 Trigger,在onElement 会设置 Timer 的触发时间,当时间到达后触发 onEventTime() 进而关联触发该窗口相关联的Trigger。