1. Introduction to Window Functions

After defining the window assigner (WindowAssigner), we need to specify the computation to perform on each window. This is the responsibility of the window function: once the system determines that a window is ready for processing, the window function is applied to the elements of that window.
Window functions fall into two categories: incremental aggregation functions and full window functions. A minimal sketch contrasting the two follows this list.

Incremental aggregation functions

Description: each element is folded into the aggregate as it arrives, keeping only a small piece of state. They suit relatively simple, incrementally aggregatable operations and can be executed more efficiently, because Flink aggregates the elements of a window incrementally as they arrive.

Related functions: ReduceFunction, AggregateFunction, FoldFunction

Full window functions

A full window function receives an Iterable containing all elements of the window, together with additional meta information about the window the elements belong to. This prevents it from executing as quickly and efficiently as an incremental aggregation function. The cost can be mitigated by combining a ProcessWindowFunction with a ReduceFunction, AggregateFunction, or FoldFunction, which yields both incremental aggregation of the window elements and the extra window metadata the ProcessWindowFunction receives.

Related function: ProcessWindowFunction
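To make the contrast concrete, here is a minimal sketch. It assumes a hypothetical DataStream[(String, Int)] named words and the same imports as the full examples below; the incremental variant keeps one running value per key and window, while the full-window variant buffers everything and sees the window at once:

```scala
// Incremental: elements are combined into one value per key as they arrive.
val incremental = words
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .reduce((a, b) => (a._1, a._2 + b._2))

// Full-window: all elements are buffered, then handed to the function at once.
val fullWindow = words
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .process(new ProcessWindowFunction[(String, Int), Int, String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)], out: Collector[Int]): Unit =
      out.collect(elements.size) // sees the whole window at once
  })
```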
2. ReduceFunction

A ReduceFunction combines two elements of the window's input into an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.
```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TestReduceFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new MinTimeReduceFunction)
    .print("TestReduceFunction")

  environment.execute()
}

// Compares the time field of two elements and keeps the one with the larger value
class MinTimeReduceFunction extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    println(s"r1.time=${r1.time}, r2.time=${r2.time}")
    if (r1.time > r2.time) {
      println(s"bigger is r1.time=${r1.time}")
      r1
    } else {
      println(s"bigger is r2.time=${r2.time}")
      r2
    }
  }
}
```
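For a combiner this small, the Scala API also accepts a plain function literal instead of a separate ReduceFunction class. A minimal equivalent of the reducer above (minus the debug printing):

```scala
stream2.keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce((r1: Obj1, r2: Obj1) => if (r1.time > r2.time) r1 else r2) // same keep-the-larger-time logic
  .print("TestReduceFunction")
```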
3. AggregateFunction

AggregateFunction is a flexible aggregation function with the following properties:
The aggregation may use different types for the input values, the intermediate aggregate, and the result, supporting a wide range of aggregations.

It supports distributed aggregation: different intermediate aggregates can be merged together, enabling pre-aggregation/final-aggregation optimizations.
The intermediate aggregate of an AggregateFunction (the in-progress aggregation state) is called the accumulator. Values are added to the accumulator, and the final result is obtained by finalizing the accumulator state. This allows aggregations whose intermediate state differs from both the input type and the final result type, for example an average, which typically keeps a count and a sum. Merging intermediate (partial) aggregates means merging accumulators.

The AggregateFunction itself is stateless. To let a single AggregateFunction instance maintain multiple aggregates (for example, one per key), the AggregateFunction creates a new accumulator whenever a new aggregation is started.

Aggregate functions must be Serializable because they are shipped between processes during distributed execution.
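These responsibilities map one-to-one onto the methods of the interface. A simplified sketch of the contract (the real interface is the Java org.apache.flink.api.common.functions.AggregateFunction):

```scala
// Simplified sketch of the AggregateFunction contract:
// IN = input element type, ACC = accumulator type, OUT = result type.
trait SimplifiedAggregateFunction[IN, ACC, OUT] extends Serializable {
  def createAccumulator(): ACC              // start a new aggregation (e.g. per key and window)
  def add(value: IN, accumulator: ACC): ACC // fold one input element into the accumulator
  def getResult(accumulator: ACC): OUT      // finalize the accumulator into the result
  def merge(a: ACC, b: ACC): ACC            // combine two partial aggregations
}
```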
Code example
```scala
import com.hnbian.flink.window.TumblingTimeWIndow.Record
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable.ArrayBuffer

object TestAggregateFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)
  stream1.print("print")

  val stream2: DataStream[Record] = stream1.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream2.keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .aggregate(new AgeAverageAggregateFunction)
    .print("TestAggregateFunction")

  environment.execute()
}

// Accumulator: buffers the window's records plus a running count and sum of ages
class AverageAccumulator {
  var records = ArrayBuffer[Record]()
  var count: Long = 0L
  var sum: Long = 0L
}

class AgeAverageAggregateFunction extends AggregateFunction[Record, AverageAccumulator, (ArrayBuffer[Record], Double)] {

  override def createAccumulator(): AverageAccumulator = new AverageAccumulator()

  override def add(value: Record, accumulator: AverageAccumulator): AverageAccumulator = {
    accumulator.records.append(value)
    accumulator.sum += value.age
    accumulator.count += 1
    accumulator
  }

  override def getResult(accumulator: AverageAccumulator): (ArrayBuffer[Record], Double) = {
    val avg = accumulator.sum / accumulator.count.toDouble
    (accumulator.records, avg)
  }

  override def merge(a: AverageAccumulator, b: AverageAccumulator): AverageAccumulator = {
    a.count += b.count
    a.sum += b.sum
    a.records.appendAll(b.records)
    a
  }
}
```
4. FoldFunction

Fold is officially deprecated in favor of aggregate, so it is not covered in detail here. The deprecated signature from Flink's Scala API:
```scala
@deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
  if (function == null) {
    throw new NullPointerException("Fold function must not be null.")
  }
  val cleanFun = clean(function)
  val folder = new ScalaFoldFunction[T, R](cleanFun)
  fold(initialValue, folder)
}
```
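If you have existing fold logic to port, a fold with an initial value maps naturally onto an AggregateFunction whose accumulator is the fold state. A hedged sketch (the class name and the string-concatenation logic are purely illustrative):

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Illustrative only: reproducing fold(initialValue)((acc, in) => ...) with aggregate().
class FoldLikeAggregate extends AggregateFunction[Obj1, String, String] {
  override def createAccumulator(): String = ""                        // the fold's initial value
  override def add(value: Obj1, acc: String): String = acc + value.id  // the fold step
  override def getResult(acc: String): String = acc
  override def merge(a: String, b: String): String = a + b             // combine partial folds
}
```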
5. WindowFunction

A WindowFunction can compute over a window's data more flexibly than the preceding functions, but at the cost of some performance: before the windowed transformation runs, Flink must internally buffer an Iterable holding all elements of the window, together with extra meta information about the window those elements belong to. We can recover much of the performance by combining a WindowFunction with a ReduceFunction or AggregateFunction where appropriate.
```scala
import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestWindowFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  stream1
    .flatMap(_.split(","))
    .map((_, 1))
    .keyBy(t => t._1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply(new CustomWindowFunction)
    .print("TestWindowFunction")

  environment.execute()
}

// WindowFunction[IN, OUT, KEY, W]: counts occurrences of each word in the window
class CustomWindowFunction extends WindowFunction[(String, Int), String, String, TimeWindow] {
  val sdf = new SimpleDateFormat("HH:mm:ss")

  override def apply(key: String,
                     window: TimeWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[String]): Unit = {
    println(
      s"""
         |window key: ${key},
         |start: ${sdf.format(window.getStart)},
         |end: ${sdf.format(window.getEnd)},
         |maxTime: ${sdf.format(window.maxTimestamp())}
         |""".stripMargin)
    out.collect(s"$key,${input.map(_._2).sum}")
  }
}
```
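As with reduce, the Scala API also lets you pass apply a function literal instead of a WindowFunction class. A minimal sketch equivalent to the word counter above (without the metadata printing):

```scala
stream1
  .flatMap(_.split(","))
  .map((_, 1))
  .keyBy(t => t._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .apply { (key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]) =>
    out.collect(s"$key,${input.map(_._2).sum}") // same per-word count, no class needed
  }
  .print("TestWindowFunction")
```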
6. ProcessWindowFunction

A ProcessWindowFunction receives an Iterable over all elements in the window, plus a Context object that gives access to time and state information, which makes it more flexible than the other window functions. This flexibility costs performance and resources: the elements cannot be aggregated incrementally, so when the window fires, Flink has to have buffered all of the window's elements internally.
```scala
import java.text.SimpleDateFormat

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestProcessWindowFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  stream2
    .keyBy(_.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .process(new CustomProcessFunction)
    .print("TestProcessWindowFunction")

  environment.execute()
}

// ProcessWindowFunction[IN, OUT, KEY, W]: prints window metadata, then counts the elements
class CustomProcessFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[String]): Unit = {
    var count = 0
    val sdf = new SimpleDateFormat("HH:mm:ss")
    println(
      s"""
         |window key: ${key},
         |start: ${sdf.format(context.window.getStart)},
         |end: ${sdf.format(context.window.getEnd)},
         |maxTime: ${sdf.format(context.window.maxTimestamp())}
         |""".stripMargin)
    for (obj <- elements) {
      println(obj.toString)
      count += 1
    }
    out.collect(s"Window ${context.window}, count: ${count}")
  }
}
```
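The Context exposes more than the window itself: it also provides currentProcessingTime, currentWatermark, and two keyed state stores (windowState, scoped to the current window, and globalState, scoped to the key across windows). A hedged sketch using globalState to count how many windows have fired for each key (the class and state names are illustrative):

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor

// Illustrative: per-key state that survives across windows, via context.globalState.
class WindowCountingFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
  private val desc = new ValueStateDescriptor("windowsFired", classOf[java.lang.Long])

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[String]): Unit = {
    val state = context.globalState.getState(desc)
    val fired = if (state.value() == null) 1L else state.value() + 1L // null means no prior window
    state.update(fired)
    out.collect(s"key=${key}, window #${fired}, elements=${elements.size}")
  }
}
```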
7. Combining Full Window Functions with Incremental Aggregation

Use a ReduceFunction or AggregateFunction for the incremental computation and hand its result to a ProcessWindowFunction or WindowFunction, which can then attach metadata from the context/window to the output, such as the current window, the current watermark, the current processing time, and so on.
In the example below, we use a ReduceFunction to find the element with the smallest time in each window, then emit that element together with the window's start time (a sketch of the same pattern with AggregateFunction follows the example):
```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestProcessWindowFunctionReduceFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  stream2.keyBy(_.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new MinTimeReduceFunction2, new CustomProcessFunction2)
    .print("TestProcessWindowFunctionReduceFunction")

  environment.execute()
}

// Receives the single pre-aggregated element and attaches the window's start time
class CustomProcessFunction2 extends ProcessWindowFunction[Obj1, (Long, Obj1), String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[(Long, Obj1)]): Unit = {
    val min = elements.iterator.next // exactly one element: the ReduceFunction's result
    out.collect((context.window.getStart, min))
  }
}

// Keeps the element with the smaller time field
class MinTimeReduceFunction2 extends ReduceFunction[Obj1] {
  override def reduce(r1: Obj1, r2: Obj1): Obj1 = {
    if (r1.time < r2.time) r1 else r2
  }
}
```
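The same wiring works with an AggregateFunction as the pre-aggregator. A hedged sketch (class names are illustrative, and it averages the time field of Obj1 purely for demonstration) in which the ProcessWindowFunction sees only the single pre-aggregated value:

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Illustrative pre-aggregator: running (sum, count) of the time field, finalized to an average.
class AvgAggregate extends AggregateFunction[Obj1, (Long, Long), Double] {
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(v: Obj1, acc: (Long, Long)): (Long, Long) = (acc._1 + v.time, acc._2 + 1)
  override def getResult(acc: (Long, Long)): Double = acc._1.toDouble / acc._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
}

// Attaches the window start time to the single pre-aggregated average.
class AvgWithWindowStart extends ProcessWindowFunction[Double, (Long, Double), String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[Double],
                       out: Collector[(Long, Double)]): Unit =
    out.collect((context.window.getStart, elements.iterator.next()))
}

// Wiring: aggregate(preAggregator, windowFunction)
stream2.keyBy(_.id)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .aggregate(new AvgAggregate, new AvgWithWindowStart)
  .print("AvgWithWindowStart")
```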
8. ProcessAllWindowFunction

Functionally similar to ProcessWindowFunction, but it operates on an AllWindowedStream, i.e. a non-keyed window over the whole stream, which runs non-parallel.
```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object TestProcessWindowAllFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // Note: timeWindowAll gathers every element into a single non-keyed window,
  // so the preceding keyBy has no effect on the windowing here.
  private val allValueStream: AllWindowedStream[Obj1, TimeWindow] = stream2
    .keyBy(_.id)
    .timeWindowAll(Time.seconds(5))

  allValueStream
    .process(new CustomProcessAllWindowFunction)
    .print("TestProcessWindowAllFunction")

  environment.execute()
}

// Collects the names of all elements in the window, keyed by the window start time
class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long, String), TimeWindow] {
  override def process(context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[(Long, String)]): Unit = {
    println(s"start: ${context.window.getStart}")
    println(s"end: ${context.window.getEnd}")
    println(s"maxTimestamp: ${context.window.maxTimestamp()}")
    val key = context.window.getStart
    val value = new ArrayBuffer[String]()
    for (obj1 <- elements) {
      value.append(obj1.name)
    }
    out.collect((key, value.mkString(",")))
  }
}
```