1. 窗口函数(window function)介绍
定义完窗口分配器(WindowAssigner)之后,我们就需要指定要在每个窗口上执行的计算。这是Window Fucntion的职责,一旦系统确定窗口已准备好进行处理,就可以处理每个窗口的元素。
窗口函数可以分为 增量聚合函数 和全窗口函数两类:
- 增量聚合函数(incremental aggregatement functions)
- 介绍:每条数据到来就进行计算,保持一个简单的状态,应用场景相对简单,适用于增量聚合的操作,可以更有效地执行,因为Flink可以在每个窗口元素到达时以递增方式聚合它们
- 相关函数有 ReduceFunction、AggegateFunction、FoldFunction
- 全窗口函数(full window function)
- 全窗口函数会获取一个包含窗口中的所有元素的 Iterable 以及有关元素所属的窗口的其他元信息,这使得它不能够像增量聚合函数一样快速高效的执行,不过可以通过将ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction结合使用来获得窗口元素的增量聚合以及ProcessWindowFunction接收的其他窗口元数据,从而减轻这种情况。
- 相关函数有ProcessWindowFunction
2. ReduceFunction
ReduceFunction 将窗口输入数据中的两个元素组合在一起以产生相同类型的输出元素。Flink使用ReduceFunction来逐步聚合窗口中的元素。
import com.hnbian.flink.common.Obj1
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TestReduceFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// 设置一个窗口时间是 10 秒的窗口
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MinTimeReduceFunction)
.print("TestReduceFunction")
environment.execute()
}
/**
* 定义一个 ReduceFunction 比较两个元素的时间大小,将时间比较大的元素返回
*/
class MinTimeReduceFunction extends ReduceFunction[Obj1]{
def reduce(r1: Obj1, r2: Obj1):Obj1 = {
println(s"r1.time=${r1.time},r2.time=${r2.time}")
if(r1.time > r2.time){
println(s"bigger is r1.time=${r1.time}")
r1
}else{
println(s"bigger is r2.time=${r2.time}")
r2
}
}
}
3. AggegateFunction
AggregateFunction是一种灵活的聚合函数,具有以下功能:
- 聚合可以对输入值,中间聚合和结果类型使用不同的类型,以支持各种聚合类型。
- 支持分布式聚合:可以将不同的中间聚合合并在一起,以实现预聚合/最终聚合优化。
- AggregateFunction的中间聚合(进行中的聚合状态)称为累加器。 将值添加到累加器,并通过确定累加器状态获得最终的合计。 这支持聚合函数,其中中间状态需要不同于聚合值和最终结果类型,例如平均值(通常保留计数和总和)。 合并中间聚合(部分聚合)是指合并累加器。
- AggregationFunction本身是无状态的。 为了允许单个AggregationFunction实例维护多个聚合(例如每个键一个聚合),无论何时启动新的聚合,AggregationFunction都会创建一个新的累加器。
- 聚合函数必须可Serializable因为它们是在分布式执行期间在分布式进程之间发送的。
- 代码示例
import com.hnbian.flink.window.TumblingTimeWIndow.Record
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ArrayBuffer
object TestAggregateFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
stream1.print("print")
val stream2: DataStream[Record] = stream1.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 设置一个窗口时间是 10 秒的窗口
stream2.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new AgeAverageAggregateFunction)
.print("TestReduceFunction")
environment.execute()
}
// 累加器,保存计算过程中的聚合状态
class AverageAccumulator {
var records = ArrayBuffer[Record]()
var count:Long = 0L
var sum:Long = 0L
}
/**
* 使用 AggregateFunction 计算最近十秒内流入数据的用户的平均年龄
*/
class AgeAverageAggregateFunction extends AggregateFunction[Record, AverageAccumulator, (ArrayBuffer[Record], Double)] {
override def getResult(accumulator: AverageAccumulator): (ArrayBuffer[Record], Double)= {//
val avg = accumulator.sum./(accumulator.count.toDouble)
(accumulator.records,avg)
}
override def merge(a: AverageAccumulator, b: AverageAccumulator): AverageAccumulator = {
a.count += b.count
a.sum += b.sum
a.records.appendAll(b.records)
a
}
override def createAccumulator(): AverageAccumulator = {
new AverageAccumulator()
}
override def add(value: Record, accumulator: AverageAccumulator): AverageAccumulator = {
accumulator.records.append(value)
accumulator.sum += value.age
accumulator.count+=1
accumulator
}
}
4. FoldFunction
官方已经不建议用 Fold 了,使用 aggregate 来代替,这里就不做介绍了。
- 源码
@deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
val cleanFun = clean(function)
val folder = new ScalaFoldFunction[T, R](cleanFun)
fold(initialValue, folder)
}
5. WindowFunction
WindowFunction 能够比前面几种函数更加灵活的计算窗口内的数据,但是这样的代价是牺牲了一部分的性能开销,因为WindowFunction 的窗口化 transformation 之前,必须在内部缓存一个包含窗口中所有element的Iterable,及这些element所属的窗口的额外元信息(meta information)。不过我们可以使用 WindowFunction 结合 ReduceFunction 或者 AggregateFunction 在适当的提高性能。
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestWindowFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
// 设置一个窗口时间是 5 秒的窗口
stream1
.flatMap(_.split(","))
.map((_,1))
.keyBy(t =>t._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new CustomWindowFunction)
.print("TestWindowFunction")
environment.execute()
}
/**
* (String,Int) –输入值的类型
* String –输出值的类型
* String key 类型 –密钥的类型
* TimeWindow window 类型 –可以应用此窗口功能的 Window 类型
*/
class CustomWindowFunction extends WindowFunction[(String,Int),String,String,TimeWindow]{
val sdf = new SimpleDateFormat("HH:mm:ss")
override def apply(key: String,
window: TimeWindow,
input: Iterable[(String, Int)],
out: Collector[String]): Unit = {
println(
s"""
|window key:${key},
|开始时间:${sdf.format(window.getStart)},
|结束时间:${sdf.format(window.getEnd)},
|maxTime:${sdf.format(window.maxTimestamp())}
|""".stripMargin)
out.collect(s"${key},${input.map(_._2).sum}")
}
}
6. ProcessWindowFunction
ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性,但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。
import java.text.SimpleDateFormat
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestProcessWindowFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// 设置一个窗口时间是 5 秒的窗口
stream2
.keyBy(_.id)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new CuntomProcessFunction)
.print("TestWindowFunction")
environment.execute()
}
/**
*
* IN –输入值的类型。
* OUT –输出值的类型。
* KEY –密钥的类型。
* W –窗口的类型
*/
class CuntomProcessFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[String]): Unit = {
var count = 0
val sdf = new SimpleDateFormat("HH:mm:ss")
println(
s"""
|window key:${key},
|开始时间:${sdf.format(context.window.getStart)},
|结束时间:${sdf.format(context.window.getEnd)},
|maxTime:${sdf.format(context.window.maxTimestamp())}
|""".stripMargin)
// 遍历,获得窗口所有数据
for (obj <- elements) {
println(obj.toString)
count += 1
}
out.collect(s"Window ${context.window} , count : ${count}")
}
}
7. 全量窗口函数与增量窗口函数结合使用
使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction 或 WindowFunction,然后可以使用 context/window 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。
如下:我们使用 ReduceFunction 来计算 每个窗口的时间最小的元素,然后输出该元素和这个窗口的开始时间:
import com.hnbian.flink.common.Obj1
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestProcessWindowFunctionReduceFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// 设置一个窗口时间是 10 秒的窗口
stream2.keyBy(_.id)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MinTimeReduceFunction2,new CustomProcessFunction2)
.print("TestProcessWindowFunctionReduceFunction")
environment.execute()
}
class CustomProcessFunction2 extends ProcessWindowFunction[Obj1, (Long, Obj1), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[(Long, Obj1)]): Unit = {
val min = elements.iterator.next
out.collect((context.window.getStart, min))
}
}
/**
* 定义一个 ReduceFunction 比较两个元素的时间大小
*/
class MinTimeReduceFunction2 extends ReduceFunction[Obj1]{
override def reduce(r1: Obj1, r2: Obj1):Obj1 = {
if(r1.time > r2.time){
r1
}else{
r2
}
}
}
8.ProcessAllWindowFunction
与 ProcessWindowFunction 功能类似,不过作用在 AllWindowedStream 之上。
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
object TestProcessWindowAllFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// 设置一个窗口时间是 5 秒的窗口
private val allValueStream: AllWindowedStream[Obj1, TimeWindow] = stream2
.keyBy(_.id)
.timeWindowAll(Time.seconds(5))
allValueStream
.process(new CustomProcessAllWindowFunction)
.print("TestProcessWindowAllFunction")
environment.execute()
}
/**
* 定义一个ProcessAllWindowFunction
* 把窗口内所有用户名 拼接成字符串 用 "," 分隔
* 输入类型 obj1
* 输出类型 元组(Long,String)
* 窗口类型 TimeWindow
*/
class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long,String), TimeWindow]{
override def process(context: Context, elements: Iterable[Obj1], out: Collector[(Long, String)]): Unit = {
println(s"start:${context.window.getStart}")
println(s"end:${context.window.getEnd}")
println(s"maxTimestamp:${context.window.maxTimestamp()}")
val key = context.window.getStart
val value = new ArrayBuffer[String]()
for (obj1<- elements){
value.append(obj1.name)
}
// 把窗口内所有用户名 拼接成字符串 用 "," 分隔
out.collect((key,value.mkString(",")))
}
}