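1. The concept of a window
Real-world streams are usually unbounded. How can unbounded data be processed?
One option is to slice the unbounded stream into finite data sets, that is, into bounded streams, and process those.
A window is one way of cutting an unbounded stream into bounded streams: it distributes the stream into buckets of finite size for analysis.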
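To make the idea concrete, here is a plain Scala illustration. It uses no Flink and is not how Flink implements windows. An endless `Iterator` stands in for the unbounded stream, and `grouped` cuts it into finite buckets that can each be aggregated. The object name `UnboundedToBounded` is hypothetical.

```scala
// Illustration only: a plain Scala Iterator stands in for an unbounded stream
object UnboundedToBounded {
  // An endless source: 1, 2, 3, ...
  def unboundedSource: Iterator[Int] = Iterator.from(1)

  // Cut the endless source into fixed-size buckets and aggregate each finite bucket
  def bucketSums(bucketSize: Int, buckets: Int): List[Int] =
    unboundedSource
      .grouped(bucketSize) // each group is a bounded chunk of the stream
      .take(buckets)       // only look at the first few buckets
      .map(_.sum)
      .toList

  def main(args: Array[String]): Unit =
    println(bucketSums(3, 3)) // List(6, 15, 24)
}
```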
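2. Introduction to window types and test preparation
In Flink, keyed window operations are applied after keyBy (a non-keyed stream uses .windowAll(...) instead). A window is defined with .window(WindowAssigner). Each element is then dispatched to the correct window(s) and processed by the following operator. Flink also provides the more convenient .timeWindow and .countWindow methods for defining time windows and count windows.
The window assigner, WindowAssigner, is the base class of all window assigners. It sends each element to the correct window(s), and it is the type of the argument that .window() accepts.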
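2.1 Window classification
Windows in Flink can roughly be divided into tumbling time windows, sliding time windows, tumbling count windows, sliding count windows, session windows and global windows.
Sliding window assigners (WindowAssigner)
SlidingEventTimeWindows: sliding event-time windows
SlidingProcessingTimeWindows: sliding processing-time windows
SlidingTimeWindows: sliding time windows (event time), marked deprecated
Tumbling window assigners (WindowAssigner)
TumblingEventTimeWindows: tumbling event-time windows
TumblingProcessingTimeWindows: tumbling processing-time windows
TumblingTimeWindows: tumbling time windows (event time), marked deprecated
Global window: GlobalWindows
Session window assigners (MergingWindowAssigner)
ProcessingTimeSessionWindows: processing-time session windows
EventTimeSessionWindows: event-time session windows
DynamicEventTimeSessionWindows: event-time session windows with a dynamic gap
DynamicProcessingTimeSessionWindows: processing-time session windows with a dynamic gap
BaseAlignedWindowAssigner: aligned windows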
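2.2 Window API overview
```
stream
  .keyBy(...)                    // required: except for windowAll, a window can only be opened after keyBy
  .window(...)                   // required: the window assigner; .timeWindow/.countWindow can be used instead
  [.trigger(...)]                // optional: trigger; if omitted, the assigner's default trigger is used
  [.evictor(...)]                // optional: evictor; if omitted, no evictor is used
  [.allowedLateness(...)]        // optional: how long after the watermark passes the end of a window late elements are still accepted (default 0)
  [.sideOutputLateData(...)]     // optional: route late elements to a side output
  .reduce/aggregate/fold/apply() // required: the window function
  [.getSideOutput(...)]          // optional: read the late elements from the side output
```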
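2.3 Test preparation code
First, prepare the code needed by the tests below.
Case classes
```scala
case class Obj1(id: String, name: String, time: Long)

// Assumed definition: Record is used by several examples below but is not shown in the original.
// classId and age come from how it is used; the middle field name is hypothetical.
case class Record(classId: String, name: String, age: Int)
```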
ReduceFunction
```scala
import org.apache.flink.api.common.functions.ReduceFunction

// Note: despite its name, this keeps whichever record has the larger (later) time
class MinDataReduceFunction extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time > r2.time) {
      r1
    } else {
      r2
    }
  }
}
```
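A window's ReduceFunction combines the window's elements two at a time. The plain Scala sketch below uses a hypothetical object `ReduceSemantics` and has no Flink dependency. It applies the same comparison as `MinDataReduceFunction` to a list that stands in for one window's contents. The result is the record with the largest `time`.

```scala
// Plain-Scala illustration (no Flink) of how a reduce function folds one window's elements
object ReduceSemantics {
  case class Obj1(id: String, name: String, time: Long)

  // Same comparison as MinDataReduceFunction.reduce: keep the record with the larger time
  def keepLater(r1: Obj1, r2: Obj1): Obj1 = if (r1.time > r2.time) r1 else r2

  // Combining the window's elements pairwise yields the record with the largest time
  def reduceWindow(window: List[Obj1]): Obj1 = window.reduce((a, b) => keepLater(a, b))

  def main(args: Array[String]): Unit = {
    val window = List(Obj1("1", "a", 3L), Obj1("1", "b", 9L), Obj1("1", "c", 5L))
    println(reduceWindow(window)) // Obj1(1,b,9)
  }
}
```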
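3. Tumbling time windows
3.1 Description
Tumbling Time Window
Splits the data into windows of a fixed length
Windows are aligned in time, have a fixed length and do not overlap
Each element is assigned to exactly one window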
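Which window does an element fall into? The sketch below assumes the start formula used by `TimeWindow.getWindowStartWithOffset` in Flink 1.x, `timestamp - (timestamp - offset + size) % size`, and reimplements it in plain Scala. The object `TumblingAssignment` is hypothetical. Every timestamp maps to exactly one window `[start, end)`.

```scala
// Plain-Scala sketch of tumbling-window assignment (timestamps in milliseconds)
object TumblingAssignment {
  // Assumed to mirror Flink 1.x TimeWindow.getWindowStartWithOffset
  def windowStart(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - (timestamp - offset + size) % size

  // The single window [start, end) that contains the timestamp
  def assign(timestamp: Long, size: Long, offset: Long = 0L): (Long, Long) = {
    val start = windowStart(timestamp, offset, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val size = 6000L // 6-second windows, as in the TumblingEventTimeWindows example
    for (ts <- Seq(1000L, 5999L, 6000L, 13000L)) println(s"$ts -> ${assign(ts, size)}")
    // 1000 -> (0,6000), 5999 -> (0,6000), 6000 -> (6000,12000), 13000 -> (12000,18000)
  }
}
```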
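3.2 Diagram
3.3 Implementation
.timeWindow(): uses the time characteristic set on the environment (processing time by default before Flink 1.12)
TumblingProcessingTimeWindows: based on processing time
TumblingEventTimeWindows: based on event time
TumblingTimeWindows: based on event time; marked deprecated, TumblingEventTimeWindows is recommended instead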
3.3.1 The .timeWindow() approach
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingTimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    // Comma-separated input; fields 0 and 2 are classId and age
    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })
    // Minimum age per classId in each 10-second tumbling window
    stream2.map(record => {
      (record.classId, record.age)
    }).keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((r1, r2) => {(r1._1, r1._2.min(r2._2))})
      .print("minAge")
    env.execute()
  }
}
```
3.3.2 TumblingProcessingTimeWindows
```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingProcessingTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })
  // Key by the first field (classId); emit the youngest record every 10 s of processing time
  stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new minAgeFunction)
    .print()
  environment.execute()
}

// Keeps the record with the smaller age
class minAgeFunction extends ReduceFunction[Record] {
  override def reduce(r1: Record, r2: Record): Record = {
    if (r1.age < r2.age) {
      r1
    } else {
      r2
    }
  }
}
```
3.3.3 TumblingEventTimeWindows
When using TumblingEventTimeWindows, you must first switch the job to event time and assign timestamps and watermarks. Otherwise an exception like the following is thrown:
```
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
```
```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingEventTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Switch the job to event time
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    // The third field is in seconds; Flink timestamps are in milliseconds
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(6)))
    .reduce(new minAgeFunction2)
    .print("TumblingEventTimeWindow")
  environment.execute()
}

case class Obj1(id: String, name: String, time: Long)

// Despite its name, keeps the record with the larger (later) time
class minAgeFunction2 extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time > r2.time) {
      r1
    } else {
      r2
    }
  }
}
```
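If you type records into the socket and nothing is printed, the window has usually not fired yet. The plain Scala simulation below uses a hypothetical object `EventTimeFiring` and no Flink. It assumes that the watermark produced by `AscendingTimestampExtractor` is the largest timestamp seen minus 1 ms, and that a window `[start, end)` fires once the watermark reaches `end - 1`. For simplicity it also assumes a watermark after every element, whereas Flink emits watermarks periodically. Each result pair is (timestamp of the element that advanced the watermark, end of the window that fired).

```scala
// Plain-Scala simulation (no Flink) of when 6-second event-time tumbling windows fire
object EventTimeFiring {
  def firedWindows(timestampsMs: Seq[Long], sizeMs: Long): List[(Long, Long)] = {
    var watermark = Long.MinValue
    var pendingEnds = Set.empty[Long] // ends of windows that hold data but have not fired
    val fired = scala.collection.mutable.ListBuffer.empty[(Long, Long)]
    for (ts <- timestampsMs) {
      // Assign the element to its tumbling window (offset 0)
      val start = ts - (ts + sizeMs) % sizeMs
      pendingEnds += start + sizeMs
      // Assumed AscendingTimestampExtractor behavior: watermark = largest timestamp - 1
      watermark = math.max(watermark, ts - 1)
      // A window fires once the watermark reaches its last millisecond (end - 1)
      val ready = pendingEnds.filter(end => end - 1 <= watermark).toList.sorted
      ready.foreach(end => fired += ((ts, end)))
      pendingEnds --= ready
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    // Record times 1, 2, 5, 6 and 13 seconds, converted to milliseconds
    println(firedWindows(Seq(1000L, 2000L, 5000L, 6000L, 13000L), 6000L))
    // List((6000,6000), (13000,12000))
  }
}
```

Under these assumptions, the window [0, 6000) fires only when the record with time 6 arrives, and [6000, 12000) only when the record with time 13 arrives.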
3.3.4 TumblingTimeWindows
TumblingTimeWindows also uses event-time semantics. It likewise requires event time to be set up first (time characteristic, timestamps and watermarks); otherwise an exception is thrown.
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestTumblingTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    // seconds -> milliseconds
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  stream2.keyBy(0)
    // Deprecated assigner; TumblingEventTimeWindows is the recommended replacement
    .window(TumblingTimeWindows.of(Time.seconds(6)))
    .reduce(new MinDataReduceFunction)
    .print("TestTumblingTimeWindow")
  environment.execute()
}
```
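4. Sliding time windows
4.1 Description
Sliding Time Window
A sliding window is a more general form of the fixed (tumbling) window. It is defined by a fixed window length and a slide interval; when the slide equals the length, it behaves like a tumbling window.
The window length is fixed, and windows can overlap (whenever the slide is smaller than the length)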
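Because windows overlap, one element can belong to several windows. When the length is a multiple of the slide, it belongs to length / slide windows. The plain Scala sketch below uses a hypothetical object `SlidingAssignment`. It assumes the logic of Flink's sliding assigners: compute the latest window start containing the timestamp, then step back one slide at a time while the window still covers it.

```scala
// Plain-Scala sketch of sliding-window assignment (timestamps in milliseconds)
object SlidingAssignment {
  // All windows [start, end) that contain the timestamp, latest first
  def assign(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): List[(Long, Long)] = {
    // Start of the latest window containing the timestamp (windows start every `slide` ms)
    val lastStart = timestamp - (timestamp - offset + slide) % slide
    // Step back one slide at a time while the window still covers the timestamp
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - size)
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Length 6 s, slide 3 s, as in the SlidingEventTimeWindows example
    println(assign(7000L, 6000L, 3000L)) // List((6000,12000), (3000,9000))
  }
}
```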
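4.2 Diagram
4.3 Implementation
.timeWindow(size, slide): sliding windows based on the environment's time characteristic (processing time by default before Flink 1.12)
SlidingProcessingTimeWindows: sliding windows based on processing time
SlidingEventTimeWindows: sliding windows based on event time
SlidingTimeWindows: sliding windows based on event time; marked deprecated, SlidingEventTimeWindows is recommended instead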
4.3.1 The .timeWindow() approach
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SlidingTimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })
    // Length 10 s, slide 5 s: every 5 s, emit the minimum age of the last 10 s per classId
    stream2.map(record => {
      (record.classId, record.age)
    }).keyBy(_._1)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .reduce((r1, r2) => {(r1._1, r1._2.min(r2._2))})
      .print("minAge")
    env.execute()
  }
}
```
4.3.2 SlidingProcessingTimeWindows
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingProcessingTimeWindow extends App {
  private val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  private val stream: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream.map(data => {
    val dataArray: Array[String] = data.split(",")
    Obj1(dataArray(0), dataArray(1), dataArray(2).toLong)
  }).keyBy(0)
    // Length 5 s, slide 3 s (processing time)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingProcessingTimeWindow")
  environment.execute()
}
```
4.3.3 SlidingEventTimeWindows
When using SlidingEventTimeWindows, event time must be set up first (time characteristic, timestamps and watermarks); otherwise an exception is thrown.
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingEventTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  stream2.keyBy(0)
    // Length 6 s, slide 3 s (event time)
    .window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingEventTimeWindow")
  environment.execute()
}
```
4.3.4 SlidingTimeWindows
SlidingTimeWindows uses event-time semantics. Set up event time (time characteristic, timestamps and watermarks) before using it; otherwise an exception is thrown.
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  stream2.keyBy(0)
    // Deprecated assigner; SlidingEventTimeWindows is the recommended replacement
    .window(SlidingTimeWindows.of(Time.seconds(6), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingTimeWindow")
  environment.execute()
}
```
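5. Session windows
5.1 Introduction
Session Window
A session window is made up of a series of events followed by a timeout gap of a specified length. If no new data arrives for a key within that gap, the current session ends and the next element starts a new window.
Characteristic: windows are not aligned in time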
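The plain Scala simulation below shows how one key's timestamps split into sessions. It uses a hypothetical object `SessionSplit` and no Flink. As a simplification, a new session starts when the distance to the previous element exceeds the gap. Flink actually creates a window per element and merges overlapping ones, so its exact boundary handling may differ.

```scala
// Plain-Scala simulation of session windows for one key (timestamps in milliseconds)
object SessionSplit {
  def sessions(timestamps: List[Long], gap: Long): List[List[Long]] =
    timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      // Still within the gap of the latest element: extend the current session
      case (current :: done, ts) if ts - current.head <= gap => (ts :: current) :: done
      // Otherwise the gap elapsed: start a new session
      case (acc, ts) => List(ts) :: acc
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    // 5-second gap, as in the session window examples
    println(sessions(List(1000L, 3000L, 12000L, 14000L, 30000L), 5000L))
    // List(List(1000, 3000), List(12000, 14000), List(30000))
  }
}
```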
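5.2 Diagram
5.3 Implementation
ProcessingTimeSessionWindows: processing-time session windows
EventTimeSessionWindows: event-time session windows
DynamicEventTimeSessionWindows: event-time session windows with a dynamic gap
DynamicProcessingTimeSessionWindows: processing-time session windows with a dynamic gap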
5.3.1 ProcessingTimeSessionWindows
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSessionProcessingTimeSessionWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    stream1.print()
    val stream2: DataStream[Obj1] = stream1.map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    stream2.keyBy(0)
      // A key's session ends after 5 s of processing time without new data
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .reduce(new MinDataReduceFunction)
      .print("ProcessingTimeSessionWindows")
    env.execute()
  }
}
```
5.3.2 EventTimeSessionWindows
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSessionEventTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  stream2.keyBy(0)
    // A session ends once the watermark passes the key's last timestamp plus the 5 s gap
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(new MinDataReduceFunction)
    .print("EventTimeSessionWindows")
  env.execute()
}
```
5.3.3 DynamicProcessingTimeSessionWindows
The session gap is set dynamically for each element, based on a condition.
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicProcessingTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicProcessingTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })
  // Gap in milliseconds, chosen per element: 3 s when id >= "2" (string comparison), otherwise 5 s
  val sessionGap = new SessionWindowTimeGapExtractor[Obj1]() {
    override def extract(element: Obj1): Long = {
      if (element.id >= "2") {
        3000L
      } else {
        5000L
      }
    }
  }
  stream2.keyBy(0)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(sessionGap))
    .reduce(new MinDataReduceFunction)
    .print("DynamicProcessingTimeSessionWindows")
  env.execute()
}
```
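`id` is a `String`, so the condition `element.id >= "2"` compares lexicographically rather than numerically. The plain Scala copy of the gap rule below makes this visible. It uses a hypothetical object `DynamicGap` and no Flink, and writes the comparison as `compareTo` so the lexicographic behavior is explicit.

```scala
// Plain-Scala copy of the gap rule from the SessionWindowTimeGapExtractor above
object DynamicGap {
  // Equivalent to `id >= "2"` on Strings: a lexicographic comparison
  def gapFor(id: String): Long = if (id.compareTo("2") >= 0) 3000L else 5000L

  def main(args: Array[String]): Unit = {
    for (id <- Seq("1", "2", "3", "10")) println(s"id=$id gap=${gapFor(id)} ms")
    // "10" sorts before "2", so it gets the 5000 ms gap
  }
}
```

If ids are meant as numbers, comparing `element.id.toInt >= 2` inside the extractor would avoid this surprise.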
5.3.4 DynamicEventTimeSessionWindows
When using DynamicEventTimeSessionWindows, event time must be set up first (time characteristic, timestamps and watermarks); otherwise an exception is thrown.
```scala
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicEventTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicEventTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1 = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = element.time * 1000
  })
  // Gap in milliseconds, chosen per element: 3 s when id >= "2" (string comparison), otherwise 5 s
  val sessionGap = new SessionWindowTimeGapExtractor[Obj1]() {
    override def extract(element: Obj1): Long = {
      if (element.id >= "2") {
        3000L
      } else {
        5000L
      }
    }
  }
  stream2.keyBy(0)
    .window(DynamicEventTimeSessionWindows.withDynamicGap(sessionGap))
    .reduce(new MinDataReduceFunction)
    .print("DynamicEventTimeSessionWindows")
  env.execute()
}
```
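6. Tumbling count windows
6.1 Description
Tumbling Count Window
Splits the data into windows holding a fixed number of elements (counted per key)
Windows are defined by element count rather than time, have a fixed size and do not overlap
Each element is assigned to exactly one window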
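The plain Scala simulation below uses a hypothetical object `TumblingCount` and no Flink. It assumes that `countWindow(n)` behaves like a global window with a purging count trigger: a key's window fires only once `n` elements have arrived for that key, and a partially filled window produces no output.

```scala
// Plain-Scala simulation of keyBy(classId).countWindow(n) with a min-age reduce
object TumblingCount {
  // Per key: consecutive groups of n ages; only complete groups fire, each emitting its minimum
  def minAgePerWindow(records: List[(String, Int)], n: Int): Map[String, List[Int]] =
    records.groupBy(_._1).map { case (key, rs) =>
      key -> rs.map(_._2).grouped(n).filter(_.size == n).map(_.min).toList
    }

  def main(args: Array[String]): Unit = {
    val input = List(("c1", 20), ("c2", 18), ("c1", 17), ("c1", 19), ("c2", 16), ("c1", 22), ("c2", 15))
    val result = minAgePerWindow(input, 2)
    println(result("c1")) // List(17, 19)
    println(result("c2")) // List(16): the third c2 record has not filled a window yet
  }
}
```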
6.2 Diagram
6.3 Implementation
```scala
import org.apache.flink.streaming.api.scala._

object TumblingCountWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })
    // Every 2 elements with the same classId form one window
    stream2.map(record => {
      (record.classId, record.age)
    }).keyBy(_._1)
      .countWindow(2)
      .reduce((r1, r2) => {(r1._1, r1._2.min(r2._2))})
      .print("minAge")
    env.execute()
  }
}
```
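7. Sliding count windows
7.1 Description
Sliding Count Window
A sliding count window is the more general form of the tumbling count window. It is defined by a fixed window size and a slide, both measured in number of elements.
The window size is fixed, and windows overlap (whenever the slide is smaller than the size)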
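The plain Scala simulation below uses a hypothetical object `SlidingCount` and no Flink. It assumes that `countWindow(size, slide)` is built from a global window, a count evictor that keeps the last `size` elements, and a count trigger that fires every `slide` elements. Under that assumption, the first firing covers fewer than `size` elements.

```scala
// Plain-Scala simulation of countWindow(size, slide) for a single key
object SlidingCount {
  // Every `slide` elements the window fires over at most the last `size` elements
  def firings[A](elements: List[A], size: Int, slide: Int): List[List[A]] =
    (slide to elements.length by slide).toList.map(n => elements.take(n).takeRight(size))

  def main(args: Array[String]): Unit = {
    // countWindow(4, 2) applied to six elements of one key
    println(firings((1 to 6).toList, 4, 2)) // List(List(1, 2), List(1, 2, 3, 4), List(3, 4, 5, 6))
  }
}
```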
7.2 Diagram
7.3 Implementation
```scala
import org.apache.flink.streaming.api.scala._

object SlidingCountWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })
    // Window size 4 elements, slide 2 elements, per classId
    stream2.map(record => {
      (record.classId, record.age)
    }).keyBy(_._1)
      .countWindow(4, 2)
      .reduce((r1, r2) => {(r1._1, r1._2.min(r2._2))})
      .print("minAge")
    env.execute()
  }
}
```
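8. Global windows
8.1 Description
Global Window
A global window assigns all elements with the same key to a single window, and it is normally used together with a trigger.
Without a suitable trigger, the computation is never executed, because the default trigger of GlobalWindows never fires.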
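The plain Scala simulation below shows what a keyed global window with `CountTrigger.of(3)` and `sum(1)` emits for a word count. It uses a hypothetical object `GlobalCountTrigger` and no Flink. It assumes that `CountTrigger` fires without purging the window, so each firing reports the running total for that word rather than the count of the last three elements.

```scala
// Plain-Scala simulation of keyBy(word) + GlobalWindows + CountTrigger.of(every) + sum
object GlobalCountTrigger {
  // Returns (word, running count) each time a word's count reaches a multiple of `every`
  def firings(words: List[String], every: Int): List[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    words.flatMap { w =>
      val c = counts.getOrElse(w, 0) + 1
      counts(w) = c
      // No purge between firings: the sum covers everything the key has seen so far
      if (c % every == 0) List(w -> c) else Nil
    }
  }

  def main(args: Array[String]): Unit = {
    val words = "a b a a c a b a a b".split(" ").toList
    println(firings(words, 3)) // List((a,3), (a,6), (b,3))
  }
}
```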
8.2 Diagram
8.3 Implementation
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object GlobalWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
    stream1.print()
    stream1
      .flatMap(str => {str.split(" ")})
      .map(str => {(str, 1)})
      .keyBy(0)
      // window (not windowAll) keeps one global window per key
      .window(GlobalWindows.create())
      // Fire each time 3 more elements arrive for a key; without a trigger nothing is emitted
      .trigger(CountTrigger.of(3))
      .sum(1)
      .print()
    env.execute()
  }
}
```