1. Introduction to ProcessFunction
1.1 Overview
Flink's regular transformation operators cannot access an event's timestamp or the current watermark. A map transformation such as MapFunction, for example, cannot see the timestamp or event time of the current element, yet in some applications this information is essential. For this reason the DataStream API provides a family of low-level transformation operators that can access timestamps and watermarks, register timers, and emit special records such as timeout events.
Process Functions are used to build event-driven applications and to implement custom business logic that the window functions and transformation operators covered earlier cannot express; Flink SQL, for example, is implemented on top of Process Functions. All Process Functions implement the RichFunction interface, so they all provide methods such as open(), close(), and getRuntimeContext().
1.2 Building Blocks
ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications (the sketch after this list combines all three):
events: the elements of the data stream
state: used for fault tolerance and consistency; available on keyed streams only
timers: event-time and processing-time timers; available on keyed streams only
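All three building blocks typically appear together in a KeyedProcessFunction. Below is a minimal sketch, not part of the original examples: the class name, the (key, payload) tuple shape, and the 10-second interval are illustrative assumptions. It counts elements per key in ValueState and emits the count when a processing-time timer fires 10 seconds after an element arrived.
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
// input: (key, payload) tuples of a keyed stream; output: a count report per key
class CountWithTimeout extends KeyedProcessFunction[String, (String, String), String] {
  // state: the running element count for the current key
  private var countState: ValueState[Long] = _
  override def open(parameters: Configuration): Unit = {
    countState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
  }
  override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit = {
    // events: every element passes through here; a null (unset) state value unboxes to 0L in Scala
    countState.update(countState.value() + 1)
    // timers: fire 10 seconds of wall-clock time after this element
    // (for brevity a new timer is registered per element; production code would delete the previous one)
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10000L)
  }
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect(s"key=${ctx.getCurrentKey}, count=${countState.value()}")
  }
}
Applied as keyedStream.process(new CountWithTimeout); both the state and the timers are scoped to the current key, which is why both require a keyed stream.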
1.3 Categories
Flink provides eight Process Functions:
ProcessFunction: for plain DataStream processing
KeyedProcessFunction: for KeyedStream processing after keyBy
CoProcessFunction: for two streams combined with connect
ProcessJoinFunction: for interval-joined streams
BroadcastProcessFunction: for broadcast streams
KeyedBroadcastProcessFunction: for broadcast streams after keyBy
ProcessWindowFunction: full-window processing on a WindowedStream
ProcessAllWindowFunction: full-window processing on an AllWindowedStream
2. ProcessFunction
2.1 Overview
- A ProcessFunction is applied with stream.process(ProcessFunction)
- The class diagram of ProcessFunction shows that it inherits the RichFunction lifecycle methods open and close, and adds two important methods of its own: processElement and onTimer
2.2 ProcessFunction Source Code
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
 * The main processing method of ProcessFunction: called for every element of the input stream.
 * It may emit zero or more elements through the Collector and may use the
 * Context parameter to update internal state or register timers.
 *
 * @param value the input element
 * @param ctx the context
 * @param out the collector used to emit results
 */
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
 * Called when a timer set through the TimerService fires.
 */
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
 * Information available in an invocation of processElement.
 */
public abstract class Context {
/**
 * Timestamp of the element currently being processed, or the timestamp of the firing timer.
 */
public abstract Long timestamp();
/**
 * A TimerService for registering timers and querying time.
 */
public abstract TimerService timerService();
/**
 * Emits a record to the side output identified by the OutputTag.
 *
 * @param outputTag the OutputTag identifying the side output
 * @param value the record to emit
 */
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class OnTimerContext extends Context {
/**
 * The TimeDomain of the firing timer.
 */
public abstract TimeDomain timeDomain();
}
}
2.3 Test Code
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestProcessFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
stream1
.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
.process(new CustomProcessFunction)
.print("TestProcessFunction")
env.execute()
}
class CustomProcessFunction extends ProcessFunction[Obj1, String] {
  /**
   * Processes every element of the stream and, for elements whose id is greater
   * than 10, emits the name together with the current processing time.
   * Note: id is a String, so it is parsed to Int first; a plain String
   * comparison such as value.id > "10" would be lexicographic ("9" > "10").
   */
  override def processElement(value: Obj1, ctx: ProcessFunction[Obj1, String]#Context, out: Collector[String]): Unit = {
    if (value.id.toInt > 10) {
      out.collect(s"${value.name},${ctx.timerService().currentProcessingTime()}")
    }
  }
}
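A quick way to try it: assuming Obj1's fields are (id, name, time) as in the common module, typing 12,Tom,1609855200 into the socket on port 9999 should print something like TestProcessFunction> Tom,1611196800000 (the number being the current processing time in milliseconds), while a line with id 10 or below produces no output.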
3. KeyedProcessFunction
3.1 Overview
- KeyedProcessFunction processes the elements of a KeyedStream.
- KeyedProcessFunction[KEY, IN, OUT] provides the following two methods:
/**
 * Called for every element of the stream.
 * Parameter notes:
 * Collector: emits the processed results; zero or more records may be emitted
 * Context: gives access to the element's timestamp, its key, and the TimerService,
 *          and can also emit records to side outputs
 * I: the input type
 */
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
 * onTimer is a callback invoked when a previously registered timer fires.
 * Parameter notes:
 * timestamp: the timestamp for which the timer was registered
 * Collector: emits the results
 * OnTimerContext: like the Context passed to processElement, provides contextual
 *                 information, e.g. the time domain of the firing timer
 */
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
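The test code in 3.2 below registers no timers, so here is a hedged sketch of the registration pattern (the class name and the 5-second offsets are illustrative assumptions): it registers one event-time and one processing-time timer per element and reports the firing time domain in onTimer.
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
class TwoTimersFunction extends KeyedProcessFunction[String, Obj1, String] {
  override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
    // event-time timer: fires when the watermark passes the element timestamp + 5s
    // (ctx.timestamp() is only set when timestamps/watermarks are assigned upstream)
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L)
    // processing-time timer: fires 5s of wall-clock time from now
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000L)
  }
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Obj1, String]#OnTimerContext, out: Collector[String]): Unit = {
    // OnTimerContext reports which time domain fired this timer
    out.collect(s"key=${ctx.getCurrentKey} domain=${ctx.timeDomain()} timestamp=$timestamp")
  }
}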
3.2 Test Code
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestKeyedProcessFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
stream1
.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
.keyBy(_.id)
.process(new CustomKeyedProcessFunction)
.print("TestKeyedProcessFunction")
env.execute()
}
/**
 * KeyedProcessFunction type parameters:
 * String: the type of the input key
 * Obj1: the input type
 * String: the output type
 */
class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String]{
override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
println(s"当前 key:${ctx.getCurrentKey}")
println(s"当前 ProcessingTime:${ctx.timerService().currentProcessingTime()}")
out.collect(value.name)
}
}
4. CoProcessFunction
The DataStream API provides the low-level CoProcessFunction operation to process two input streams together.
CoProcessFunction provides one method per input stream: processElement1() and processElement2().
As in ProcessFunction, both methods are invoked with a Context object that gives access to the event data, timer timestamps, the TimerService, and side outputs. CoProcessFunction also provides the onTimer() callback.
import com.hnbian.flink.common.Obj1
import com.hnbian.flink.common.Record
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestCoProcessFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[String] = env.socketTextStream("localhost",8888)
private val stream1Obj: DataStream[Obj1] = stream1
.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
})
val stream2Rec: DataStream[Record] = stream2.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
stream1Obj
.connect(stream2Rec)
.process(new CustomCoProcessFunction)
.print()
env.execute()
}
/**
 * CoProcessFunction type parameters:
 * Obj1: input type of the first stream
 * Record: input type of the second stream
 * String: the output type
 */
class CustomCoProcessFunction extends CoProcessFunction[Obj1,Record,String]{
override def processElement1(value: Obj1, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
out.collect(s"processElement1:${value.name},${value.getClass}")
}
override def processElement2(value: Record, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
out.collect(s"processElement2:${value.name},${value.getClass}")
}
}
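Each input port is handled by its own callback: a line typed on port 9999 is mapped to Obj1 and goes through processElement1, a line on 8888 becomes a Record and goes through processElement2, and both callbacks emit into the same String output stream. Typing 1,Tom,1 on port 9999 should print something like processElement1:Tom,class com.hnbian.flink.common.Obj1.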
5. ProcessJoinFunction
A ProcessJoinFunction processes the element pairs produced by an interval join between two KeyedStreams:
leftKeyedStream
// intervalJoin currently supports event time only
.intervalJoin(rightKeyedStream)
// the join interval: lower and upper bound relative to the left element's timestamp
.between(Time.minutes(-10),Time.seconds(0))
// exclude the lower bound
//.lowerBoundExclusive()
// exclude the upper bound
//.upperBoundExclusive()
// a custom ProcessJoinFunction handles each joined pair
.process(ProcessJoinFunction)
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object TestProcessJoinFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[String] = env.socketTextStream("localhost",8888)
private val stream1Obj: DataStream[Obj1] = stream1
.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
override def extractTimestamp(element: Obj1) = element.time * 1000
})
val stream2Obj: DataStream[Obj1] = stream2.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
override def extractTimestamp(element: Obj1) = element.time * 1000
})
private val joined: KeyedStream[Obj1, String]#IntervalJoined[Obj1, Obj1, String] = stream1Obj
.keyBy(_.name)
// interval-join the two keyed streams on the name field
.intervalJoin(stream2Obj.keyBy(_.name))
// join window: from 10 minutes before the left element's event time up to that event time
.between(Time.minutes(-10), Time.seconds(0))
joined.process(new CustomProcessJoinFunction).print("TestProcessJoinFunction")
env.execute()
}
class CustomProcessJoinFunction extends ProcessJoinFunction[Obj1,Obj1,(String,Obj1,Obj1)]{
override def processElement
(obj: Obj1,
obj2: Obj1,
ctx: ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)]#Context,
out: Collector[(String, Obj1, Obj1)]): Unit = {
out.collect((obj.name,obj,obj2))
}
}
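A concrete reading of the bounds above: with between(Time.minutes(-10), Time.seconds(0)), a left-stream element with key Tom and event time 10:30 joins every right-stream Tom element whose event time falls within [10:20, 10:30], and processElement is called once per matching pair. The 3-second out-of-orderness watermarks on both streams determine when buffered elements can be cleaned up and which late elements are ignored.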
6. BroadcastProcessFunction
A BroadcastProcessFunction connects a regular stream with a broadcast stream whose elements (typically rules or dimension data) are stored in broadcast state on every parallel instance. In the example below, class information is broadcast and looked up for every incoming student:
import com.hnbian.flink.common.{Class,Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestBroadcastProcessFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[String] = env.socketTextStream("localhost",8888)
private val StudentStream: DataStream[Student] = stream1
.map(data => {
val arr = data.split(",")
Student(arr(0), arr(1), arr(2))
})
val descriptor = new MapStateDescriptor[String, String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
val ClassStream: DataStream[Class] = stream2.map(data => {
val arr = data.split(",")
Class(arr(0), arr(1))
})
val ClassBroadcastStream: BroadcastStream[Class] = ClassStream.broadcast(descriptor)
StudentStream
.connect(ClassBroadcastStream)
.process(new CustomBroadcastProcessFunction)
.print("TestBroadcastProcessFunction")
env.execute()
}
/**
 * BroadcastProcessFunction type parameters:
 * Student: type of the non-broadcast stream
 * Class: type of the broadcast stream
 * String: the output type
 */
class CustomBroadcastProcessFunction extends BroadcastProcessFunction[Student,Class,String]{
override def processElement(value: Student, ctx: BroadcastProcessFunction[Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {
val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)
val className: String = classInfo.get(value.classId)
out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:${className}")
}
override def processBroadcastElement(value: Class, ctx: BroadcastProcessFunction[Student, Class, String]#Context, out: Collector[String]): Unit = {
val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)
println("更新状态")
classInfo.put(value.id,value.name)
}
}
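Note that broadcast state is only populated in processBroadcastElement, so input order matters: if 1,ClassOne is sent on port 8888 before student 1001,Tom,1 arrives on 9999, the output is stuId:1001 stuName:Tom stuClassName:ClassOne, whereas a student whose classId has not been broadcast yet resolves to a null class name. Also note that descriptor lives on the companion object, so the same MapStateDescriptor instance is used both to create the broadcast stream and to look the state up inside the function.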
7. KeyedBroadcastProcessFunction
KeyedBroadcastProcessFunction is the keyed variant of BroadcastProcessFunction: the non-broadcast side is keyed with keyBy, so processElement can additionally access the current key and keyed state:
import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TestKeyedBroadcastProcessFunction extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
val stream2: DataStream[String] = env.socketTextStream("localhost",8888)
private val StudentStream: DataStream[Student] = stream1
.map(data => {
val arr = data.split(",")
Student(arr(0), arr(1), arr(2))
})
val descriptor = new MapStateDescriptor[String, String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
val ClassStream: DataStream[Class] = stream2.map(data => {
val arr = data.split(",")
Class(arr(0), arr(1))
})
val ClassBroadcastStream: BroadcastStream[Class] = ClassStream.broadcast(descriptor)
StudentStream.keyBy(_.classId)
.connect(ClassBroadcastStream)
.process(new CustomKeyedBroadcastProcessFunction)
.print("TestKeyedBroadcastProcessFunction")
env.execute()
}
/**
 * KeyedBroadcastProcessFunction type parameters:
 * String: the key type
 * Student: type of the non-broadcast stream
 * Class: type of the broadcast stream
 * String: the output type
 */
class CustomKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction[String,Student,Class,String]{
override def processElement(value: Student, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {
println(s"processElement.key = ${ctx.getCurrentKey}")
val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)
val className: String = classInfo.get(value.classId)
out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:${className}")
}
override def processBroadcastElement(value: Class, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#Context, out: Collector[String]): Unit = {
val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)
println("更新状态")
classInfo.put(value.id,value.name)
}
}
8. ProcessWindowFunction
A ProcessWindowFunction receives an Iterable over all elements of the window, plus a context object for accessing time and state information, which makes it more flexible than the other window functions. This flexibility comes at the cost of performance and resources, because elements cannot be aggregated incrementally; instead, Flink has to buffer all elements of a window internally until the window fires (the sketch at the end of this section shows how to combine incremental aggregation with a ProcessWindowFunction).
import java.text.SimpleDateFormat
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestProcessWindowFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a tumbling processing-time window of 5 seconds
stream2
.keyBy(_.id)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new CustomProcessWindowFunction)
.print("TestWindowFunction")
environment.execute()
}
/**
 * ProcessWindowFunction type parameters:
 * IN: the type of the input value (Obj1)
 * OUT: the type of the output value (String)
 * KEY: the type of the key (String)
 * W: the type of the window (TimeWindow)
 */
class CustomProcessWindowFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[String]): Unit = {
var count = 0
val sdf = new SimpleDateFormat("HH:mm:ss")
println(
s"""
|window key: ${key},
|window start: ${sdf.format(context.window.getStart)},
|window end: ${sdf.format(context.window.getEnd)},
|maxTimestamp: ${sdf.format(context.window.maxTimestamp())}
|""".stripMargin)
// iterate over all elements buffered in the window
for (obj <- elements) {
println(obj.toString)
count += 1
}
out.collect(s"Window ${context.window} , count : ${count}")
}
}
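When the per-window logic is really an aggregation, the buffering cost described at the start of this section can be avoided by combining an incremental reduce with a ProcessWindowFunction: Flink then keeps only the running aggregate in state, and the process function receives a single pre-aggregated element together with the window metadata. A hedged sketch reusing Obj1 (the class name and the max-by-time logic are illustrative assumptions):
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
class MaxTimeWithWindowEnd extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[String]): Unit = {
    // elements holds exactly one value: the result of the incremental reduce
    val max = elements.head
    out.collect(s"key=$key windowEnd=${context.window.getEnd} maxTime=${max.time}")
  }
}
// usage, on the same keyed, windowed stream as in the example above:
// stream2
//   .keyBy(_.id)
//   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//   .reduce((a, b) => if (a.time > b.time) a else b, new MaxTimeWithWindowEnd)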
9. ProcessAllWindowFunction
Works like ProcessWindowFunction, but operates on an AllWindowedStream, i.e. on the whole stream rather than per key.
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
object TestProcessWindowAllFunction extends App {
val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)
val stream2: DataStream[Obj1] = stream1.map(data => {
val arr = data.split(",")
Obj1(arr(0), arr(1), arr(2).toInt)
})
// a 5-second tumbling window over the entire stream; all-window functions are not keyed, so no keyBy is needed here
private val allValueStream: AllWindowedStream[Obj1, TimeWindow] = stream2
.timeWindowAll(Time.seconds(5))
allValueStream
.process(new CustomProcessAllWindowFunction)
.print("TestProcessWindowAllFunction")
environment.execute()
}
/**
 * A ProcessAllWindowFunction that joins all user names in the
 * window into one string separated by ","
 * Input type: Obj1
 * Output type: tuple (Long, String)
 * Window type: TimeWindow
 */
class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long,String), TimeWindow]{
override def process(context: Context, elements: Iterable[Obj1], out: Collector[(Long, String)]): Unit = {
println(s"start:${context.window.getStart}")
println(s"end:${context.window.getEnd}")
println(s"maxTimestamp:${context.window.maxTimestamp()}")
val key = context.window.getStart
val value = new ArrayBuffer[String]()
for (obj1<- elements){
value.append(obj1.name)
}
// join all user names in the window into one string separated by ","
out.collect((key,value.mkString(",")))
}
}
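Every 5 seconds this emits one tuple per non-empty window, e.g. something like TestProcessWindowAllFunction> (1611196800000,Tom,Jerry) when those two names arrived within the same window. Since the function is applied to an AllWindowedStream, it sees every element regardless of id, and the windowing operator effectively runs with parallelism 1.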
10. SideOutput (Side Output Streams)
Most DataStream API operators have a single output, i.e. one stream of a single data type. Apart from the split operator, which divides one stream into several streams of the same type, the side-output feature of process functions can produce multiple streams, and their data types may differ. A side output is defined by an OutputTag[X] object, where X is the data type of the side-output stream. A process function emits a record to one or more side outputs through its Context object.
import com.hnbian.flink.common.Student
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* @Author haonan.bian
* @Date 2021/1/21 11:38
**/
object TestSideOutput extends App {
// create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
private val stream2: DataStream[Student] = stream1
.map(data => {
val arr = data.split(",")
Student(arr(0), arr(1), arr(2))
})
val StudentStream: DataStream[Student] = stream2.process(new ProcessFunction[Student, Student] {
// define the tag once; side outputs are matched by tag id and element type
lazy val outputTag: OutputTag[Student] = new OutputTag[Student]("class2")
override def processElement(value: Student, ctx: ProcessFunction[Student, Student]#Context, out: Collector[Student]): Unit = {
if (value.classId == "2") {
// route students of class 2 to the side output
ctx.output(outputTag, value)
} else {
// everything else goes to the regular main output
out.collect(value)
}
}
})
StudentStream
.getSideOutput(new OutputTag[Student]("class2"))
.print("outPutTag")
StudentStream.print("StudentStream")
env.execute()
}
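With Student(id, name, classId) as in the earlier sections, typing 1,Tom,1 and 2,Jerry,2 should print something like StudentStream> Student(1,Tom,1) and outPutTag> Student(2,Jerry,2) respectively (the exact rendering depends on Student's toString). Retrieving the side output with a second new OutputTag[Student]("class2") works here because tags are matched by their id and element type, but the usual practice is to define the tag once and reuse the same instance in both output() and getSideOutput().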