1. Triggers
1.1 Introduction to Triggers
- When arriving data satisfies some condition that should close a window, reaching that condition is what fires the window.
- A trigger is the component that defines such a condition and fires the window computation when it is met.
- In other words, triggers control when the aggregation over a window's data is computed.
- Window computation in Flink depends on triggers.
- Every window type comes with a default trigger.
- When the default Trigger does not meet your needs, you can supply a custom trigger.
1.2 Structure of the Trigger Abstract Class
Flink has a Trigger abstract class that is the parent of all window triggers. The inheritance diagram of the trigger-related classes is shown below:
The Trigger class defines six methods that allow a trigger to react to different events:
onElement(): called for every element that enters the window.
onEventTime(): called when a registered event-time timer fires.
onProcessingTime(): called when a registered processing-time timer fires.
onMerge(): relevant for stateful triggers; merges the state of two triggers when their corresponding windows merge, e.g. with session windows.
canMerge(): returns true if this trigger supports merging trigger state, false otherwise.
clear(): performs any cleanup needed when the window is removed.
TriggerContext is an interface nested inside Trigger; it exposes the following methods:
- getCurrentProcessingTime: returns the current processing time.
- getMetricGroup: returns the metric group of this trigger.
- getCurrentWatermark: returns the current watermark.
- registerProcessingTimeTimer: registers a processing-time callback; once processing time passes the given timestamp, onProcessingTime is invoked with it.
- registerEventTimeTimer: registers an event-time callback; once the watermark passes the given timestamp, onEventTime is invoked with it.
- deleteProcessingTimeTimer: deletes the processing-time timer registered for the given time.
- deleteEventTimeTimer: deletes the event-time timer registered for the given time.
- getPartitionedState: retrieves a State object that can be used to interact with fault-tolerant state scoped to the window and key of the current trigger invocation.
- getKeyValueState: retrieves a ValueState object that can be used to interact with fault-tolerant state scoped to the window and key of the current trigger invocation.
1.3 The TriggerResult Enum
TriggerResult is the result type of the trigger methods: onElement(), onEventTime(), and onProcessingTime() each return a TriggerResult, which determines what happens to the window, for example whether the window function should be invoked or the window should be discarded.
If a trigger returns FIRE or FIRE_AND_PURGE but the window contains no data, the window function is not invoked and no output is produced for that window.
- fire: whether to trigger the window's computation.
- purge: whether to purge the window's contents.

Operation | Fires computation (fire) | Purges data (purge)
---|---|---
CONTINUE | × | ×
FIRE | √ | ×
FIRE_AND_PURGE | √ | √
PURGE | × | √
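The fire/purge combinations in the table can be checked with a small stand-alone simulation. This is plain Java, not Flink code: the enum mirrors Flink's TriggerResult, while the buffer handling and the `apply` helper are illustrative assumptions about what the window operator does.

```java
import java.util.ArrayList;
import java.util.List;

enum TriggerResult {
    CONTINUE(false, false), FIRE(true, false),
    FIRE_AND_PURGE(true, true), PURGE(false, true);
    final boolean fire, purge;
    TriggerResult(boolean fire, boolean purge) { this.fire = fire; this.purge = purge; }
}

public class TriggerResultDemo {
    // Apply one trigger result to a window buffer, mimicking the window operator:
    // fire -> evaluate the window function (here: a sum), purge -> drop the buffer.
    public static Integer apply(TriggerResult r, List<Integer> buffer) {
        Integer emitted = null;
        if (r.fire && !buffer.isEmpty()) {
            emitted = buffer.stream().mapToInt(Integer::intValue).sum();
        }
        if (r.purge) {
            buffer.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3));
        System.out.println(apply(TriggerResult.CONTINUE, buffer));       // nothing happens
        System.out.println(apply(TriggerResult.FIRE, buffer));           // computed, data kept
        System.out.println(apply(TriggerResult.FIRE_AND_PURGE, buffer)); // computed, then cleared
        System.out.println(buffer.size());                               // buffer is now empty
    }
}
```

Note how FIRE leaves the buffer intact, so the next firing sees the same elements again, while FIRE_AND_PURGE starts the window over from scratch.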
1.4 Built-in Trigger Implementations

Trigger type | Description
---|---
EventTimeTrigger | Compares the watermark with the window's end time; if the watermark has passed the window end time the window fires, otherwise it keeps waiting.
ProcessingTimeTrigger | Compares the processing time with the window's end time; if the processing time has passed the end time the window fires, otherwise it keeps waiting.
ContinuousEventTimeTrigger | Fires periodically at a configured interval based on event time, and also fires when the watermark passes the window's end time.
ContinuousProcessingTimeTrigger | Fires periodically at a configured interval based on processing time, and also fires when the processing time passes the window's end time.
CountTrigger | Fires when the number of elements in the window reaches a configured threshold.
PurgingTrigger | Wraps any trigger into a purging trigger: after the wrapped trigger fires the computation, the window contents are purged.
DeltaTrigger | Fires when the delta computed from the incoming data exceeds a configured threshold.
NeverTrigger | A trigger that never fires; it is the default trigger of GlobalWindow.
2. Window Triggers in Detail
2.1 EventTimeTrigger
- Source code
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// window.maxTimestamp(): the largest timestamp that still belongs to this window
// fire the window computation when the current watermark is >= the window's max timestamp
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
object TestEventTimeTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("stream1")
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// extract the event time; the ascending extractor also advances the watermark with it
element.time * 1000
}
})
stream2.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(6)))
.trigger(EventTimeTrigger.create()) // set the EventTimeTrigger (also the default for event-time windows)
.reduce(new MinDataReduceFunction)
.print("TumblingEventTimeWindow")
environment.execute()
}
2.2 ProcessingTimeTrigger
- Source code
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
object TestProcessingTimeTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a tumbling window of 10 seconds
stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.trigger(ProcessingTimeTrigger.create())
.reduce(new MinDataReduceFunction)
.print()
environment.execute()
}
2.3 ContinuousEventTimeTrigger
ContinuousEventTimeTrigger is a continuous event-time trigger: used on streams with event-time semantics, it fires periodically as event time advances. Why register a periodic trigger when the window fires on its own anyway? With a short window you can simply wait for it to fire and inspect the result, but when a window is long and you still want to watch results in real time, ContinuousEventTimeTrigger is the tool: it takes a fixed interval and emits the window's current result at that interval, without waiting for the window to end.
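The periodic firing timestamps the trigger registers are aligned to interval boundaries, as the onElement source below shows. A minimal sketch of that arithmetic (plain Java, independent of Flink; the method name is illustrative):

```java
public class IntervalAlignment {
    // Mirrors the alignment in ContinuousEventTimeTrigger.onElement:
    // round the element's timestamp down to the start of its interval,
    // then register the next interval boundary as the firing time.
    public static long nextFireTimestamp(long timestamp, long interval) {
        long start = timestamp - (timestamp % interval); // start of the current interval
        return start + interval;                         // next interval boundary
    }

    public static void main(String[] args) {
        long interval = 10_000L; // 10-second interval in milliseconds
        System.out.println(nextFireTimestamp(12_345L, interval)); // 20000
        System.out.println(nextFireTimestamp(20_000L, interval)); // 30000
    }
}
```

Because of this alignment, firings happen at "round" interval boundaries rather than at arbitrary offsets from the first element.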
- Source code
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
}
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
if (time == window.maxTimestamp()){
return TriggerResult.FIRE;
}
ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
Long fireTimestamp = fireTimestampState.get();
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.clear();
fireTimestampState.add(time + interval);
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
object TestContinuousEventTimeTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("stream1")
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// extract the event time; the ascending extractor also advances the watermark with it
element.time * 1000
}
})
stream2.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(600))) // a 10-minute window
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10))) // emit the current window result every 10 seconds
.reduce(new MinDataReduceFunction)
.print("TumblingEventTimeWindow")
environment.execute()
}
2.4 ContinuousProcessingTimeTrigger
ContinuousProcessingTimeTrigger is the processing-time counterpart: used on streams with processing-time semantics, it fires periodically as processing time advances. As with ContinuousEventTimeTrigger, it is useful when the window is long but you want to observe intermediate results: it takes a fixed interval and emits the window's current result at that interval instead of only when the window ends.
- Source code
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get().equals(time)) {
fireTimestamp.clear();
fireTimestamp.add(time + interval);
ctx.registerProcessingTimeTimer(time + interval);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, ProcessingTimeTrigger}
object TestContinuousProcessingTimeTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a tumbling window of 600 seconds (10 minutes)
stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(600)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) // fire the window computation every 10 seconds
.reduce(new MinDataReduceFunction)
.print()
environment.execute()
}
2.5 CountTrigger
- Source code
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
- Usage
import com.hnbian.flink.common.Record
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
object TestCounterTrigger extends App {
// create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.timeWindow(Time.seconds(20)) // timeWindow uses processing time by default
.trigger(CountTrigger.of(2)) // set the CountTrigger: fire after every 2 elements
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("TestCounterTrigger")
env.execute()
}
2.6 PurgingTrigger
- Why PurgingTrigger purges computed results
Suppose we define a 5-minute event-time tumbling window with a ContinuousEventTimeTrigger that fires every 2 minutes, and four events with event times 20:01, 20:02, 20:03, and 20:04 and values 1, 2, 3, and 2 arrive; we want the Sum of the values. The first firing emits 1 + 2 = 3 and the second emits 1 + 2 + 3 + 2 = 8.
If we write the result to a store that supports updates, such as MySQL, the stored value is simply updated from 3 to 8. But what happens when the sink only supports append operations?
An append-only sink would then hold both partial sums, 3 and 8, and any computation built on top of them would be wrong. PurgingTrigger solves this problem in a simple way.
Computation flow with PurgingTrigger
Everything is the same as in the example above, except that the ContinuousEventTimeTrigger is wrapped in a PurgingTrigger, which clears the data in the window state after each firing of the window computation.
Because the output is append-mode, the two emitted records are 3 and 5, and summing them downstream still gives the correct result.
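The arithmetic in this example can be checked with a short simulation. This is plain Java, not Flink: the event values and the "fire every 2 elements" rule are taken from the text above, and the buffer handling is an illustrative assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class PurgeSimulation {
    // Sum the window buffer at each firing; optionally purge the buffer afterwards.
    public static List<Integer> run(int[] values, boolean purge) {
        List<Integer> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            buffer.add(values[i]);
            if ((i + 1) % 2 == 0) { // the trigger fires after every 2 elements (~every 2 minutes)
                emitted.add(buffer.stream().mapToInt(Integer::intValue).sum());
                if (purge) {
                    buffer.clear(); // FIRE_AND_PURGE clears the window state
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 2};            // events at 20:01 .. 20:04
        System.out.println(run(values, false)); // [3, 8]  -> append-summing gives the wrong total 11
        System.out.println(run(values, true));  // [3, 5]  -> append-summing gives the correct total 8
    }
}
```

Without purging, the emitted records 3 and 8 double-count the first two events when an append-only sink sums them; with purging, 3 + 5 = 8 is correct.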
- Source code
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
- Usage
import com.hnbian.flink.common.{Record}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.Window
object TestPurgingTrigger extends App {
// create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.minutes(10))) // a 10-minute processing-time window
.trigger(PurgingTrigger.of(ContinuousProcessingTimeTrigger.of[Window](Time.seconds(10))))
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("TestPurgingTrigger")
env.execute()
}
2.7 DeltaTrigger
DeltaTrigger fires based on a DeltaFunction and a given threshold: it computes a delta between the element stored at the last firing and the current element, compares it with the threshold, and fires when the delta is higher.
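The firing rule can be sketched without Flink: store the element that caused the last firing and compare each new element against it, as the source below does with `lastElementState`. This is plain Java; the delta function here is simple subtraction, an assumption chosen for illustration.

```java
public class DeltaTriggerSketch {
    private double lastStored = Double.NaN; // element stored at the last firing (or the first element seen)
    private final double threshold;

    public DeltaTriggerSketch(double threshold) { this.threshold = threshold; }

    // Returns true (FIRE) when the delta between the stored element and the new one exceeds the threshold.
    public boolean onElement(double value) {
        if (Double.isNaN(lastStored)) { // first element: just store it
            lastStored = value;
            return false;
        }
        if (value - lastStored > threshold) { // delta function: simple difference
            lastStored = value;               // the firing element becomes the new reference
            return true;                      // FIRE
        }
        return false;                         // CONTINUE
    }

    public static void main(String[] args) {
        DeltaTriggerSketch t = new DeltaTriggerSketch(10.0);
        System.out.println(t.onElement(0));  // first element, stored
        System.out.println(t.onElement(5));  // delta 5, below threshold
        System.out.println(t.onElement(12)); // delta 12, fires; 12 becomes the reference
        System.out.println(t.onElement(15)); // delta 3, below threshold
    }
}
```

Note that, as in the Flink source, the reference element is only updated when the trigger fires, not on every element.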
- Source code
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
if (lastElementState.value() == null) {
lastElementState.update(element);
return TriggerResult.CONTINUE;
}
if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
lastElementState.update(element);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
import org.apache.flink.streaming.api.windowing.windows.Window
object TestDeltaTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// extract the event time; the ascending extractor also advances the watermark with it
element.time * 1000
}
})
private val deltaFunction: DeltaFunction[Obj1] = new DeltaFunction[Obj1] {
override def getDelta(oldDataPoint: Obj1, newDataPoint: Obj1) = {
newDataPoint.time - oldDataPoint.time
}
}
private val deltaTrigger: DeltaTrigger[Obj1, Window]
= DeltaTrigger.of[Obj1, Window](10L, deltaFunction, createTypeInformation[(Obj1)].createSerializer(environment.getConfig))
stream2.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10))) // a 10-minute window
.trigger(deltaTrigger)
.reduce(new MinDataReduceFunction)
.print("TumblingEventTimeWindow")
environment.execute()
}
2.8 NeverTrigger
- Source code
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
2.9 A Custom Trigger
- Requirements
- Add a custom trigger that:
- fires the computation when the count reaches 5 records
- fires the computation when 5 seconds have elapsed
- does not fire when there is no data
- clears its state when the window closes
- Code
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object TestCustomTrigger extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a 10-minute tumbling window
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
.trigger(new CustomTrigger[TumblingProcessingTimeWindows](5,5))
.reduce(new MinDataReduceFunction)
.print("TestCustomTrigger")
environment.execute()
}
/**
* Counting function: accumulates the current element count
*/
class Sum extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}
/**
* Updates the state to the latest timestamp
*/
class Update extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value2
}
/**
* A custom trigger that fires the window computation
* when either the configured count or the configured time interval is reached
* @tparam Window
*/
class CustomTrigger[Window] extends Trigger[Object,TimeWindow]{
// maximum number of elements before the computation fires
private var maxCount: Long = _
// interval of the periodic firing timer (ms)
private var interval: Long = 60 * 1000
/**
* @param maxCount number of elements that fires the window computation
* @param interval firing interval, in seconds
*/
def this(maxCount: Long,interval:Long) {
this()
this.maxCount = maxCount
this.interval = interval * 1000
}
// state holding the current element count
private val countStateDescriptor:ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
// state holding the processing-time firing timestamp
private val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
// state holding the event-time firing timestamp
private val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])
override def onElement(element: Object, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
val countState: ReducingState[Long] = ctx.getPartitionedState(countStateDescriptor)
countState.add(1L) // increment the count state
println(s"current element count: ${countState.get()}")
if (countState.get() >= this.maxCount) {
// the configured element count has been reached
clear(window,ctx)
println(s"count reached $maxCount, firing the computation")
// fire the computation and purge the window
TriggerResult.FIRE_AND_PURGE
} else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
val nextFire = ctx.getCurrentProcessingTime + interval
println(s"count not reached, next firing time set to: ${nextFire}")
// the count has not been reached and no timer exists yet, so register one
// store the next firing timestamp in the timer state
ctx.getPartitionedState(processTimerStateDescriptor).add(nextFire)
// register the processing-time timer
ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
TriggerResult.CONTINUE
} else {
TriggerResult.CONTINUE
}
}
// invoked when the ProcessingTime timer fires
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
if(0 == ctx.getPartitionedState(processTimerStateDescriptor).get()){
println("no next firing time has been set yet")
}else{
println(s"seconds until the next firing: ( ${ctx.getPartitionedState(processTimerStateDescriptor).get().-(time)/1000} )")
}
// fire if the count is greater than 0 and the current time has reached the registered firing time
if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() <= time)) {
println(s"count below $maxCount, fired by the processing-time timer ${ctx.getPartitionedState(processTimerStateDescriptor).get() }")
clear(window,ctx)
TriggerResult.FIRE_AND_PURGE
} else {
TriggerResult.CONTINUE
}
}
/**
* clear the state when the window ends
* @param window
* @param ctx
*/
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
// clear the count state
ctx.getPartitionedState(countStateDescriptor).clear()
// clear the processTimer state
ctx.getPartitionedState(processTimerStateDescriptor).clear()
}
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
}
// sample output
current element count: 1
count not reached, next firing time set to: 1609850677798
Obj1(1,小红,1598018543)
current element count: 2
Obj1(1,小红,1598018543)
current element count: 3
Obj1(1,小红,1598018543)
current element count: 4
Obj1(1,小红,1598018543)
current element count: 5
count reached 5, firing the computation
6> Obj1(1,小红,1598018543)
no next firing time has been set yet
current element count: 1
count not reached, next firing time set to: 1609850684226
seconds until the next firing: ( 0 )
count below 5, fired by the processing-time timer 1609850684226
6> Obj1(1,小红,1598018543)
3. Evictors
Flink's windowing model also supports an optional operator, the Evictor, which sits between the WindowAssigner and the trigger and is configured via the evictor() method.
An Evictor can remove elements from the window buffer either before the window function is applied or after it, once the trigger has fired.
The Evictor interface has the following two methods:
/**
* Called before the window function; removes elements from the window
*
* @param elements the elements currently in the window
* @param size the number of elements in the window
* @param window the current window
* @param evictorContext the evictor's context
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Called after the window function; removes elements from the window
*
* @param elements the elements currently in the window
* @param size the number of elements in the window
* @param window the current window
* @param evictorContext the evictor's context
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
- Flink ships with three Evictor implementations:
- CountEvictor: keeps a user-specified number of elements in the window and discards the rest from the beginning of the window buffer.
- TimeEvictor: takes an interval in milliseconds as an argument; for a given window it finds the maximum element timestamp max_ts and removes all elements with a timestamp smaller than max_ts - interval.
- DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each remaining element, and removes those whose delta is greater than or equal to the threshold.
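The TimeEvictor rule (keep only elements within the interval of the newest timestamp) can be sketched stand-alone. This is plain Java, not the Flink class; it works on a bare list of timestamps rather than TimestampedValue records.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TimeEvictSketch {
    // Keep elements whose timestamp is within `windowSize` ms of the maximum timestamp,
    // mirroring the cutoff computation in TimeEvictor's evict() shown below.
    public static List<Long> evict(List<Long> timestamps, long windowSize) {
        long maxTs = timestamps.stream().mapToLong(Long::longValue).max().getAsLong();
        long cutoff = maxTs - windowSize; // the evictCutoff in the source
        return timestamps.stream()
                .filter(ts -> ts > cutoff) // elements at or before the cutoff are removed
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // timestamps in ms; keep only the last 3 seconds relative to the newest element
        System.out.println(evict(List.of(1_000L, 5_000L, 7_000L, 9_000L), 3_000L)); // [7000, 9000]
    }
}
```

The cutoff is relative to the newest element in the window, not to the wall clock, so a quiet stream does not cause everything to be evicted.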
3.1 CountEvictor
- Source code
@Override
public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (size <= maxCount) {
return;
} else {
int evictedCount = 0;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
iterator.next();
evictedCount++;
if (evictedCount > size - maxCount) {
break;
} else {
iterator.remove();
}
}
}
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
object TestCountEvictor extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a tumbling window of 10 seconds
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// use a CountEvictor that evicts before the window function is applied
.evictor(CountEvictor.of(10L,false))
.reduce(new MinDataReduceFunction)
.print()
environment.execute()
}
3.2 TimeEvictor
- Source code
@Override
public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (!hasTimestamp(elements)) {
return;
}
long currentTime = getMaxTimestamp(elements); // find the maximum element timestamp
long evictCutoff = currentTime - windowSize; // compute the eviction cutoff
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
if (record.getTimestamp() <= evictCutoff) { // remove elements with timestamps at or before the cutoff
iterator.remove();
}
}
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
object TestTimeEvictor extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a tumbling window of 10 seconds
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// use a TimeEvictor that evicts before the window function is applied
.evictor(TimeEvictor.of(Time.seconds(3),false))
.reduce(new MinDataReduceFunction)
.print()
environment.execute()
}
3.3 DeltaEvictor
- Source code
@Override
public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
TimestampedValue<T> lastElement = Iterables.getLast(elements);
for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
TimestampedValue<T> element = iterator.next();
// compute the delta with the given delta function; remove the element if the result reaches the threshold
if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
iterator.remove();
}
}
}
- Usage
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor
import org.apache.flink.streaming.api.windowing.time.Time
object TestDeltaEvictor extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
private val deltaFunction: DeltaFunction[Obj1] = new DeltaFunction[Obj1] {
override def getDelta(oldDataPoint: Obj1, newDataPoint: Obj1) = {
newDataPoint.time - oldDataPoint.time
}
}
// a tumbling window of 10 seconds
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// use a DeltaEvictor that evicts before the window function is applied
.evictor(DeltaEvictor.of(10,deltaFunction,false))
.reduce(new MinDataReduceFunction)
.print()
environment.execute()
}
4. Handling Late Data
Once the watermark has passed a window's end and the window computation has fired, elements belonging to that window that arrive afterwards are normally dropped. How can we avoid losing this data? allowedLateness exists to solve exactly this problem.
allowedLateness keeps the window state around after the watermark-triggered computation, waiting for elements of that window that have not yet arrived, and processes this late data again when it shows up.
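The lifetime rule is that a window is kept until the watermark passes windowEnd + allowedLateness. A minimal sketch of that bookkeeping (plain Java; an illustrative model with hypothetical names, not Flink's internals):

```java
public class LatenessSketch {
    // Classify an element of a window ending at `windowEnd`, given the current watermark:
    // - "buffered": the window has not fired yet, the element is simply added
    // - "late-but-accepted": the window already fired, but the state is still kept,
    //   so the element triggers a re-computation
    // - "dropped": the state is gone; the element goes to the side output if one is configured
    public static String classify(long elementTs, long windowEnd, long allowedLateness, long watermark) {
        if (watermark < windowEnd) {
            return "buffered";
        }
        if (watermark < windowEnd + allowedLateness && elementTs < windowEnd) {
            return "late-but-accepted";
        }
        return "dropped";
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L, lateness = 5_000L;
        System.out.println(classify(9_500L, windowEnd, lateness, 9_000L));  // buffered
        System.out.println(classify(9_500L, windowEnd, lateness, 12_000L)); // late-but-accepted
        System.out.println(classify(9_500L, windowEnd, lateness, 16_000L)); // dropped
    }
}
```

The Scala example below combines this with sideOutputLateData, so elements classified as dropped are still observable in a side output instead of disappearing silently.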
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TestAllowedLateness extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
override def extractTimestamp(element: Obj1) = element.time * 1000
})
private val outputTag = new OutputTag[Obj1]("lastObj1")
// a tumbling window of 10 seconds
private val stream3: DataStream[Obj1] = stream2.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(5)) // allow elements to arrive up to 5 seconds late
.sideOutputLateData(outputTag) // route data that is still too late into a side output
.reduce(new MinDataReduceFunction)
private val lastObj1: DataStream[Obj1] = stream3.getSideOutput(outputTag)
lastObj1.print("window late data")
stream3.print("window result")
environment.execute()
}