1. 触发器
1.1 触发器介绍
- 当数据到达某种情况,符合了某种条件去关闭窗口,这种到达条件就是触发。
- 设置到达某种条件并触发窗口执行就是触发器。
- 触发器的作用就是控制什么时候进行数据的聚合计算
- flink 中的窗口计算依赖于触发器。
- 每种窗口类型都会有默认的触发器。
- 默认的Trigger不能满足需求,可以使用自定义的 trigger
1.2 Trigger 抽象类结构
Flink 中有一个 Trigger 抽象类,它是所有窗口触发器的父类。Flink 中触发器相关类集成图如下:
Trigger 接口有六个方法,允许trigger对不同的事件做出反应:
onElement():进入窗口的每个元素都会调用该方法。
onEventTime():事件时间timer触发的时候被调用。
onProcessingTime():处理时间timer触发的时候会被调用。
onMerge():有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。
canMerge:如果此触发器支持合并触发器则返回 true,否则返回 false
clear():该方法主要是执行窗口的删除操作。
TriggerContext 接口是Trigger的一个内部接口它的方法如下:
- getCurrentProcessingTime:获取当前处理时间
- getMetricGroup:获取触发器的 metric group
- getCurrentWatermark:获得当前的 WaterMark
- registerProcessingTimeTimer:注册系统回调时间,当系统时间超过指定时间后,将使用此处指定的时间
- registerEventTimeTimer:注册 EventTime 回调时间,当前水印经过指定的时间时,将使用此处指定的时间onEventTime
- deleteProcessingTimeTimer:删除给定时间的处理时间触发器
- deleteEventTimeTimer:删除给定时间的事件时间触发器。
- getPartitionedState:检索一个State对象,该对象可用于与范围为当前触发器调用的窗口和键的容错状态进行交互
- getKeyValueState:检索一个ValueState对象,该对象可用于与范围为当前触发器调用的窗口和键的容错状态进行交互
1.3 TriggerResult 枚举类结构
TriggerResult是触发方法的结果类型,onElement()、onEventTime()、onProcessingTime()方法都需要返回一个TriggerResult, 这确定了窗口会发生什么情况,例如是应调用窗口函数还是应丢弃窗口。
如果Trigger返回 FIRE 或 FIRE_AND_PURGE 但是窗口不包含任何数据,则不会调用窗口函数,即不会为该窗口生成任何数据。
- fire:表示是否要触发window 的 computation 操作
- purge:表示是否要清理 window 的数据
| 操作 | 是否触发窗口计算 fire |
是否清理数据 purge |
|---|---|---|
| CONTINUE | × | × |
| FIRE | √ | × |
| FIRE_AND_PURGE | √ | √ |
| PURGE | √ | × |
1.4 实现的触发器类型
| 触发器类型 | 说明 |
|---|---|
| EventTimeTrigger | 通过对比 Watermark 和窗口的 Endtime 确定是否触发窗口计算,如果 Watermark 大于 Window EndTime 则触发,否则不触发,窗口将继续等待 |
| ProcessingTimeTrigger | 通过对比 ProcessTime 和窗口 EndTme 确定是否触发窗口,如果 ProcessTime 大于 EndTime 则触发计算,否则窗口继续等待 |
| ContinuousEventTimeTrigger | 根据间隔时间周期性触发窗口 或者 Window 的结束时间小于当前 EndTime 触发窗口计算 |
| ContinuousProcessingTimeTrigger | 根据间隔时间周期性触发窗口 或者 Window 的结束时间小于当前 ProcessTime 触发窗口计算 |
| CounterTrigger | 根据接入数据量是否超过设定的阙值判断是否触发窗口计算 |
| PurgingTrigger | 可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后计算完成的数据将被清理 |
| DeltaTrigger | 根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算 |
| NeverTrigger | 永远不会触发的触发器,它是 GlobalWindow 的默认触发器 |
2. 窗口触发器介绍
2.1 EventTimeTrigger
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
2.2 ProcessingTimeTrigger
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
2.3 ContinuousEventTimeTrigger
ContinuousEventTimeTrigger表示连续事件时间触发器,用在EventTime属性的任务流中,以事件时间的进度来推动定期触发。我们可能有疑惑,既然会自动触发窗口计算那么为什么还要设置这个连续事件触发器呢?当设置的窗口时间短的时候我们自然可以等到窗口触发的时间去看结果,但是当一个窗口时间比较长,而我们又想看到实时结果的时候,就需要使用 ContinuousEventTimeTrigger 了,它能够指定一个固定的时间间隔,实时看到窗口当前的计算结果,无需等到窗口结束以后才能看到结果。
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
2.4 ContinuousProcessingTimeTrigger
ContinuousProcessingTimeTrigger 表示连续ProcessingTime触发器,用在ProcessingTime属性的任务流中,以ProcessingTime进度来推动定期触发。我们可能有疑惑,既然会自动触发窗口计算那么为什么还要设置这个连续事件触发器呢?当设置的窗口时间短的时候我们自然可以等到窗口触发的时间去看结果,但是当一个窗口时间比较长,而我们又想看到实时结果的时候,就需要使用 ContinuousProcessingTimeTrigger 了,它能够指定一个固定的时间间隔,实时看到窗口当前的计算结果,无需等到窗口结束以后才能看到结果。
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
2.5 CounterTrigger
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.Record |
2.6 PurgingTrigger
- PurgingTrigger 清理计算结果的说明
假如我们定义一个5分钟的基于 EventTime 的滚动窗口,定义一个每2分触发计算的 ContinuousEventTimeTrigger,有4条数据事件时间分别是20:01、20:02、20:03、20:04,对应的值分别是1、2、3、2,我们要对值做 Sum 操作。
如果我们把结果输出到支持 update 的存储,比如 MySQL,那么结果值就由之前的3更新成了8。但是如果Result 所在的存储介质只支持 add 操作会有什么样的结果呢?
如果在此基础上做计算,那么数据处理就会出现问题,这时如果使用 🌟PurgingTrigger 就可以很简单的解决这个问题。
使用 PurgingTrigger 的计算流程
和上面的示例一样,唯一的不同是在 ContinuousEventTimeTrigger 外面包装了一个 PurgingTrigger,其作用是在 ContinuousEventTimeTrigger 触发窗口计算之后将窗口的 State 中的数据清除。
由于结果输出是 append 模式,会输出3和5两条数据,然后再做 Sum 也能得到正确的结果。
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{Record} |
2.7 DeltaTrigger
基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值则触发。
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
2.8 NeverTrigger
- 源码
1 |
|
2.9 自定义触发器
功能说明
- 增加一个自定义触发器,
- 当数量达到 5 条记录时触发计算
- 当时间到 5 秒时触发计算
- 没有数据不触发计算
- 窗口关闭时清理数据
代码
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
3. 移除器
Flink 窗口模式中可以使用别的算子Evictor(移除器),应用在 WindowAssigner 和 trigger 之间。通过 evictor() 方法使用。
Evictor 能够在 element 进入 Window 窗口聚合之前进行移除数据或者在进入Window窗口聚合后,Trigger触发计算操作之前移除数据。
Evictor接口有一下两个方法:
1 | /** |
- Flink为我们实现了3个Evictor:
- CountEvictor:保持窗口中用户指定数量的元素,并从窗口缓冲区的开头丢弃剩余的元素。
- TimeEvictor:以interval毫秒为单位作为参数,对于给定窗口,它查找max_ts其元素的最大时间戳,并删除时间戳小于的所有元素max_ts-interval。
- DeltaEvictor:取一个DeltaFunction和一个threshold,计算窗口缓冲区中最后一个元素与其余每个元素之间的差值,并删除Delta大于或等于阈值的值。
3.1 CountEvictor
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
3.2 TimeEvictor
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
3.3 DeltaEvictor
- 源码
1 |
|
- 使用
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |
4. 延迟数据处理
当 WaterMark 到达以后并且已经出发了窗口计算,那么如果再有前一个窗口的数据到达时,这部分数据时会被丢掉的,那么如何避免这部分数据被丢掉呢?allowedLateness就是为了解决这个问题而产生的。
allowedLateness是针对 WaterMark 触发窗口计算之后,这个窗口还未到达的数据进行等待,之后对这部分数据再次处理的逻辑。
1 | import com.hnbian.flink.common.{MinDataReduceFunction, Obj1} |