Flink Series 8: Window Types and Related Operations in Flink


1. The concept of windows

Bounded and unbounded stream models

  • Real-world streams are generally unbounded, so how do we process unbounded data?
  • We can cut the unbounded stream into finite data sets and process those, i.e. obtain bounded streams.
  • A window is a way of cutting an unbounded stream into bounded ones: it distributes the stream into buckets of finite size for analysis.

2. Window types and test preparation

Window operations in Flink can only be used after keyBy. A window is defined with .window(WindowAssigner); data is then routed to the correct window and processed by the downstream operators. Flink also provides the more convenient .timeWindow and .countWindow methods for defining time windows and count windows.

The window assigner WindowAssigner is the parent class of all window types. It is responsible for routing each record to the correct window; the argument that .window() takes is a WindowAssigner.

2.1 Window classification

Windows in Flink can roughly be divided into: tumbling time windows, sliding time windows, tumbling count windows, sliding count windows, session windows, and global windows.

Class hierarchy of window-related classes in Flink

  • Sliding window WindowAssigners
    • SlidingEventTimeWindows: sliding event-time window
    • SlidingProcessingTimeWindows: sliding processing-time window
    • SlidingTimeWindows: sliding time window (event time), marked deprecated
  • Tumbling window WindowAssigners
    • TumblingEventTimeWindows: tumbling event-time window
    • TumblingProcessingTimeWindows: tumbling processing-time window
    • TumblingTimeWindows: tumbling time window (event time), marked deprecated
  • Global window: GlobalWindows
  • Session window: MergingWindowAssigner
    • ProcessingTimeSessionWindows: processing-time session window
    • EventTimeSessionWindows: event-time session window
    • DynamicEventTimeSessionWindows: dynamic event-time session window
    • DynamicProcessingTimeSessionWindows: dynamic processing-time session window
  • BaseAlignedWindowAssigner: aligned windows

2.2 Window API overview

stream
    .keyBy(...)                <- required: except for WindowAll windows, the stream must be keyed before windowing
    .window(...)               <- required: set the window type; .timeWindow/.countWindow can be used as shortcuts
    [.trigger(...)]            <- optional: set a trigger; the default trigger is used if omitted
    [.evictor(...)]            <- optional: set an evictor; none is used if omitted
    [.allowedLateness(...)]    <- optional: how long after the watermark late data is still accepted
    [.sideOutputLateData(...)] <- optional: send late data to a side output
    .reduce/aggregate/fold/apply() <- required: the window function
    [.getSideOutput(...)]      <- optional: retrieve the side output stream
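Since the lateness-related calls rarely appear in the later examples, here is a hedged sketch of how they might be combined in practice. The tag name "late", the tuple types, and the object name are illustrative, not from the original post, and running it requires Flink on the classpath:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object LatenessSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical keyed stream of (key, value) pairs
    val input: DataStream[(String, Int)] = env.fromElements(("a", 1), ("a", 2))

    // Side-output tag for records arriving after the allowed lateness
    val lateTag = new OutputTag[(String, Int)]("late")

    val windowed = input
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .allowedLateness(Time.seconds(5))   // keep the window open 5s past the watermark
      .sideOutputLateData(lateTag)        // route anything later than that to the side output
      .reduce((a, b) => (a._1, a._2 + b._2))

    windowed.getSideOutput(lateTag).print("late") // consume the late records
    env.execute()
  }
}
```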

2.3 Window test setup code

First, prepare the code the tests will need.

  1. Case classes

case class Obj1(id:String,name:String,time:Long)

// Record is referenced by the examples below but was not defined in the
// original post; this definition is inferred from how its fields are used.
case class Record(classId: String, name: String, age: Int)

  2. ReduceFunction

import org.apache.flink.api.common.functions.ReduceFunction

// Keeps the record with the smaller timestamp
class MinDataReduceFunction extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time > r2.time) {
      r2
    } else {
      r1
    }
  }
}

3. Tumbling time windows

3.1 Description

  • Tumbling Time Window
  • Splits the data into windows of a fixed length
  • Time-aligned, fixed window length, no overlap
  • Each record is assigned to exactly one window
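To make "each record is assigned to exactly one window" concrete, here is a minimal pure-Scala sketch of the start-of-window arithmetic, the same modulo computation that Flink's TimeWindow.getWindowStartWithOffset performs, shown here with a zero offset (the object name is illustrative):

```scala
object TumblingAssignSketch {
  // Start of the tumbling window that a timestamp falls into
  // (mirrors Flink's TimeWindow.getWindowStartWithOffset with offset = 0)
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs + sizeMs) % sizeMs

  def main(args: Array[String]): Unit = {
    // Every timestamp maps to exactly one 10-second window
    assert(windowStart(12345L, 10000L) == 10000L)
    assert(windowStart(9999L, 10000L) == 0L)
  }
}
```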

3.2 Diagram

[Figure: tumbling time window]

3.3 Implementations

  • .timeWindow(): based on processing time
  • TumblingProcessingTimeWindows: based on processing time
  • TumblingEventTimeWindows: based on event time
  • TumblingTimeWindows: based on event time; marked Deprecated, TumblingEventTimeWindows is recommended instead

3.3.1 The .timeWindow() approach

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object TumblingTimeWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // Find the youngest user per classId within every 10 seconds

    /* sample input
    1,xiaoming,12
    2,xiaodong,11
    1,xioahong,13
    1,xiaogang,14
    1,xiaogang,15
    2,xiaohuang,15
    */

    stream2.map(record => (record.classId, record.age))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) // window length; processing time is used by default
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
      .print("minAge")

    /**
     * minAge:6> (1,12)
     * minAge:3> (2,11)
     */
    env.execute()
  }
}

3.3.2 TumblingProcessingTimeWindows

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingProcessingTimeWindow extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  // Set up a window of 10 seconds
  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new minAgeFunction)
    .print()

  environment.execute()
}

// Returns the record with the smallest age in the window
class minAgeFunction extends ReduceFunction[Record] {
  override def reduce(r1: Record, r2: Record): Record = {
    if (r1.age < r2.age) {
      r1
    } else {
      r2
    }
  }
}

3.3.3 TumblingEventTimeWindows

When using a TumblingEventTimeWindows window you must set up event time first, otherwise an exception similar to the following is thrown:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
  • Test code

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingEventTimeWindow extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      // Extract the event time; it is also used as the watermark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(6)))
    .reduce(new minAgeFunction2)
    .print("TumblingEventTimeWindow")

  environment.execute()
}

case class Obj1(id: String, name: String, time: Long)

// Returns the record with the smallest timestamp in the window
class minAgeFunction2 extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time > r2.time) {
      r2
    } else {
      r1
    }
  }
}

3.3.4 TumblingTimeWindows

TumblingTimeWindows also uses event-time semantics, so event time must be set up before using it, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * @Author haonan.bian
 * @Date 2021/1/3 14:11
 **/
object TestTumblingTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      // Extract the event time; it is also used as the watermark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(TumblingTimeWindows.of(Time.seconds(6)))
    .reduce(new MinDataReduceFunction)
    .print("TestTumblingTimeWindow")

  environment.execute()
}

4. Sliding time windows

4.1 Description

  • Sliding Time Window
  • Sliding windows are a generalization of fixed windows; a sliding window is defined by a fixed window length and a slide interval.
  • Window length is fixed, and windows may overlap
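Because windows overlap, one record can belong to several windows at once. This illustrative pure-Scala helper (names invented for the sketch) computes the start timestamps of every sliding window containing a given timestamp, following the same descending loop that Flink's sliding assigners use:

```scala
object SlidingAssignSketch {
  // Start timestamps of every sliding window that contains the given timestamp;
  // once the stream has warmed up, each record belongs to size/slide windows
  def windowStarts(ts: Long, sizeMs: Long, slideMs: Long): Seq[Long] = {
    val lastStart = ts - (ts % slideMs + slideMs) % slideMs
    (lastStart until (ts - sizeMs) by -slideMs).toSeq
  }

  def main(args: Array[String]): Unit = {
    // With size 10s and slide 5s, timestamp 7s falls into the windows starting at 5s and 0s
    assert(windowStarts(7000L, 10000L, 5000L) == Seq(5000L, 0L))
  }
}
```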

4.2 Diagram

[Figure: sliding time window]

4.3 Implementations

  • .timeWindow(): sliding window based on processing time
  • SlidingProcessingTimeWindows: sliding window based on processing time
  • SlidingEventTimeWindows: sliding window based on event time
  • SlidingTimeWindows: sliding window based on event time; marked Deprecated, SlidingEventTimeWindows is recommended instead

4.3.1 The .timeWindow() approach

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object SlidingTimeWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // Window length 10 seconds, sliding every 5 seconds; processing time is used by default
    stream2.map(record => (record.classId, record.age))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
      .print("minAge")

    env.execute()
  }
}

4.3.2 SlidingProcessingTimeWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingProcessingTimeWindow extends App {

  private val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  private val stream: DataStream[String] = environment.socketTextStream("localhost", 9999)

  stream.map(data => {
    val dataArray: Array[String] = data.split(",")
    Obj1(dataArray(0), dataArray(1), dataArray(2).toLong)
  }).keyBy(0)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingProcessingTimeWindow")

  environment.execute()
}

4.3.3 SlidingEventTimeWindows

When using a SlidingEventTimeWindows window you must set up event time first, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingEventTimeWindow extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      // Extract the event time; it is also used as the watermark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingEventTimeWindow")

  environment.execute()
}

4.3.4 SlidingTimeWindows

SlidingTimeWindows uses event-time semantics; event time must be set up before using it, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * @Author haonan.bian
 * @Date 2021/1/3 17:55
 **/
object TestSlidingTimeWindow extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      // Extract the event time; it is also used as the watermark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(SlidingTimeWindows.of(Time.seconds(6), Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingTimeWindow")

  environment.execute()
}

5. Session windows

5.1 Introduction

  • Session Window
  • Composed of a series of events plus a timeout gap of a specified length; in other words, a new window is started once no new data has arrived for a certain period of time
  • Characteristics: not time-aligned
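The "new window after a quiet period" rule can be sketched without Flink at all. This illustrative snippet (names invented for the sketch, not Flink API) groups sorted event timestamps into sessions whenever the gap to the previous event reaches gapMs:

```scala
object SessionSketch {
  // Group sorted event timestamps into sessions; a gap of gapMs or more
  // since the previous event starts a new session
  def sessionize(timestamps: Seq[Long], gapMs: Long): Seq[Seq[Long]] =
    timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      case (Nil, t)                                 => List(List(t))
      case (cur :: done, t) if t - cur.head < gapMs => (t :: cur) :: done
      case (acc, t)                                 => List(t) :: acc
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    // 9000 arrives more than 5000ms after 2000, so it opens a second session
    assert(sessionize(Seq(1000L, 2000L, 9000L), 5000L) == Seq(Seq(1000L, 2000L), Seq(9000L)))
  }
}
```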

5.2 Diagram

[Figure: session window]

5.3 Implementations

  • ProcessingTimeSessionWindows: processing-time session window
  • EventTimeSessionWindows: event-time session window
  • DynamicEventTimeSessionWindows: dynamic event-time session window
  • DynamicProcessingTimeSessionWindows: dynamic processing-time session window

5.3.1 ProcessingTimeSessionWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows

object TestSessionProcessingTimeSessionWindow {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    stream1.print()
    val stream2: DataStream[Obj1] = stream1.map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })

    stream2.keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .reduce(new MinDataReduceFunction)
      .print("ProcessingTimeSessionWindows")

    env.execute()
  }
}

5.3.2 EventTimeSessionWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSessionEventTimeSessionWindows extends App {
  // Create the execution environment
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      // Extract the event time; it is also used as the watermark
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(new MinDataReduceFunction)
    .print("EventTimeSessionWindows")

  env.execute()
}

5.3.3 DynamicProcessingTimeSessionWindows

Dynamically sets the session window gap based on a condition.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicProcessingTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicProcessingTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  val sessionGap = new SessionWindowTimeGapExtractor[Obj1]() {
    // Dynamically return the gap in milliseconds
    override def extract(element: Obj1): Long = {
      // Use a 3-second session gap when the ID is >= "2", otherwise 5 seconds
      if (element.id >= "2") {
        3000L
      } else {
        5000L
      }
    }
  }

  stream2.keyBy(0)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(sessionGap))
    .reduce(new MinDataReduceFunction)
    .print("DynamicProcessingTimeSessionWindows")

  env.execute()
}

5.3.4 DynamicEventTimeSessionWindows

When using a DynamicEventTimeSessionWindows window you must set up event time first, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicEventTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicEventTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1 = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1): Long = {
      element.time * 1000
    }
  })

  val sessionGap = new SessionWindowTimeGapExtractor[Obj1]() {
    // Dynamically return the gap in milliseconds
    override def extract(element: Obj1): Long = {
      // Use a 3-second session gap when the ID is >= "2", otherwise 5 seconds
      if (element.id >= "2") {
        3000L
      } else {
        5000L
      }
    }
  }

  stream2.keyBy(0)
    .window(DynamicEventTimeSessionWindows.withDynamicGap(sessionGap))
    .reduce(new MinDataReduceFunction)
    .print("DynamicEventTimeSessionWindows")

  env.execute()
}

6. Tumbling count windows

6.1 Description

  • Tumbling Count Window
  • Splits the data into windows containing a fixed number of elements
  • Fixed window size (by element count), no overlap
  • Each record is assigned to exactly one window

6.2 Diagram

[Figure: tumbling count window]

6.3 Implementation

import org.apache.flink.streaming.api.scala._

object TumblingCountWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // For every 2 records per classId, find the youngest user
    stream2.map(record => (record.classId, record.age))
      .keyBy(_._1)
      .countWindow(2) // window size in records
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
      .print("minAge")
    env.execute()
  }
}

7. Sliding count windows

7.1 Description

  • Sliding Count Window
  • Sliding windows are a generalization of fixed windows; a sliding count window is defined by a fixed window size and a slide interval.
  • Window size is fixed, and windows overlap
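As a rough illustration of these semantics (not Flink API; the helper and its names are invented for this sketch), a sliding count window fires every slide elements and covers at most the last size elements, so the first windows are partial:

```scala
object CountWindowSketch {
  // Emit a window every `slide` elements, containing at most the last `size` elements
  def slidingCountWindows[T](xs: Seq[T], size: Int, slide: Int): Seq[Seq[T]] =
    (slide to xs.length by slide).map(end => xs.slice(math.max(0, end - size), end))

  def main(args: Array[String]): Unit = {
    // size 4, slide 2: fires after elements 2, 4 and 6
    assert(slidingCountWindows(Seq(1, 2, 3, 4, 5, 6), 4, 2) ==
      Seq(Seq(1, 2), Seq(1, 2, 3, 4), Seq(3, 4, 5, 6)))
  }
}
```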

7.2 Diagram

[Figure: sliding count window]

7.3 Implementation

import org.apache.flink.streaming.api.scala._

object SlidingCountWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // Every 2 records, find the youngest user per classId among the last 4 records
    stream2.map(record => (record.classId, record.age))
      .keyBy(_._1)
      .countWindow(4, 2) // window size 4, slide 2
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
      .print("minAge")

    env.execute()
  }
}

8. Global windows

8.1 Description

  • Global Window
  • A global window assigns all elements with the same key to the same window; it is usually used together with a trigger.
  • Without a trigger, no computation is ever executed.

8.2 Diagram

[Figure: global window]

8.3 Implementation

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object GlobalWindow {

  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

    stream1.print()

    // Whenever the accumulated count of a word reaches a multiple of 3,
    // trigger the computation and emit the total count of that word in the window
    stream1
      .flatMap(str => str.split(" "))
      .map(str => (str, 1))
      .keyBy(0)
      .window(GlobalWindows.create()) // keep the keyed stream; windowAll would discard the keying
      .trigger(CountTrigger.of(3)) // set the trigger condition
      .sum(1)
      .print()

    env.execute()
  }
}

Author: hnbian
Copyright: unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit hnbian when reprinting.