Flink系列 9. 介绍 Flink 窗口触发器、移除器和延迟数据等


1. 触发器

1.1 触发器介绍

  • 当数据到达某种情况,符合了某种条件去关闭窗口,这种到达条件就是触发。
  • 设置到达某种条件并触发窗口执行就是触发器。
  • 触发器的作用就是控制什么时候进行数据的聚合计算
  • flink 中的窗口计算依赖于触发器。
  • 每种窗口类型都会有默认的触发器。
  • 默认的Trigger不能满足需求,可以使用自定义的 trigger

1.2 Trigger 抽象类结构

Flink 中有一个 Trigger 抽象类,它是所有窗口触发器的父类。Flink 中触发器相关类集成图如下:

Trigger 接口有六个方法,允许trigger对不同的事件做出反应:

  • onElement():进入窗口的每个元素都会调用该方法。

  • onEventTime():事件时间timer触发的时候被调用。

  • onProcessingTime():处理时间timer触发的时候会被调用。

  • onMerge():有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。

  • canMerge:如果此触发器支持合并触发器则返回 true,否则返回 false

  • clear():该方法主要是执行窗口的删除操作。

  • TriggerContext 接口是Trigger的一个内部接口它的方法如下:

    • getCurrentProcessingTime:获取当前处理时间
    • getMetricGroup:获取触发器的 metric group
    • getCurrentWatermark:获得当前的 WaterMark
    • registerProcessingTimeTimer:注册系统回调时间,当系统时间超过指定时间后,将使用此处指定的时间
    • registerEventTimeTimer:注册 EventTime 回调时间,当前水印经过指定的时间时,将使用此处指定的时间onEventTime
    • deleteProcessingTimeTimer:删除给定时间的处理时间触发器
    • deleteEventTimeTimer:删除给定时间的事件时间触发器。
    • getPartitionedState:检索一个State对象,该对象可用于与范围为当前触发器调用的窗口和键的容错状态进行交互
    • getKeyValueState:检索一个ValueState对象,该对象可用于与范围为当前触发器调用的窗口和键的容错状态进行交互

1.3 TriggerResult 枚举类结构

TriggerResult是触发方法的结果类型,onElement()、onEventTime()、onProcessingTime()方法都需要返回一个TriggerResult, 这确定了窗口会发生什么情况,例如是应调用窗口函数还是应丢弃窗口。
如果Trigger返回 FIRE 或 FIRE_AND_PURGE 但是窗口不包含任何数据,则不会调用窗口函数,即不会为该窗口生成任何数据。

  • fire:表示是否要触发window 的 computation 操作
  • purge:表示是否要清理 window 的数据
操作 是否触发窗口计算
fire
是否清理数据
purge
CONTINUE × ×
FIRE ×
FIRE_AND_PURGE
PURGE ×

1.4 实现的触发器类型

触发器类型 说明
EventTimeTrigger 通过对比 Watermark 和窗口的 Endtime 确定是否触发窗口计算,如果 Watermark 大于 Window EndTime 则触发,否则不触发,窗口将继续等待
ProcessingTimeTrigger 通过对比 ProcessTime 和窗口 EndTme 确定是否触发窗口,如果 ProcessTime 大于 EndTime 则触发计算,否则窗口继续等待
ContinuousEventTimeTrigger 根据间隔时间周期性触发窗口 或者 Window 的结束时间小于当前 EndTime 触发窗口计算
ContinuousProcessingTimeTrigger 根据间隔时间周期性触发窗口 或者 Window 的结束时间小于当前 ProcessTime 触发窗口计算
CounterTrigger 根据接入数据量是否超过设定的阙值判断是否触发窗口计算
PurgingTrigger 可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后计算完成的数据将被清理
DeltaTrigger 根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算
NeverTrigger 永远不会触发的触发器,它是 GlobalWindow 的默认触发器

Flink 中触发器相关的类继承图

2. 窗口触发器介绍

2.1 EventTimeTrigger

  • 源码
@Override
public TriggerResult onElement(
  Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

   // window.maxTimestamp():当前窗口的最大时间戳的数值
   // 当前 WaterMark 值,当WaterMark 大于等于窗口的最大时间戳时触发窗口计算
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE;
   } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
   return time == window.maxTimestamp() ?
      TriggerResult.FIRE :
      TriggerResult.CONTINUE;
}
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

object TestEventTimeTrigger extends App {

    val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
    stream1.print("stream1")
    val stream2: DataStream[Obj1] = stream1.map(data => {
      val arr = data.split(",")
      //println(arr)
      Obj1(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
      override def extractAscendingTimestamp(element: Obj1) = {
        // 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
        element.time * 1000
      }
    })

    stream2.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(6)))
      .trigger(EventTimeTrigger.create()) // 设置 EventTimeTrigger
      .reduce(new MinDataReduceFunction)
      .print("TumblingEventTimeWindow")

    environment.execute()
}

2.2 ProcessingTimeTrigger

  • 源码
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger

object TestProcessingTimeTrigger extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 10 秒的窗口
  stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(ProcessingTimeTrigger.create())
    .reduce(new MinDataReduceFunction)
    .print()

  environment.execute()

}

2.3 ContinuousEventTimeTrigger

ContinuousEventTimeTrigger表示连续事件时间触发器,用在EventTime属性的任务流中,以事件时间的进度来推动定期触发。我们可能有疑惑,既然会自动触发窗口计算那么为什么还要设置这个连续事件触发器呢?当设置的窗口时间短的时候我们自然可以等到窗口触发的时间去看结果,但是当一个窗口时间比较长,而我们又想看到实时结果的时候,就需要使用 ContinuousEventTimeTrigger 了,它能够指定一个固定的时间间隔,实时看到窗口当前的计算结果,无需等到窗口结束以后才能看到结果。

  • 源码
@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        if (time == window.maxTimestamp()){
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger

object TestContinuousEventTimeTrigger extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    //println(arr)
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(600))) // 设置窗口时间是十分钟
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10))) // 每十秒钟计算一次窗口数据
    .reduce(new MinDataReduceFunction)
    .print("TumblingEventTimeWindow")

  environment.execute()
}

2.4 ContinuousProcessingTimeTrigger

ContinuousProcessingTimeTrigger 表示连续ProcessingTime触发器,用在ProcessingTime属性的任务流中,以ProcessingTime进度来推动定期触发。我们可能有疑惑,既然会自动触发窗口计算那么为什么还要设置这个连续事件触发器呢?当设置的窗口时间短的时候我们自然可以等到窗口触发的时间去看结果,但是当一个窗口时间比较长,而我们又想看到实时结果的时候,就需要使用 ContinuousProcessingTimeTrigger 了,它能够指定一个固定的时间间隔,实时看到窗口当前的计算结果,无需等到窗口结束以后才能看到结果。

  • 源码
@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, ProcessingTimeTrigger}

object TestContinuousProcessingTimeTrigger extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 600 秒的窗口
  stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(600)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) // 每 10 秒触发一次窗口计算
    .reduce(new MinDataReduceFunction)
    .print()

  environment.execute()

}

2.5 CounterTrigger

  • 源码
@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
  • 使用
import com.hnbian.flink.common.Record
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object TestCounterTrigger extends App {
  // 创建执行环境
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream2.map(record=>{
    (record.classId,record.age)
  }).keyBy(_._1)
    .timeWindow(Time.seconds(20)) // 默认使用的是 processing time
    .trigger(CountTrigger.of(2)) // 设置 CountTrigger
    .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
    .print("TestCounterTrigger")
  env.execute()

}

2.6 PurgingTrigger

  • PurgingTrigger 清理计算结果的说明

假如我们定义一个5分钟的基于 EventTime 的滚动窗口,定义一个每2分触发计算的 ContinuousEventTimeTrigger,有4条数据事件时间分别是20:01、20:02、20:03、20:04,对应的值分别是1、2、3、2,我们要对值做 Sum 操作。

1.初始时 State 和 Result 中的值都为0

2.数据20:01进入窗口时State的值为1,此时还没有到达Trigger的触发时间

3.第二条数据20:02进入窗口State中的值为1+2=3,此时达到2分钟满足Trigger的触发条件,故Result输出结果为3

4.第三条数据20:03进入窗口State中的值为3+3=6,此时未达到 Trigger 触发条件没有结果输出

5.第四条数据20:04进入窗口State中的值更新为6+2=8,此时又到了2分钟达到了Trigger触发时间,所以输出结果为8

如果我们把结果输出到支持 update 的存储,比如 MySQL,那么结果值就由之前的3更新成了8。但是如果Result 所在的存储介质只支持 add 操作会有什么样的结果呢?

6.因为 Result只支持 add 所以 Result 中存在两条结果数据

如果在此基础上做计算,那么数据处理就会出现问题,这时如果使用 🌟PurgingTrigger 就可以很简单的解决这个问题。

  • 使用 PurgingTrigger 的计算流程

    和上面的示例一样,唯一的不同是在 ContinuousEventTimeTrigger 外面包装了一个 PurgingTrigger,其作用是在 ContinuousEventTimeTrigger 触发窗口计算之后将窗口的 State 中的数据清除。

1.初始时,State 和 Result 中的值都为0

2.数据20:01进入窗口时State的值为1,此时还没有到达Trigger的触发时间

3.第二条数据20:02进入窗口State中的值为1+2=3,此时达到2分钟满足Trigger的触发条件,故Result输出结果为3

4.窗口出发后由于 PurgingTrigger 的作用,State中的数据会被清除

5.数据20:03进入窗口时因为之前的State已经清空所以当前State的值为3,此时还没有到达Trigger的触发时间

6.数据20:04进入窗口后到达Trigger的触发时间,计算结果为 5 写入 Result

由于结果输出是 append 模式,会输出3和5两条数据,然后再做 Sum 也能得到正确的结果。

  • 源码
    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }
  • 使用
import com.hnbian.flink.common.{Record}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.Window

object TestPurgingTrigger extends App {
  // 创建执行环境
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream2.map(record=>{
    (record.classId,record.age)
  }).keyBy(_._1)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))// 默认使用的是 processing time
    .trigger(PurgingTrigger.of(ContinuousProcessingTimeTrigger.of[Window](Time.seconds(10))))
    .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
    .print("TestPurgingTrigger")
  env.execute()
}

2.7 DeltaTrigger

基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值则触发。

  • 源码
@Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        }
        if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
import org.apache.flink.streaming.api.windowing.windows.Window

object TestDeltaTrigger extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    //println(arr)
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // 提取当前的 EventTime,会设置当前的 EventTime 为 WaterMark
      element.time * 1000
    }
  })

  private val deltaFunction: DeltaFunction[Obj1] = new DeltaFunction[Obj1] {
    override def getDelta(oldDataPoint: Obj1, newDataPoint: Obj1) = {
      newDataPoint.time - oldDataPoint.time
    }
  }

  private val deltaTrigger: DeltaTrigger[Obj1, Window]
  = DeltaTrigger.of[Obj1, Window](10L, deltaFunction, createTypeInformation[(Obj1)].createSerializer(environment.getConfig))

  stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.minutes(10))) // 设置窗口时间是十分钟
    .trigger(deltaTrigger)
    .reduce(new MinDataReduceFunction)
    .print("TumblingEventTimeWindow")

  environment.execute()
}

2.8 NeverTrigger

  • 源码
@Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

2.9 自定义触发器

  • 功能说明
    • 增加一个自定义触发器,
    • 当数量达到 5 条记录时触发计算
    • 当时间到 5 秒时触发计算
    • 没有数据不触发计算
    • 窗口关闭时清理数据
  • 代码
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object TestCustomTrigger extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 10 秒的窗口
  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
    .trigger(new CustomTrigger[TumblingProcessingTimeWindows](5,5))
    .reduce(new MinDataReduceFunction)
    .print("TestCustomTrigger")

  environment.execute()
}

/**
  * 计数方法 记录当前数据量
  */
class Sum extends ReduceFunction[Long] {
  override def reduce(value1: Long, value2: Long): Long = value1 + value2
}

/**
  * 更新状态为最新的时间戳
  */
class Update extends ReduceFunction[Long] {
  override def reduce(value1: Long, value2: Long): Long = value2
}

/**
  * 实现自定义的 window,
  * 在指定的 时间或者数量触发窗口计算
  * @tparam Window
  */
class CustomTrigger[Window] extends Trigger[Object,TimeWindow]{
  //触发计算的最大数量
  private var maxCount: Long = _
  //定时触发间隔时长 (ms)
  private var interval: Long = 60 * 1000

  /**
    * @param maxCount 触发窗口计算的数据量
    * @param interval 触发窗口计算的时间间隔 单位秒
    */
  def this(maxCount: Long,interval:Long) {
    this()
    this.maxCount = maxCount
    this.interval = interval * 1000
  }
  // 记录当前数量的状态
  private val countStateDescriptor:ReducingStateDescriptor[Long]  = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
  // 记录 processTime 定时触发时间的状态
  private val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
  // 记录 eventTime 定时触发时间的状态
  private val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])

  override def onElement(element: Object, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {

    val countState: ReducingState[Long] = ctx.getPartitionedState(countStateDescriptor)
    countState.add(1L) //计数状态加1

    println(s"当前数据量为:${countState.get()}")

    if (countState.get() >= this.maxCount) {
      //达到指定指定数量
      clear(window,ctx)
      println(s"数据量达到 $maxCount , 触发计算")
      //触发计算
      TriggerResult.FIRE_AND_PURGE
    } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {

      val nextFire = ctx.getCurrentProcessingTime + interval
      println(s"未达到指定数量,设置下次触发计算的时间为:${nextFire}")
      //未达到指定数量,且没有指定定时器,需要指定定时器
      //当前定时器状态值加上间隔值
      ctx.getPartitionedState(processTimerStateDescriptor).add(nextFire)
      //注册定执行时间定时器
      ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
      TriggerResult.CONTINUE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // ProcessingTime 定时器触发
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {

    if(0 == ctx.getPartitionedState(processTimerStateDescriptor).get()){
      println("还没有指定下次触发窗口计算的时间")
    }else{
      println(s"距离触发下次窗口计算还有( ${ctx.getPartitionedState(processTimerStateDescriptor).get().-(time)/1000} )秒")
    }

    // 如果计数器数量大于0 并且 当前时间大于等于触发时间
    if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() <= time)) {
      println(s"数据量未达到 $maxCount ,由执行时间触发器 ${ctx.getPartitionedState(processTimerStateDescriptor).get() } 触发计算")
      clear(window,ctx)
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  /**
    * 窗口结束时清空状态
    * @param window
    * @param ctx
    */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    // 清理数量状态
    ctx.getPartitionedState(countStateDescriptor).clear()
    // 清理 processTimer 状态
    ctx.getPartitionedState(processTimerStateDescriptor).clear()
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

// 执行结果

当前数据量为:1
未达到指定数量,设置下次触发计算的时间为:1609850677798
Obj1(1,小红,1598018543)
当前数据量为:2
Obj1(1,小红,1598018543)
当前数据量为:3
Obj1(1,小红,1598018543)
当前数据量为:4
Obj1(1,小红,1598018543)
当前数据量为:5
数据量达到 5 , 触发计算
6> Obj1(1,小红,1598018543)
还没有指定下次触发窗口计算的时间
当前数据量为:1
未达到指定数量,设置下次触发计算的时间为:1609850684226
距离触发下次窗口计算还有( 0 )秒
数据量未达到 5 ,由执行时间触发器 1609850684226 触发计算
6> Obj1(1,小红,1598018543)

3. 移除器

  • Flink 窗口模式中可以使用别的算子Evictor(移除器),应用在 WindowAssigner 和 trigger 之间。通过 evictor() 方法使用。

  • Evictor 能够在 element 进入 Window 窗口聚合之前进行移除数据或者在进入Window窗口聚合后,Trigger触发计算操作之前移除数据。

  • Evictor接口有一下两个方法:

    /**
     * 在Window Function之前调用 移除窗口元素
     *
     * @param elements 当前窗口中的元素
     * @param size 当前窗口中的元素数量
     * @param window 当前窗口
     * @param evictorContext evictor的上下文
   */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * 在Window Function之后调用移除窗口元素
     *
     * @param elements 当前窗口中的元素
     * @param size 当前窗口中的元素数量
     * @param window 当前窗口
     * @param evictorContext evictor的上下文
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

移除器相关类继承图

  • Flink为我们实现了3个Evictor:
    • CountEvictor:保持窗口中用户指定数量的元素,并从窗口缓冲区的开头丢弃剩余的元素。
    • TimeEvictor:以interval毫秒为单位作为参数,对于给定窗口,它查找max_ts其元素的最大时间戳,并删除时间戳小于的所有元素max_ts-interval。
    • DeltaEvictor:取一个DeltaFunction和一个threshold,计算窗口缓冲区中最后一个元素与其余每个元素之间的差值,并删除Delta大于或等于阈值的值。

3.1 CountEvictor

  • 源码
@Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time

object TestCountEvictor extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 10 秒的窗口
  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    // 使用 CountEvictor 移除器,设置在开窗之前移除数据
    .evictor(CountEvictor.of(10L,false))
    .reduce(new MinDataReduceFunction)
    .print()
  environment.execute()
}

3.2 TimeEvictor

  • 源码
@Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }

        long currentTime = getMaxTimestamp(elements); // 获取最大时间
        long evictCutoff = currentTime - windowSize; // 确定移除数据时间范围

        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            if (record.getTimestamp() <= evictCutoff) { // 如果数据时间小于等于移除时间 则移除数据
                iterator.remove();
            }
        }
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time

object TestTimeEvictor extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 10 秒的窗口
  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    // 使用 TimeEvictor 移除器,设置在开窗之前移除数据
    .evictor(TimeEvictor.of(Time.seconds(3),false))
    .reduce(new MinDataReduceFunction)
    .print()
  environment.execute()
}

3.3 DeltaEvictor

  • 源码
    @Override
    public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
        TimestampedValue<T> lastElement = Iterables.getLast(elements);
        for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
            TimestampedValue<T> element = iterator.next();
      // 使用给定的delta 函数计算结果,如果结果大于等于设置的阈值则移除数据
            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
                iterator.remove();
            }
        }
    }
  • 使用
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor
import org.apache.flink.streaming.api.windowing.time.Time

object TestDeltaEvictor extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  private val deltaFunction: DeltaFunction[Obj1] = new DeltaFunction[Obj1] {
    override def getDelta(oldDataPoint: Obj1, newDataPoint: Obj1) = {
      newDataPoint.time - oldDataPoint.time
    }
  }
  // 设置一个窗口时间是 10 秒的窗口
  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    // 使用 DeltaEvictor移除器,设置在开窗之前移除数据
    .evictor(DeltaEvictor.of(10,deltaFunction,false))
    .reduce(new MinDataReduceFunction)
    .print()
  environment.execute()
}

4. 延迟数据处理

  • 当 WaterMark 到达以后并且已经出发了窗口计算,那么如果再有前一个窗口的数据到达时,这部分数据时会被丢掉的,那么如何避免这部分数据被丢掉呢?allowedLateness就是为了解决这个问题而产生的。

  • allowedLateness是针对 WaterMark 触发窗口计算之后,这个窗口还未到达的数据进行等待,之后对这部分数据再次处理的逻辑。

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestAllowedLateness extends App {

  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1) = element.time * 1000
  })

  private val outputTag = new OutputTag[Obj1]("lastObj1")
  // 设置一个窗口时间是 10 秒的窗口
  private val stream3: DataStream[Obj1] = stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .allowedLateness(Time.seconds(5)) // 在设置允许数据延迟 5 秒
    .sideOutputLateData(outputTag) // 将延迟数据输出到侧输出流中
    .reduce(new MinDataReduceFunction)

  private val lastObj1: DataStream[Obj1] = stream3.getSideOutput(outputTag)

  lastObj1.print("window 迟到数据")
  stream3.print("window 计算结果")

  environment.execute()
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
hexo博客中增加知识图谱页 hexo博客中增加知识图谱页
1. 增加页面新建知识图谱页面 hexo new page "graph" 执行完成之后在source 下面会增加 graph 文件夹以及文件夹中增加一个 index.md 文件 2. 配置菜单在 主题下面的_config.yml 文件中配
2020-07-31
下一篇 
Flink系列 8. 介绍Flink中的窗口类型与相关操作 Flink系列 8. 介绍Flink中的窗口类型与相关操作
1. window 的概念 一般真实的流都是无界的,怎样处理无界的数据? 可以把无界的流进行切分,得到有限的数据集进行处理,也就是得到有界流。 window 就是将无界流切割成有界流的一种方式,它会将流分发到有限大小的桶(bucket)中
2020-07-20
  目录