1. Timer 介绍
Timer(定时器)是 Flink 提供的用于 Processing Time 或 Event Time 变化的机制。
Timer 是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。
Timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的
Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。
2.TimeService 接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public interface TimerService { String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams." ; String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams." ; long currentProcessingTime(); long currentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time); }
3. Timer 使用场景 3.1 KeyedProcessFunction中使用 Timer 在大多数情况下,使用 Timer 的方式就是在 KeyedProcessFunction 的 processElement() 方法中注册 Timer,然后覆写其 onTimer() 方法作为Timer触发时的回调逻辑。当到达触发条件时触发 onTimer() 方法,对于时间语义不同的场景,注册 Timer 的方法会稍有不同。
Processing Time:调用 Context.timerService().registerProcessingTimeTimer() 注册 Timer。onTimer() 在系统时间戳达到Timer设定的时间戳时触发。
Event Time:调用 Context.timerService().registerEventTimeTimer() 注册。onTimer() 在 Flink 内部WaterMark 达到或超过 Timer 设定的时间戳时触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import com.hnbian.flink.common.Obj1 import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collector object TestTimers extends App { private val env: StreamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment val stream1: DataStream [String ] = env.socketTextStream("localhost" ,9999 ) private val value: DataStream [Obj1 ] = stream1 .map(data => { val arr = data.split("," ) Obj1 (arr(0 ), arr(1 ), arr(2 ).toLong) }) private val value1: KeyedStream [Obj1 , String ] = value.keyBy(_.id) value1.process(new CustomKeyedProcessFunction ) .print("TestKeyedProcessFunction" ) env.execute() } class CustomKeyedProcessFunction extends KeyedProcessFunction [String , Obj1 , String ] { override def processElement (value: Obj1 , ctx: KeyedProcessFunction [String , Obj1 , String ]#Context , out: Collector [String ]): Unit = { println(s"当前 key:${ctx.getCurrentKey} " ) println(s"当前 ProcessingTime:${ctx.timerService().currentProcessingTime()} " ) if (value.id == "1" ) { val timestamp = System .currentTimeMillis() + 10000 println(s"设置定时器,触发时间为:$timestamp " ) ctx.timerService().registerProcessingTimeTimer(timestamp); } out.collect(value.name) } override def onTimer (timestamp: Long , ctx: KeyedProcessFunction [String , Obj1 , String ]#OnTimerContext , out: Collector [String ]): Unit = { println(s"定时器触发,时间为:$timestamp " ) out.collect(s"$timestamp " ) } }
3.2 Window trigger 中使用 Timer Timer 在窗口操作中也有着很多的使用,例如 Trigger,在onElement 会设置 Timer 的触发时间,当时间到达后触发 onEventTime() 进而关联触发该窗口相关联的Trigger。