1. ProcessFunction Overview

1.1 Introduction

Flink's regular transformation operators cannot access an event's timestamp or the current watermark. A map transformation such as MapFunction, for example, has no way to read the timestamp or event time of the element it is processing, yet in some applications this information is essential. For this reason the DataStream API provides a set of low-level transformation operators that can access timestamps and watermarks, register timers, and emit special events such as timeout events.
Process functions are used to build event-driven applications and to implement custom business logic that the window functions and transformation operators described earlier cannot express; Flink SQL, for example, is implemented on top of process functions. All process functions extend the RichFunction interface, so they all provide open(), close(), and getRuntimeContext().
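Since every process function is a RichFunction, those lifecycle hooks can be used for setup and teardown. Below is a minimal sketch of our own (the class name and log messages are not from the original examples) that overrides open() and close() and reads the subtask index from the runtime context:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class LifecycleProcessFunction extends ProcessFunction[String, String] {

  // Called once per parallel instance, before any element is processed
  override def open(parameters: Configuration): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    println(s"open() called on subtask $subtask")
  }

  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    out.collect(value)
  }

  // Called once when the task shuts down
  override def close(): Unit = println("close() called")
}
```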
1.2 Building Blocks

ProcessFunction is a low-level stream-processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:

events (the elements of the stream)
state (fault-tolerant, consistent; only on a keyed stream)
timers (event time and processing time; only on a keyed stream)
1.3 Variants

Flink provides 8 process functions:
ProcessFunction: for processing a plain DataStream
KeyedProcessFunction: for a KeyedStream after keyBy
CoProcessFunction: for two streams combined with connect
ProcessJoinFunction: for stream join (interval join) operations
BroadcastProcessFunction: for a stream connected with a broadcast stream
KeyedBroadcastProcessFunction: for a keyed stream connected with a broadcast stream
ProcessWindowFunction: full-window processing on a keyed WindowedStream
ProcessAllWindowFunction: full-window processing on an AllWindowedStream
2. ProcessFunction

2.1 Introduction
A ProcessFunction is applied with stream.process(processFunction).
As the ProcessFunction class diagram shows, it inherits the RichFunction lifecycle methods open() and close(), and it defines two important methods of its own: processElement and onTimer.
2.2 ProcessFunction Source

```java
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    // Called once for every element of the input stream
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    // Callback invoked when a timer registered via the TimerService fires
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {

        // Timestamp of the current element, or null if it has none
        public abstract Long timestamp();

        // Access to the current time and to timer registration
        public abstract TimerService timerService();

        // Emit a record to the side output identified by the given OutputTag
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {

        // Whether the firing timer is an event-time or a processing-time timer
        public abstract TimeDomain timeDomain();
    }
}
```
2.3 Test Code

```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestProcessFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .process(new CustomProcessFunction)
    .print("TestProcessFunction")

  env.execute()
}

class CustomProcessFunction extends ProcessFunction[Obj1, String] {

  override def processElement(value: Obj1,
                              ctx: ProcessFunction[Obj1, String]#Context,
                              out: Collector[String]): Unit = {
    // Note: this is a lexicographic comparison on the id string
    if (value.id > "10") {
      out.collect(s"${value.name},${ctx.timerService().currentProcessingTime()}")
    }
  }
}
```
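To try it out, start a socket source with nc -lk 9999 and type comma-separated lines such as 11,Tom,1597651200 (an illustrative sample, not from the original): any record whose id compares lexicographically greater than "10" is emitted together with the current processing time.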
3. KeyedProcessFunction

3.1 Introduction
KeyedProcessFunction processes the elements of a KeyedStream.
KeyedProcessFunction[KEY, IN, OUT] likewise centers on two methods, and its Context additionally exposes the current key through getCurrentKey():
```java
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
```
3.2 Test Code

```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedProcessFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .keyBy(_.id)
    .process(new CustomKeyedProcessFunction)
    .print("TestKeyedProcessFunction")

  env.execute()
}

class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String] {

  override def processElement(value: Obj1,
                              ctx: KeyedProcessFunction[String, Obj1, String]#Context,
                              out: Collector[String]): Unit = {
    println(s"current key: ${ctx.getCurrentKey}")
    println(s"current processing time: ${ctx.timerService().currentProcessingTime()}")
    out.collect(value.name)
  }
}
```
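Neither example above overrides onTimer. Here is a minimal sketch of the timer mechanism (the 10-second delay and output messages are our own choices, not from the original): each incoming element registers a processing-time timer ten seconds in the future, and onTimer fires when that time arrives. Timers require a keyed context, which is why this lives in a KeyedProcessFunction:

```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class TimerProcessFunction extends KeyedProcessFunction[String, Obj1, String] {

  override def processElement(value: Obj1,
                              ctx: KeyedProcessFunction[String, Obj1, String]#Context,
                              out: Collector[String]): Unit = {
    // Register a processing-time timer 10 seconds from now for the current key
    val fireAt = ctx.timerService().currentProcessingTime() + 10 * 1000L
    ctx.timerService().registerProcessingTimeTimer(fireAt)
    out.collect(value.name)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, Obj1, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Fired once the registered processing time is reached
    out.collect(s"timer fired for key ${ctx.getCurrentKey} at $timestamp")
  }
}
```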
4. CoProcessFunction
The DataStream API provides the low-level CoProcessFunction operation for processing two input streams.
CoProcessFunction provides one method per input stream: processElement1() and processElement2().
As with ProcessFunction, both methods receive a Context object that gives access to the element's data and timestamp, a TimerService, and side outputs. CoProcessFunction also provides an onTimer() callback.
```scala
import com.hnbian.flink.common.{Obj1, Record}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestCoProcessFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  val stream1Obj: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  val stream2Rec: DataStream[Record] = stream2.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream1Obj
    .connect(stream2Rec)
    .process(new CustomCoProcessFunction)
    .print()

  env.execute()
}

class CustomCoProcessFunction extends CoProcessFunction[Obj1, Record, String] {

  // Invoked for elements of the first (Obj1) stream
  override def processElement1(value: Obj1,
                               ctx: CoProcessFunction[Obj1, Record, String]#Context,
                               out: Collector[String]): Unit = {
    out.collect(s"processElement1:${value.name},${value.getClass}")
  }

  // Invoked for elements of the second (Record) stream
  override def processElement2(value: Record,
                               ctx: CoProcessFunction[Obj1, Record, String]#Context,
                               out: Collector[String]): Unit = {
    out.collect(s"processElement2:${value.name},${value.getClass}")
  }
}
```
5. ProcessJoinFunction

ProcessJoinFunction is an abstract class that defines the function applied when performing an inner join on two streams within a time window. The function receives the matching elements from both streams, along with their timestamps, and produces a result.
It is typically used with the intervalJoin API to join two streams when a given condition holds. That condition is usually temporal: two elements are joined only when their timestamps fall within a specified interval.
```scala
leftKeyedStream
  .intervalJoin(rightKeyedStream)
  .between(Time.minutes(-10), Time.seconds(0))
  .process(new MyProcessJoinFunction) // any ProcessJoinFunction implementation
```
```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object TestProcessJoinFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Interval joins operate on event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  // Assign event-time timestamps with a 3-second out-of-orderness bound
  val stream1Obj: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1): Long = element.time * 1000
  })

  val stream2Obj: DataStream[Obj1] = stream2.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1): Long = element.time * 1000
  })

  // Join elements with the same name whose timestamps are at most 10 minutes apart
  stream1Obj
    .keyBy(_.name)
    .intervalJoin(stream2Obj.keyBy(_.name))
    .between(Time.minutes(-10), Time.seconds(0))
    .process(new CustomProcessJoinFunction)
    .print("TestProcessJoinFunction")

  env.execute()
}

class CustomProcessJoinFunction extends ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)] {

  override def processElement(left: Obj1,
                              right: Obj1,
                              ctx: ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)]#Context,
                              out: Collector[(String, Obj1, Obj1)]): Unit = {
    out.collect((left.name, left, right))
  }
}
```
6. BroadcastProcessFunction

BroadcastProcessFunction is a special CoProcessFunction for working with a broadcast data stream. It lets you connect a regular stream with one or more streams marked as broadcast, whose elements are sent to all parallel instances of the operator.
```scala
import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestBroadcastProcessFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  val studentStream: DataStream[Student] = stream1.map(data => {
    val arr = data.split(",")
    Student(arr(0), arr(1), arr(2))
  })

  // Descriptor for the broadcast state: classId -> className
  val descriptor = new MapStateDescriptor[String, String](
    "classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val classStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })

  val classBroadcastStream: BroadcastStream[Class] = classStream.broadcast(descriptor)

  studentStream
    .connect(classBroadcastStream)
    .process(new CustomBroadcastProcessFunction)
    .print("TestBroadcastProcessFunction")

  env.execute()
}

class CustomBroadcastProcessFunction extends BroadcastProcessFunction[Student, Class, String] {

  // Called for elements of the non-broadcast (student) stream; state is read-only here
  override def processElement(value: Student,
                              ctx: BroadcastProcessFunction[Student, Class, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)
    val className: String = classInfo.get(value.classId)
    out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:$className")
  }

  // Called for each broadcast element; may update the broadcast state
  override def processBroadcastElement(value: Class,
                                       ctx: BroadcastProcessFunction[Student, Class, String]#Context,
                                       out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)
    println("updating broadcast state")
    classInfo.put(value.id, value.name)
  }
}
```
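With this setup the broadcast state acts as a lookup table (the sample inputs below are illustrative): a class line such as 1,ClassA entered on port 8888 populates the state first, after which a student line such as 100,Tom,1 on port 9999 prints the record enriched with the class name. If a student arrives before its class has been broadcast, the MapState lookup returns null, so input order matters.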
7. KeyedBroadcastProcessFunction

KeyedBroadcastProcessFunction is the processing function for connecting a keyed stream with a broadcast stream. It plays the same role as BroadcastProcessFunction, but its contexts additionally expose the current key, and it supports timers, which require a keyed context.
```scala
import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedBroadcastProcessFunction extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  val studentStream: DataStream[Student] = stream1.map(data => {
    val arr = data.split(",")
    Student(arr(0), arr(1), arr(2))
  })

  // Descriptor for the broadcast state: classId -> className
  val descriptor = new MapStateDescriptor[String, String](
    "classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val classStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })

  val classBroadcastStream: BroadcastStream[Class] = classStream.broadcast(descriptor)

  studentStream
    .keyBy(_.classId)
    .connect(classBroadcastStream)
    .process(new CustomKeyedBroadcastProcessFunction)
    .print("TestKeyedBroadcastProcessFunction")

  env.execute()
}

class CustomKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction[String, Student, Class, String] {

  override def processElement(value: Student,
                              ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println(s"processElement.key = ${ctx.getCurrentKey}")
    val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)
    val className: String = classInfo.get(value.classId)
    out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:$className")
  }

  override def processBroadcastElement(value: Class,
                                       ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#Context,
                                       out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)
    println("updating broadcast state")
    classInfo.put(value.id, value.name)
  }
}
```
8. ProcessWindowFunction

ProcessWindowFunction receives an Iterable containing all elements of the window, plus a Context object with access to time and state information, which makes it more flexible than the other window functions. That flexibility costs performance and resources: the elements cannot be aggregated incrementally, so Flink must buffer all elements of a window internally until the window fires.
```scala
import java.text.SimpleDateFormat

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestProcessWindowFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  stream2
    .keyBy(_.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .process(new CustomProcessWindowFunction)
    .print("TestProcessWindowFunction")

  environment.execute()
}

class CustomProcessWindowFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[String]): Unit = {
    var count = 0
    val sdf = new SimpleDateFormat("HH:mm:ss")
    println(
      s"""
         |window key: $key,
         |start: ${sdf.format(context.window.getStart)},
         |end: ${sdf.format(context.window.getEnd)},
         |maxTime: ${sdf.format(context.window.maxTimestamp())}
         |""".stripMargin)
    // All buffered elements of the window are available through the Iterable
    for (obj <- elements) {
      println(obj.toString)
      count += 1
    }
    out.collect(s"Window ${context.window}, count: $count")
  }
}
```
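When the per-window result depends only on an aggregate, the buffering cost can be avoided by combining an incremental AggregateFunction with a ProcessWindowFunction, which then receives just the pre-aggregated value while keeping access to window metadata. A minimal sketch (the count logic and class names are our own, not from the original):

```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Counts elements as they arrive, so the window stores one Long instead of all elements
class CountAggregate extends AggregateFunction[Obj1, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: Obj1, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Receives only the final count, but still sees the window metadata
class CountWindowFunction extends ProcessWindowFunction[Long, String, String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       counts: Iterable[Long],
                       out: Collector[String]): Unit = {
    out.collect(s"key: $key, window: ${context.window}, count: ${counts.head}")
  }
}

// Usage, assuming the keyed/windowed stream from the example above:
// stream2
//   .keyBy(_.id)
//   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//   .aggregate(new CountAggregate, new CountWindowFunction)
```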
9. ProcessAllWindowFunction

Functionally similar to ProcessWindowFunction, but applied to an AllWindowedStream.
```scala
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object TestProcessWindowAllFunction extends App {

  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  // timeWindowAll puts all elements into the same windows regardless of key
  // and runs with parallelism 1, so no keyBy is needed
  val allWindowedStream: AllWindowedStream[Obj1, TimeWindow] =
    stream2.timeWindowAll(Time.seconds(5))

  allWindowedStream
    .process(new CustomProcessAllWindowFunction)
    .print("TestProcessWindowAllFunction")

  environment.execute()
}

class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long, String), TimeWindow] {

  override def process(context: Context,
                       elements: Iterable[Obj1],
                       out: Collector[(Long, String)]): Unit = {
    println(s"start:${context.window.getStart}")
    println(s"end:${context.window.getEnd}")
    println(s"maxTimestamp:${context.window.maxTimestamp()}")
    // Use the window start as the key of the emitted tuple
    val key = context.window.getStart
    val names = new ArrayBuffer[String]()
    for (obj1 <- elements) {
      names.append(obj1.name)
    }
    out.collect((key, names.mkString(",")))
  }
}
```
10. SideOutput (Side Output Streams)

Most DataStream API operators have a single output of a single data type. Apart from the split operator, which splits one stream into several streams of the same type, the side-output feature of process functions can produce multiple streams whose data types may differ. A side output is identified by an OutputTag[X] object, where X is the data type of the side-output stream. A process function emits an event to one or more side outputs through its Context object.
```scala
import com.hnbian.flink.common.Student
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestSideOutput extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // OutputTags are matched by name and type, so define the tag once and reuse it
  val outputTag: OutputTag[Student] = new OutputTag[Student]("class2")

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  val stream2: DataStream[Student] = stream1.map(data => {
    val arr = data.split(",")
    Student(arr(0), arr(1), arr(2))
  })

  val studentStream: DataStream[Student] = stream2.process(new ProcessFunction[Student, Student] {
    override def processElement(value: Student,
                                ctx: ProcessFunction[Student, Student]#Context,
                                out: Collector[Student]): Unit = {
      if (value.classId == "2") {
        // Route students of class 2 to the side output
        ctx.output(outputTag, value)
      } else {
        // Everything else stays on the main output
        out.collect(value)
      }
    }
  })

  studentStream
    .getSideOutput(outputTag)
    .print("outputTag")

  studentStream.print("StudentStream")

  env.execute()
}
```
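As a quick check (sample inputs are illustrative): with nc -lk 9999, a line like 1,Tom,2 is printed under the outputTag prefix, while 2,Jerry,1 appears under StudentStream, since only records with classId "2" are routed to the side output.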