Flink系列 12. 介绍Flink中 Timer 的使用


1. Timer 介绍

  • Timer(定时器)是 Flink 提供的用于 Processing Time 或 Event Time 变化的机制。

  • Timer 是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。

  • Timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的

  • Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。

2.TimeService 接口

/**
 * 用于处理时间和计时器的接口
 */
public interface TimerService {

   /** 注册计时器时的 {@link UnsupportedOperationException} 错误字符串 */
   String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

   /** 删除计时器时的 {@link UnsupportedOperationException} 错误字符串 */
   String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

   /** 返回当前的 processing time */
   long currentProcessingTime();

   /** 返回当前的 event-time watermark. */
   long currentWatermark();

   /**
    * 注册一个 processing time 的定时器。当 processing time 到达设定的时间,触发timer
    * 当定时器 timer 触发时, 会执行回调函数 onTimer()
    * 定时器可以限定在 keyed 或者窗口内部
    * 当设置一个定时器在 keyed 上下文中,如KeyedStream, 如果 收到计时器通知时,该上下文也将处于活动状态
    */
   void registerProcessingTimeTimer(long time);

   /**
    * 注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数
    * 当定时器 timer 触发时, 会执行回调函数 onTimer()
    * 定时器可以限定在 keyed 或者窗口内部
    * 当设置一个定时器在 keyed 上下文中,如KeyedStream, 如果 收到计时器通知时,该上下文也将处于活动状态
    */
   void registerEventTimeTimer(long time);

   /**
    * 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
    * 定时器可以限定在 keyed 或者窗口内部
    * 删除计时器时,会将其从当前键控上下文中删除
    */
   void deleteProcessingTimeTimer(long time);

   /**
    * 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
    * 删除具有给定触发时间的事件时间计时器。 仅当此类计时器先前已注册并且尚未过期时,此方法才有效
    * 计时器可以在内部限定于键和/或窗口。 删除计时器时,会将其从当前键控上下文中删除
    */
   void deleteEventTimeTimer(long time);
}

3. Timer 使用场景

3.1 KeyedProcessFunction中使用 Timer

在大多数情况下,使用 Timer 的方式就是在 KeyedProcessFunction 的 processElement() 方法中注册 Timer,然后覆写其 onTimer() 方法作为Timer触发时的回调逻辑。当到达触发条件时触发 onTimer() 方法,对于时间语义不同的场景,注册 Timer 的方法会稍有不同。

  • Processing Time:调用 Context.timerService().registerProcessingTimeTimer() 注册 Timer。onTimer() 在系统时间戳达到Timer设定的时间戳时触发。
  • Event Time:调用 Context.timerService().registerEventTimeTimer() 注册。onTimer() 在 Flink 内部WaterMark 达到或超过 Timer 设定的时间戳时触发。
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestTimers extends App {
  // 创建执行环境
  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  private val value: DataStream[Obj1] = stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
  private val value1: KeyedStream[Obj1, String] = value.keyBy(_.id)

  value1.process(new CustomKeyedProcessFunction)
    .print("TestKeyedProcessFunction")
  env.execute()
}

/**
  * KeyedProcessFunction
  * String, 输入的 key 的数据类型
  * Obj1, 输入的数据类型
  * String 输出的数据类型
  */
class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String]{

  override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
    println(s"当前 key:${ctx.getCurrentKey}")
    println(s"当前 ProcessingTime:${ctx.timerService().currentProcessingTime()}")

    if (value.id == "1") {
      val timestamp = System.currentTimeMillis() + 10000
      println(s"设置定时器,触发时间为:$timestamp")
      ctx.timerService().registerProcessingTimeTimer(timestamp);
    }
    out.collect(value.name)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Obj1, String]#OnTimerContext, out: Collector[String]): Unit = {
    println(s"定时器触发,时间为:$timestamp")
    out.collect(s"$timestamp")
  }
}

3.2 Window trigger 中使用 Timer

Timer 在窗口操作中也有着很多的使用,例如 Trigger,在onElement 会设置 Timer 的触发时间,当时间到达后触发 onEventTime() 进而关联触发该窗口相关联的Trigger。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink系列 13. 介绍Flink中的Operator State 和 Keyed State Flink系列 13. 介绍Flink中的Operator State 和 Keyed State
1. Flink State 介绍1.1 什么是 State(状态) 由一个任务维护,并且用来计算某个结果的所有数据,就属于这个任务的状态 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问 当任务失败时,可以使用状态恢复数据 状态始终
2020-09-06
下一篇 
Flink系列 11. 介绍Flink中 ProcessFunction 的使用 Flink系列 11. 介绍Flink中 ProcessFunction 的使用
1. ProcessFunction 介绍1.1 介绍Flink一般的转换算子是无法访问事件的时间戳信息和WaterMark信息的。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。而这在某些应用
2020-08-25
  目录