1. The window concept
- Real-world streams are generally unbounded, so how do we process unbounded data?
- We can split the unbounded stream into finite data sets and process those, i.e. turn it into bounded streams.
- A window is a way of slicing an unbounded stream into bounded ones: it distributes the stream into buckets of finite size, which are then analyzed.
2. Window types and test preparation
Keyed window operations in Flink can only be used after keyBy. A window is defined with .window(WindowAssigner); elements are then routed to the correct window and processed by the downstream operators. Flink also provides the more convenient .timeWindow and .countWindow methods for defining time windows and count windows, as sketched below.
The window assigner WindowAssigner is the parent class of all window types. It is responsible for routing each element to the correct window, and the argument accepted by .window() is exactly a WindowAssigner.
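For example, under the default processing-time characteristic, the shorthand and the explicit assigner below define the same 10-second tumbling window (a minimal sketch; the socket word-count pipeline is only an illustration, not part of the original examples):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TimeWindowShorthand extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // hypothetical input: one word per line over the socket
  val keyed = env.socketTextStream("localhost", 9999).map(w => (w, 1)).keyBy(_._1)
  // shorthand: under the default time characteristic this is a processing-time tumbling window
  keyed.timeWindow(Time.seconds(10)).sum(1).print("shorthand")
  // explicit window assigner, equivalent to the shorthand above
  keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1).print("assigner")
  env.execute()
}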
2.1 Window classification
Windows in Flink fall roughly into: tumbling time windows, sliding time windows, tumbling count windows, sliding count windows, session windows and global windows. The assigner class hierarchy is:
- WindowAssigner
  - SlidingEventTimeWindows: sliding event-time window
  - SlidingProcessingTimeWindows: sliding processing-time window
  - SlidingTimeWindows: sliding time window (event time)
  - TumblingEventTimeWindows: tumbling event-time window
  - TumblingProcessingTimeWindows: tumbling processing-time window
  - TumblingTimeWindows: tumbling time window (event time)
  - GlobalWindows: global window
  - MergingWindowAssigner
    - ProcessingTimeSessionWindows: processing-time session window
    - EventTimeSessionWindows: event-time session window
    - DynamicEventTimeSessionWindows: dynamic event-time session window
    - DynamicProcessingTimeSessionWindows: dynamic processing-time session window
  - BaseAlignedWindowAssigner
2.2 Window API overview
stream
    .keyBy(...)                    <- required: windows other than windowAll must be opened on a keyed stream
    .window(...)                   <- required: set the window assigner; can be replaced by .timeWindow/.countWindow
    [.trigger(...)]                <- optional: set a trigger; the default trigger is used if omitted
    [.evictor(...)]                <- optional: set an evictor; none is used if omitted
    [.allowedLateness(...)]        <- optional: how long after the watermark late data is still accepted
    [.sideOutputLateData(...)]     <- optional: send late data to a side output
    .reduce/aggregate/fold/apply() <- required: the window function
    [.getSideOutput(...)]          <- optional: retrieve the side-output data
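A minimal runnable sketch that strings these calls together on an event-time stream. The "key,value,epochSeconds" input format, the late-data OutputTag, the 10-second window and 5-second lateness are illustrative assumptions rather than values taken from the examples below:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object WindowApiSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  // hypothetical input lines: "key,value,epochSeconds"
  val source = env.socketTextStream("localhost", 9999)
    .map(line => { val a = line.split(","); (a(0), a(1), a(2).toLong) })
    .assignAscendingTimestamps(_._3 * 1000)
  // side output that collects data arriving after the allowed lateness
  val lateTag = new OutputTag[(String, String, Long)]("late-data")
  val windowed = source
    .keyBy(_._1)                                            // required: key the stream
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // required: window assigner
    .allowedLateness(Time.seconds(5))                       // optional: accept data up to 5 s late
    .sideOutputLateData(lateTag)                            // optional: route anything later to the side output
    .reduce((a, b) => if (a._3 < b._3) a else b)            // required: window function (keep the earliest record)
  windowed.print("window")
  windowed.getSideOutput(lateTag).print("late")             // optional: read the late-data side output
  env.execute()
}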
2.3 Test preparation code
First, prepare the code shared by the tests below.
- Case class
case class Obj1(id:String,name:String,time:Long)
- ReduceFunction
import org.apache.flink.api.common.functions.ReduceFunction
// Keep the record with the smaller timestamp (the earliest one)
class MinDataReduceFunction extends ReduceFunction[Obj1]{
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time < r2.time) {
      r1
    } else {
      r2
    }
  }
}
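The time-window and count-window examples further down also build a Record case class that is not defined in this listing; judging from how it is constructed (Record(arr(0), arr(1), arr(2).toInt)) and accessed (record.classId, record.age), it presumably looks like this sketch:
// Assumed shape of the Record case class used by the later examples
case class Record(classId: String, name: String, age: Int)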
3. Tumbling time windows
3.1 Description
- Tumbling Time Window
- Splits the data into windows of a fixed length
- Aligned in time, fixed window size, no overlap
- Each element is assigned to exactly one window
3.2 Diagram
3.3 Implementations
- .timeWindow(): follows the environment's time characteristic (processing time by default)
- TumblingProcessingTimeWindows: based on processing time
- TumblingEventTimeWindows: based on event time
- TumblingTimeWindows: based on event time; marked Deprecated, TumblingEventTimeWindows is recommended instead
3.3.1 Using .timeWindow()
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object TumblingTimeWindow {
def main(args: Array[String]): Unit = {
// Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// Find the youngest user per classId within each 10-second window
/* Sample input to send over the socket
1,xiaoming,12
2,xiaodong,11
1,xioahong,13
1,xiaogang,14
1,xiaogang,15
2,xiaohuang,15
*/
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.timeWindow(Time.seconds(10)) // 10-second window; processing time is used by default
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("minAge")
/**
* minAge:6> (1,12)
* minAge:3> (2,11)
*/
env.execute()
}
}
3.3.2 TumblingProcessingTimeWindows
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TumblingProcessingTimeWindow extends App{
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// Define a tumbling window with a 10-second window size
stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new minAgeFunction)
.print()
environment.execute()
}
// Return the record with the smaller age within the window
class minAgeFunction extends ReduceFunction[Record]{
override def reduce(r1: Record, r2: Record):Record = {
if(r1.age < r2.age){
r1
}else{
r2
}
}
}
3.3.3 TumblingEventTimeWindows
When using TumblingEventTimeWindows, the event-time characteristic must be set first, otherwise an exception similar to the following is thrown:
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
- Test code
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TumblingEventTimeWindow extends App{
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("stream1")
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
//println(arr)
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// Extract the event time; it is also used to generate the watermark
element.time * 1000
}
})
stream2.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(6)))
.reduce(new minAgeFunction2)
.print("TumblingEventTimeWindow")
environment.execute()
}
case class Obj1(id:String,name:String,time:Long)
// Return the record with the smaller timestamp within the window
class minAgeFunction2 extends ReduceFunction[Obj1]{
override def reduce(r1: Obj1, r2: Obj1):Obj1 = {
if(r1.time < r2.time){
r1
}else{
r2
}
}
}
3.3.4 TumblingTimeWindows
TumblingTimeWindows also uses event-time semantics; the event-time characteristic must be set before using it, otherwise an exception is thrown.
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author haonan.bian
* @Description //TODO
* @Date 2021/1/3 14:11
**/
object TestTumblingTimeWindow extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("stream1")
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// Extract the event time; it is also used to generate the watermark
element.time * 1000
}
})
stream2.keyBy(0)
.window(TumblingTimeWindows.of(Time.seconds(6)))
.reduce(new MinDataReduceFunction)
.print("TestTumblingTimeWindow")
environment.execute()
}
4. Sliding time windows
4.1 Description
- Sliding Time Window
- A sliding window is a generalization of the fixed (tumbling) window: it is defined by a fixed window length plus a slide interval.
- Fixed window length; adjacent windows may overlap
4.2 Diagram
4.3 Implementations
- .timeWindow(): sliding window following the environment's time characteristic (processing time by default)
- SlidingProcessingTimeWindows: sliding window based on processing time
- SlidingEventTimeWindows: sliding window based on event time
- SlidingTimeWindows: sliding window based on event time; marked Deprecated, SlidingEventTimeWindows is recommended instead
4.3.1 Using .timeWindow()
import org.apache.flink.streaming.api.windowing.time.Time
object SlidingTimeWindow {
def main(args: Array[String]): Unit = {
import org.apache.flink.streaming.api.scala._
// Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 10-second window sliding every 5 seconds; processing time is used by default
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.timeWindow(Time.seconds(10),Time.seconds(5))
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("minAge")
env.execute()
}
}
4.3.2 SlidingProcessingTimeWindows
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TestSlidingProcessingTimeWindow extends App {
private val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
private val stream: DataStream[String] = environment.socketTextStream("localhost", 9999)
stream.map(data=>{
val dataArray: Array[String] = data.split(",")
Obj1(dataArray(0),dataArray(1),dataArray(2).toLong)
}).keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.reduce(new MinDataReduceFunction)
.print("TestSlidingProcessingTimeWindow")
environment.execute()
}
4.3.3 SlidingEventTimeWindows
When using SlidingEventTimeWindows, the event-time characteristic must be set first, otherwise an exception is thrown.
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TestSlidingEventTimeWindow extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("stream1")
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
//println(arr)
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// Extract the event time; it is also used to generate the watermark
element.time * 1000
}
})
stream2.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(6),Time.seconds(3)))
.reduce(new MinDataReduceFunction)
.print("TestSlidingEventTimeWindow")
environment.execute()
}
4.3.4 SlidingTimeWindows
SlidingTimeWindows also uses event-time semantics; set up event time before using it, otherwise an exception is thrown.
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author haonan.bian
* @Description //TODO
* @Date 2021/1/3 17:55
**/
object TestSlidingTimeWindow extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
//println(arr)
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// Extract the event time; it is also used to generate the watermark
element.time * 1000
}
})
stream2.keyBy(0)
.window(SlidingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
.reduce(new MinDataReduceFunction)
.print("TestSlidingEventTimeWindow")
environment.execute()
}
5. Session windows
5.1 Description
- Session window
- Consists of a series of events followed by a timeout gap of a specified length; in other words, a new window is started whenever no new data arrives within that gap
- Characteristic: no time alignment
5.2 Diagram
5.3 Implementations
- ProcessingTimeSessionWindows: processing-time session window
- EventTimeSessionWindows: event-time session window
- DynamicEventTimeSessionWindows: dynamic event-time session window
- DynamicProcessingTimeSessionWindows: dynamic processing-time session window
5.3.1 ProcessingTimeSessionWindows
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
object TestSessionProcessingTimeSessionWindow {
def main(args: Array[String]): Unit = {
// Create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
stream1.print()
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
stream2.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.reduce(new MinDataReduceFunction)
.print("ProcessingTimeSessionWindows")
env.execute()
}
}
5.3.2 EventTimeSessionWindows
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TestSessionEventTimeSessionWindows extends App {
// Create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
override def extractAscendingTimestamp(element: Obj1) = {
// Extract the event time; it is also used to generate the watermark
element.time * 1000
}
})
stream2.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.reduce(new MinDataReduceFunction)
.print("EventTimeSessionWindows")
env.execute()
}
5.3.3 DynamicProcessingTimeSessionWindows
The session gap is set dynamically per element, based on a condition.
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicProcessingTimeSessionWindows, SessionWindowTimeGapExtractor}
object TestSessionDynamicProcessingTimeSessionWindows extends App {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
val sessionGap = new SessionWindowTimeGapExtractor[Obj1](){
// Return the session gap dynamically, in milliseconds
override def extract(element: Obj1) = {
// Use a 3-second session gap when the id is >= "2", otherwise 5 seconds
if (element.id >= "2"){
3000L
}else{
5000L
}
}
}
stream2.keyBy(0)
.window(
DynamicProcessingTimeSessionWindows.withDynamicGap(sessionGap)
)
.reduce(new MinDataReduceFunction)
.print("EventTimeSessionWindows")
env.execute()
}
5.3.4 DynamicEventTimeSessionWindows
When using DynamicEventTimeSessionWindows, the event-time characteristic must be set first, otherwise an exception is thrown.
import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicEventTimeSessionWindows, SessionWindowTimeGapExtractor}
object TestSessionDynamicEventTimeSessionWindows extends App {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1 = env.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
def extractAscendingTimestamp(element: Obj1) ={
element.time * 1000
}
})
val sessionGap = new SessionWindowTimeGapExtractor[Obj1](){
// Return the session gap dynamically, in milliseconds
override def extract(element: Obj1) = {
// Use a 3-second session gap when the id is >= "2", otherwise 5 seconds
if (element.id >= "2"){
3000L
}else{
5000L
}
}
}
stream2.keyBy(0)
.window(
DynamicEventTimeSessionWindows.withDynamicGap(sessionGap)
)
.reduce(new MinDataReduceFunction)
.print("EventTimeSessionWindows")
env.execute()
}
6. Tumbling count windows
6.1 Description
- Tumbling count window
- Splits the data into windows containing a fixed number of elements
- Fixed window size (element count), no overlap
- Each element is assigned to exactly one window
6.2 Diagram
6.3 Implementation
import org.apache.flink.streaming.api.scala._
object TumblingCountWIndow {
def main(args: Array[String]): Unit = {
// Create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// For every 2 records per classId, find the youngest user
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.countWindow(2) // tumbling count window of 2 elements per key
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("minAge")
env.execute()
}
}
7. Sliding count windows
7.1 Description
- Sliding count window
- A sliding window is a generalization of the fixed window: it is defined by a fixed window size plus a slide interval.
- Fixed window size; adjacent windows overlap
7.2 Diagram
7.3 Implementation
object SlidingCountWindow {
def main(args: Array[String]): Unit = {
import org.apache.flink.streaming.api.scala._
// Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// Every 2 records per classId, find the youngest user among the last 4 records
stream2.map(record=>{
(record.classId,record.age)
}).keyBy(_._1)
.countWindow(4,2) // sliding count window: size 4 elements, sliding every 2 elements
.reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
.print("minAge")
env.execute()
}
}
8. Global windows
8.1 Description
- Global window
- A global window assigns all elements with the same key to one and the same window; it is usually used together with a trigger.
- Without a trigger, no computation is performed.
8.2 Diagram
8.3 Implementation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
object GlobalWindow {
def main(args: Array[String]): Unit = {
// Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
stream1.print()
// Whenever the accumulated count of a word reaches a multiple of 3, trigger the computation and output the total count of that word within the window
stream1
.flatMap(str=>{str.split(" ")})
.map(str=>{(str,1)})
.keyBy(0)
.window(GlobalWindows.create()) // per-key global window; the stream is keyed, so window() is used rather than windowAll()
.trigger(CountTrigger.of(3)) // fire whenever a key accumulates another 3 elements
.sum(1)
.print()
env.execute()
}
}