Flink Series 8: Window Types and Related Operations in Flink


1. The Concept of a Window

Bounded and unbounded stream models

  • Real-world streams are generally unbounded, so how do we process unbounded data?
  • We can split the unbounded stream into finite data sets and process those, i.e. turn it into bounded streams.
  • A window is a way of cutting an unbounded stream into bounded streams: it distributes the stream into buckets of finite size for analysis.

2. Window Types and Test Preparation

Window operations in Flink (other than windowAll) must be applied after keyBy. A window is defined with .window(WindowAssigner); each element is then routed to the correct window and processed by the downstream operators. Flink also provides the more convenient .timeWindow and .countWindow methods for defining time windows and count windows.

The window assigner WindowAssigner is the base class of all window assigners; it is responsible for assigning each element to the correct window. The .window() method takes a WindowAssigner as its argument.
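
As a quick illustration, the following minimal sketch opens the same 10-second processing-time tumbling window in both ways: once with an explicit WindowAssigner and once with the .timeWindow shortcut. It is only a sketch: the socket source, the port and the "classId,count" input format are placeholder assumptions in the style of the examples later in this article, and it assumes the same pre-1.12 Flink Scala API used throughout this series.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowApiSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Hypothetical input lines of the form "classId,count"
  val pairs: DataStream[(String, Int)] = env
    .socketTextStream("localhost", 9999)
    .map(line => { val a = line.split(","); (a(0), a(1).toInt) })

  val keyed = pairs.keyBy(_._1)

  // 1. Generic form: pass a WindowAssigner to .window()
  keyed
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .sum(1)
    .print("window")

  // 2. Shortcut: .timeWindow() builds the same processing-time tumbling window
  keyed
    .timeWindow(Time.seconds(10))
    .sum(1)
    .print("timeWindow")

  env.execute()
}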

2.1 Window Classification

Windows in Flink can roughly be divided into: tumbling time windows, sliding time windows, tumbling count windows, sliding count windows, session windows and global windows.

Class inheritance diagram of the window-related classes in Flink

  • Sliding window WindowAssigner
    • SlidingEventTimeWindows: sliding event-time window
    • SlidingProcessingTimeWindows: sliding processing-time window
    • SlidingTimeWindows: sliding time window (event time), marked as deprecated
  • Tumbling window WindowAssigner
    • TumblingEventTimeWindows: tumbling event-time window
    • TumblingProcessingTimeWindows: tumbling processing-time window
    • TumblingTimeWindows: tumbling time window (event time), marked as deprecated
  • Global window GlobalWindows
  • Session window MergingWindowAssigner
    • ProcessingTimeSessionWindows: processing-time session window
    • EventTimeSessionWindows: event-time session window
    • DynamicEventTimeSessionWindows: dynamic event-time session window
    • DynamicProcessingTimeSessionWindows: dynamic processing-time session window
  • BaseAlignedWindowAssigner: aligned window assigner

2.2 Window API Overview

stream
       .keyBy(...)               <-  required: windows other than windowAll must be opened after a keyBy
       .window(...)              <-  required: choose the window type; .timeWindow/.countWindow can be used instead
      [.trigger(...)]            <-  optional: set a trigger; the default trigger is used if omitted
      [.evictor(...)]            <-  optional: set an evictor; none is used if omitted
      [.allowedLateness(...)]    <-  optional: how long after the watermark late data is still accepted
      [.sideOutputLateData(...)] <-  optional: send late data to a side output
       .reduce/aggregate/fold/apply()      <-  required: the window function that produces the result
      [.getSideOutput(...)]      <-  optional: read the late data from the side output
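
The optional calls above (trigger, evictor, allowed lateness and the late-data side output) are covered in detail in the next article of this series. As a rough sketch of how the late-data calls fit together on an event-time window, something like the following could be used; the 10-second window, the 5-second allowed lateness, the "key,value,timestampInSeconds" input format and the "late-data" tag name are all made-up values for illustration.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowLatenessSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Hypothetical input lines of the form "key,value,timestampInSeconds"
  val input: DataStream[(String, Int, Long)] = env
    .socketTextStream("localhost", 9999)
    .map(line => { val a = line.split(","); (a(0), a(1).toInt, a(2).toLong) })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[(String, Int, Long)] {
      override def extractAscendingTimestamp(e: (String, Int, Long)): Long = e._3 * 1000
    })

  // Tag for data that arrives too late even for allowedLateness
  val lateTag = new OutputTag[(String, Int, Long)]("late-data")

  val result = input
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .allowedLateness(Time.seconds(5))   // still accept data up to 5s after the watermark passes the window end
    .sideOutputLateData(lateTag)        // anything later goes to the side output
    .reduce((a, b) => (a._1, a._2 + b._2, a._3.max(b._3)))

  result.print("result")
  result.getSideOutput(lateTag).print("late")

  env.execute()
}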

2.3 Test Preparation Code

First, prepare the code that the tests will need.

  1. Case class
case class Obj1(id:String,name:String,time:Long)
  2. ReduceFunction
import org.apache.flink.api.common.functions.ReduceFunction

// Keep the record with the smaller timestamp
class MinDataReduceFunction extends ReduceFunction[Obj1]{
  override def reduce(r1: Obj1, r2: Obj1):Obj1 = {
    if(r1.time < r2.time){
      r1
    }else{
      r2
    }
  }
}
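
  3. Record case class

Several of the later examples also use a Record(classId, name, age) type that the preparation code above does not define. A minimal definition consistent with how those examples use it (the field names are inferred from the sample data, so treat them as an assumption) would be:

// Assumed definition: classId and age are referenced by the later examples; the middle field appears to be a name
case class Record(classId: String, name: String, age: Int)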

3. Tumbling Time Window

3.1 Description

  • Tumbling Time Window
  • Splits the data into windows of a fixed length
  • Time-aligned, fixed window length, no overlap
  • Each element is assigned to exactly one window

3.2 Illustration

Tumbling time window diagram

3.3 Implementation

  • .timeWindow(): based on processing time
  • TumblingProcessingTimeWindows: based on processing time
  • TumblingEventTimeWindows: based on event time
  • TumblingTimeWindows: based on event time, marked as Deprecated; TumblingEventTimeWindows is recommended instead

3.3.1 The .timeWindow() Approach

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.scala._
object TumblingTimeWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // For each classId, find the youngest user within a 10-second window

     /* Sample data to send:
      1,xiaoming,12
      2,xiaodong,11
      1,xioahong,13
      1,xiaogang,14
      1,xiaogang,15
      2,xiaohuang,15
      */

    stream2.map(record=>{
      (record.classId,record.age)
    }).keyBy(_._1)
      .timeWindow(Time.seconds(10)) // 10-second window; processing time is used by default
      .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
      .print("minAge")

    /** Sample output:
      * minAge:6> (1,12)
      * minAge:3> (2,11)
      */
    env.execute()
  }
}

3.3.2 TumblingProcessingTimeWindows

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingProcessingTimeWindow extends App{

  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  // Define a tumbling window with a 10-second length
  stream2.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new minAgeFunction)
    .print()

  environment.execute()
}

// Keep the record with the smallest age in the window
class minAgeFunction extends ReduceFunction[Record]{
  override def reduce(r1: Record, r2: Record):Record = {
    if(r1.age < r2.age){
      r1
    }else{
      r2
    }
  }
}

3.3.3 TumblingEventTimeWindows

When using a TumblingEventTimeWindows window, event time must be set up first (time characteristic plus timestamps/watermarks), otherwise an exception like the following is thrown:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
  • Test code
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingEventTimeWindow extends App{

  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    //println(arr)
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // Extract the event time (seconds -> milliseconds); the ascending extractor advances the watermark with it
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(6)))
    .reduce(new minAgeFunction2)
    .print("TumblingEventTimeWindow")

  environment.execute()
}
case class Obj1(id:String,name:String,time:Long)

// Keep the record with the smallest timestamp in the window
class minAgeFunction2 extends ReduceFunction[Obj1]{
  override def reduce(r1: Obj1, r2: Obj1):Obj1 = {
    if(r1.time < r2.time){
      r1
    }else{
      r2
    }
  }
}

3.3.4 TumblingTimeWindows

TumblingTimeWindows also uses event-time semantics; as above, event time must be set up before using it, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Author haonan.bian
  * @Description //TODO
  * @Date 2021/1/3 14:11 
  **/
object TestTumblingTimeWindow extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // Extract the event time (seconds -> milliseconds); the ascending extractor advances the watermark with it
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(TumblingTimeWindows.of(Time.seconds(6)))
    .reduce(new MinDataReduceFunction)
    .print("TestTumblingTimeWindow")

  environment.execute()
}

4. Sliding Time Window

4.1 Description

  • Sliding Time Window
  • A sliding window is a generalization of the fixed (tumbling) window; it consists of a fixed window length and a slide interval.
  • Fixed window length; windows may overlap

4.2 Illustration

Sliding time window diagram

4.3 Implementation

  • .timeWindow(): sliding window based on processing time
  • SlidingProcessingTimeWindows: sliding window based on processing time
  • SlidingEventTimeWindows: sliding window based on event time
  • SlidingTimeWindows: sliding window based on event time, marked as Deprecated; SlidingEventTimeWindows is recommended instead

4.3.1 The .timeWindow() Approach

import org.apache.flink.streaming.api.windowing.time.Time

object SlidingTimeWindow {
  def main(args: Array[String]): Unit = {

    import org.apache.flink.streaming.api.scala._
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // 10-second window sliding every 5 seconds; processing time is used by default
    stream2.map(record=>{
      (record.classId,record.age)
    }).keyBy(_._1)
      .timeWindow(Time.seconds(10),Time.seconds(5))
      .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
      .print("minAge")

    env.execute()
  }
}

4.3.2 SlidingProcessingTimeWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingProcessingTimeWindow extends App {

  private val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  private val stream: DataStream[String] = environment.socketTextStream("localhost", 9999)

  stream.map(data=>{
    val dataArray: Array[String] = data.split(",")
    Obj1(dataArray(0),dataArray(1),dataArray(2).toLong)
  }).keyBy(0)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingProcessingTimeWindow")

  environment.execute()
}

4.3.3 SlidingEventTimeWindows

When using SlidingEventTimeWindows, event time must be set up first, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestSlidingEventTimeWindow extends App {

  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
  stream1.print("stream1")
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    //println(arr)
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // Extract the event time (seconds -> milliseconds); the ascending extractor advances the watermark with it
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.seconds(6),Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingEventTimeWindow")

  environment.execute()
}

4.3.4 SlidingTimeWindows

SlidingTimeWindows uses event-time semantics; set up event time before using it, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Author haonan.bian
  * @Description //TODO
  * @Date 2021/1/3 17:55 
  **/
object TestSlidingTimeWindow extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    //println(arr)
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // Extract the event time (seconds -> milliseconds); the ascending extractor advances the watermark with it
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(SlidingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
    .reduce(new MinDataReduceFunction)
    .print("TestSlidingEventTimeWindow")

  environment.execute()
}

5. Session Window

5.1 Introduction

  • Session window
  • Consists of a series of events plus a timeout gap of a specified length; in other words, a new window is started when no new data has been received for a certain period of time.
  • Characteristic: not time-aligned

5.2 Illustration

Session window diagram

5.3 Implementation

  • ProcessingTimeSessionWindows: processing-time session window
  • EventTimeSessionWindows: event-time session window
  • DynamicEventTimeSessionWindows: dynamic event-time session window
  • DynamicProcessingTimeSessionWindows: dynamic processing-time session window

5.3.1 ProcessingTimeSessionWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows

object TestSessionProcessingTimeSessionWindow {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    stream1.print()
    val stream2: DataStream[Obj1] = stream1.map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })

    stream2.keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .reduce(new MinDataReduceFunction)
      .print("ProcessingTimeSessionWindows")

    env.execute()
  }
}

5.3.2 EventTimeSessionWindows

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time


object TestSessionEventTimeSessionWindows extends App {
  // Create the execution environment
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
    override def extractAscendingTimestamp(element: Obj1) = {
      // Extract the event time (seconds -> milliseconds); the ascending extractor advances the watermark with it
      element.time * 1000
    }
  })

  stream2.keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(new MinDataReduceFunction)
    .print("EventTimeSessionWindows")

  env.execute()

}

5.3.3 DynamicProcessingTimeSessionWindows

Dynamically sets the session window gap based on a per-element condition.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicProcessingTimeSessionWindows, EventTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicProcessingTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  val sessionGap = new SessionWindowTimeGapExtractor[Obj1](){
    // Dynamically return the session gap in milliseconds
    override def extract(element: Obj1) = {
      // Use a 3-second session gap when the id is >= "2", otherwise 5 seconds
      if (element.id >= "2"){
        3000L
      }else{
        5000L
      }
    }
  }

  stream2.keyBy(0)
    .window(
      DynamicProcessingTimeSessionWindows.withDynamicGap(sessionGap)
    )
    .reduce(new MinDataReduceFunction)
    .print("EventTimeSessionWindows")

  env.execute()
}

5.3.4 DynamicEventTimeSessionWindows

When using DynamicEventTimeSessionWindows, event time must be set up first, otherwise an exception is thrown.

import com.hnbian.flink.common.{MinDataReduceFunction, Obj1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicEventTimeSessionWindows, SessionWindowTimeGapExtractor}

object TestSessionDynamicEventTimeSessionWindows extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1 = env.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Obj1] {
     // Extract the event time (seconds -> milliseconds)
     override def extractAscendingTimestamp(element: Obj1): Long = {
      element.time * 1000
    }
  })

  val sessionGap = new SessionWindowTimeGapExtractor[Obj1](){
    // Dynamically return the session gap in milliseconds
    override def extract(element: Obj1) = {
      // Use a 3-second session gap when the id is >= "2", otherwise 5 seconds
      if (element.id >= "2"){
        3000L
      }else{
        5000L
      }
    }
  }

  stream2.keyBy(0)
    .window(
      DynamicEventTimeSessionWindows.withDynamicGap(sessionGap)
    )
    .reduce(new MinDataReduceFunction)
    .print("EventTimeSessionWindows")

  env.execute()
}

6. Tumbling Count Window

6.1 Description

  • Tumbling count window
  • Splits the data into windows containing a fixed number of elements
  • Fixed window size (element count), no overlap
  • Each element is assigned to exactly one window

6.2 Illustration

Tumbling count window diagram

6.3 Implementation

import org.apache.flink.streaming.api.scala._

object TumblingCountWindow {
  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // For every 2 records per classId, find the youngest user
    stream2.map(record=>{
      (record.classId,record.age)
    }).keyBy(_._1)
      .countWindow(2) // tumbling count window of 2 elements per key
      .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
      .print("minAge")
    env.execute()
  }

}

7. Sliding Count Window

7.1 Description

  • Sliding count window
  • A sliding window is a generalization of the fixed window; it consists of a fixed window length and a slide interval.
  • Fixed window length; windows overlap

7.2 Illustration

Sliding count window diagram

7.3 Implementation


object SlidingCountWindow {
  def main(args: Array[String]): Unit = {

    import org.apache.flink.streaming.api.scala._
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    val stream2: DataStream[Record] = stream1.map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

    // For each classId, find the youngest user within a 4-element window that slides every 2 elements
    stream2.map(record=>{
      (record.classId,record.age)
    }).keyBy(_._1)
      .countWindow(4,2) // count window of 4 elements, sliding every 2 elements
      .reduce((r1,r2)=>{(r1._1,r1._2.min(r2._2))})
      .print("minAge")

    env.execute()
  }
}

8. Global Window

8.1 Description

  • Global window
  • A global window assigns all elements with the same key to one single window; it is usually used together with a trigger.
  • Without a trigger, no computation is performed.

8.2 Illustration

Global window diagram

8.3 Implementation

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object GlobalWindow {

  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

    stream1.print()

    // Each time a word's cumulative count reaches a multiple of 3, the trigger fires and the total count of that word in the window is computed
    stream1
      .flatMap(str=>{str.split(" ")})
      .map(str=>{(str,1)})
      .keyBy(0)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(3)) // 设置触发条件
      .sum(1)
      .print()

    env.execute()
  }
}

Author: hnbian
Copyright: unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit hnbian when reposting.