Flink Series 11: Using ProcessFunction in Flink


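1. Introduction to ProcessFunction

1.1 Overview

Ordinary Flink transformation operators cannot access an event's timestamp or watermark information. For example, a map operator implemented with MapFunction has no access to the timestamp or the event time of the current element, yet in some applications this information is essential. For this reason the DataStream API provides a set of low-level transformation operators that can access timestamps and watermarks and register timers. They can also emit special events, such as timeout events.

Process Functions are used to build event-driven applications and to implement custom business logic that cannot be expressed with the window functions and transformation operators introduced earlier. Parts of Flink SQL's runtime, for example, are built on Process Functions. All Process Functions implement the RichFunction interface, so they all provide methods such as open(), close(), and getRuntimeContext().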

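1.2 Building blocks

ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:

  • events: the elements of the data stream

  • state: used for fault tolerance and consistency; only available on keyed streams

  • timers: event-time and processing-time timers; only available on keyed streams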
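To make the timer building block concrete, the following plain-Scala simulation (it does not use Flink) models how event-time timers behave on a keyed stream: a timer fires once the watermark reaches its timestamp, and registering the same key and timestamp twice yields only one timer, mirroring the per-key, per-timestamp deduplication that Flink's TimerService performs. `TimerServiceModel` and its methods are hypothetical names chosen for illustration; they are not Flink's TimerService API.

```scala
import scala.collection.mutable

// Hypothetical model of keyed event-time timers; NOT org.apache.flink's TimerService.
final class TimerServiceModel[K] {
  // A set deduplicates timers: at most one timer per (key, timestamp).
  private val timers = mutable.Set.empty[(K, Long)]
  private var watermark: Long = Long.MinValue

  def registerEventTimeTimer(key: K, timestamp: Long): Unit =
    timers += ((key, timestamp))

  def deleteEventTimeTimer(key: K, timestamp: Long): Unit =
    timers -= ((key, timestamp))

  // Advancing the watermark fires (and removes) every timer whose timestamp is
  // <= the watermark; fired timers are returned in timestamp order.
  def advanceWatermark(newWatermark: Long): Seq[(K, Long)] = {
    watermark = math.max(watermark, newWatermark)
    val due = timers.filter { case (_, ts) => ts <= watermark }.toList.sortBy(_._2)
    timers --= due
    due
  }
}
```

In a real Flink job the equivalent call is ctx.timerService().registerEventTimeTimer(timestamp) inside processElement(), and each firing is delivered to onTimer().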

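1.3 Types

Flink provides 8 Process Functions:

  1. ProcessFunction: processing of a DataStream

  2. KeyedProcessFunction: processing of a KeyedStream (after keyBy)

  3. CoProcessFunction: processing of two streams combined with connect

  4. ProcessJoinFunction: processing of the matching pairs produced by an interval join

  5. BroadcastProcessFunction: processing of a non-keyed stream connected to a broadcast stream

  6. KeyedBroadcastProcessFunction: processing of a keyed stream connected to a broadcast stream

  7. ProcessWindowFunction: full-window processing of a keyed window (all elements of the window are available at once)

  8. ProcessAllWindowFunction: full-window processing of a non-keyed (all) window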

2. ProcessFunction

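2.1 ProcessFunction overview

  • A ProcessFunction is applied with stream.process(ProcessFunction).
  • As the ProcessFunction class diagram shows, it has the RichFunction lifecycle methods open and close, as well as two important methods: processElement and onTimer.
Figure: ProcessFunction inheritance diagram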

2.2 ProcessFunction source code

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * The main processing method of a ProcessFunction, called for each element of the input stream.
     * It can emit zero or more elements through the Collector,
     * and can use the Context parameter to update internal state or set timers.
     *
     * @param value the input value
     * @param ctx the context
     * @param out the collector used to emit output
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using the TimerService fires.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Context available during processElement and onTimer.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed, or of the firing timer
         * (may be null, e.g. when no timestamps have been assigned).
         */
        public abstract Long timestamp();

        /**
         * A TimerService for registering timers and querying the time.
         */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the OutputTag.
         *
         * @param outputTag the side output to emit to
         * @param value the record to emit
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The TimeDomain of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }

}

2.3 Test code

// Obj1 comes from the author's common module; assumed shape: case class Obj1(id: String, name: String, time: Long)
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestProcessFunction extends App {
  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .process(new CustomProcessFunction)
    .print("TestProcessFunction")
  env.execute()
}

class CustomProcessFunction extends ProcessFunction[Obj1, String] {

  /**
   * Processes each element of the stream: for elements whose ID is greater than 10,
   * emits the name together with the current processing time.
   * @param value the input element
   * @param ctx   the context (timer service, side outputs)
   * @param out   collector for the output
   */
  override def processElement(value: Obj1, ctx: ProcessFunction[Obj1, String]#Context, out: Collector[String]): Unit = {
    // id is a String: compare numerically, because "9" > "10" is true for Strings
    if (value.id.toInt > 10) {
      out.collect(s"${value.name},${ctx.timerService().currentProcessingTime()}")
    }
  }
}
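Obj1's id is built from a String field (arr(0)), so comparing it directly with a String literal such as "10" is lexicographic rather than numeric. The plain-Scala snippet below, independent of Flink, shows the difference; `IdComparison` and its methods are hypothetical helpers written for this illustration.

```scala
import scala.util.Try

// Hypothetical helpers contrasting String and numeric comparison of IDs.
object IdComparison {
  // What `id > threshold` does for Strings: character-by-character (lexicographic)
  // comparison, so "9" counts as greater than "10" because '9' > '1'.
  def lexicographicGreater(id: String, threshold: String): Boolean =
    id.compareTo(threshold) > 0

  // Numeric comparison: parse first; IDs that are not integers never match.
  def idGreaterThan(id: String, threshold: Int): Boolean =
    Try(id.trim.toInt).toOption.exists(_ > threshold)
}
```

Parsing with Try also keeps a malformed line from throwing inside processElement, which would otherwise fail the task.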

3. KeyedProcessFunction

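3.1 Introduction

  • KeyedProcessFunction processes the elements of a KeyedStream.
  • KeyedProcessFunction[KEY, IN, OUT] provides the following two methods. Unlike ProcessFunction, its Context also exposes the current key (getCurrentKey()), and keyed state and timers can be used: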
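/**
 * Called for every element of the stream.
 * Parameters:
 * value: the input element (of input type I)
 * ctx: gives access to the element's timestamp, its key, and the TimerService;
 *      it can also emit results to other streams (side outputs)
 * out: the Collector used to emit results; zero or more results may be emitted
 */
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;


/**
 * onTimer is a callback invoked when a previously registered timer fires.
 * Parameters:
 * timestamp: the timestamp the timer was set to fire at
 * ctx: like the Context of processElement, provides context information,
 *      such as the time domain of the firing timer
 * out: the Collector for emitting results
 */
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}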
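The processElement/onTimer pair is typically used for timeout detection: every element for a key pushes a timer forward, and onTimer reports a timeout only if no newer element has arrived since. Flink's documentation shows a similar pattern built on keyed state and ctx.timerService().registerProcessingTimeTimer(...). Below is a plain-Scala simulation of that logic with a manually advanced clock; `TimeoutDetector`, `onElement`, and `advanceTime` are hypothetical names, and no Flink classes are used.

```scala
import scala.collection.mutable

// Hypothetical simulation of a KeyedProcessFunction that emits (key, count) once a key
// has been idle for `timeoutMs`. No Flink classes: state, timers and clock are modeled by hand.
final class TimeoutDetector(timeoutMs: Long) {
  // Per-key "state": (number of elements, time of the last element).
  private val state = mutable.Map.empty[String, (Long, Long)]
  // Registered processing-time timers as (key, fireTime), deduplicated like Flink's.
  private val timers = mutable.Set.empty[(String, Long)]

  // Counterpart of processElement: update state and register a timer at now + timeout.
  def onElement(key: String, now: Long): Unit = {
    val count = state.get(key).map(_._1).getOrElse(0L) + 1L
    state(key) = (count, now)
    timers += ((key, now + timeoutMs))
  }

  // Counterpart of the timer service calling onTimer for every due timer.
  def advanceTime(now: Long): Seq[(String, Long)] = {
    val due = timers.filter(_._2 <= now).toList.sortBy(_._2)
    timers --= due
    due.flatMap { case (key, fireTime) =>
      state.get(key) match {
        // Only the timer of the latest element reports a timeout; older timers are
        // stale because a newer element pushed the deadline forward.
        case Some((count, lastModified)) if fireTime == lastModified + timeoutMs =>
          List(key -> count)
        case _ => Nil
      }
    }
  }
}
```

The check fireTime == lastModified + timeoutMs is what lets stale timers be ignored without deleting them; alternatively, the previous timer could be removed with deleteProcessingTimeTimer when a new element arrives.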

3.2 Test code

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedProcessFunction extends App {

  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .keyBy(_.id)
    .process(new CustomKeyedProcessFunction)
    .print("TestKeyedProcessFunction")
  env.execute()
}

/**
 * KeyedProcessFunction type parameters:
 * String: type of the key
 * Obj1:   type of the input elements
 * String: type of the output elements
 */
class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String] {

  override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
    println(s"current key: ${ctx.getCurrentKey}")
    println(s"current processing time: ${ctx.timerService().currentProcessingTime()}")

    out.collect(value.name)
  }
}

4. CoProcessFunction

  • The DataStream API provides CoProcessFunction, a low-level operation for processing two input streams.

  • CoProcessFunction provides one method per input stream: processElement1() and processElement2().

  • As with ProcessFunction, both methods are called with a Context object, which gives access to the element's timestamp, the TimerService, and side outputs. CoProcessFunction also provides an onTimer() callback.
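A common use of CoProcessFunction is to match records from the two inputs by a shared ID, buffering whichever side arrives first. The plain-Scala sketch below models processElement1/processElement2 with two in-memory maps standing in for keyed state; `Order`, `Payment`, and `OrderPaymentMatcher` are hypothetical types, not part of the article's code or of Flink.

```scala
import scala.collection.mutable

// Hypothetical input types for the two streams.
final case class Order(id: String, amount: Long)
final case class Payment(orderId: String, paid: Long)

// Models a CoProcessFunction[Order, Payment, String] on streams connected by order id.
final class OrderPaymentMatcher {
  // Stand-ins for keyed state: unmatched elements from each input.
  private val pendingOrders = mutable.Map.empty[String, Order]
  private val pendingPayments = mutable.Map.empty[String, Payment]

  // Input 1: match against a buffered payment, or buffer the order.
  def processElement1(order: Order): Option[String] =
    pendingPayments.remove(order.id) match {
      case Some(p) => Some(s"${order.id}: ordered ${order.amount}, paid ${p.paid}")
      case None    => pendingOrders(order.id) = order; None
    }

  // Input 2: symmetric to processElement1.
  def processElement2(payment: Payment): Option[String] =
    pendingOrders.remove(payment.orderId) match {
      case Some(o) => Some(s"${o.id}: ordered ${o.amount}, paid ${payment.paid}")
      case None    => pendingPayments(payment.orderId) = payment; None
    }
}
```

In a Flink job the two streams would typically be keyed on the same ID (for example connect(...).keyBy(...)) and the maps replaced with ValueState, with a timer to clean up entries that never find a match.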

import com.hnbian.flink.common.Obj1
import com.hnbian.flink.common.Record
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestCoProcessFunction extends App {
  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  private val stream1Obj: DataStream[Obj1] = stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })

  val stream2Rec: DataStream[Record] = stream2.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream1Obj
    .connect(stream2Rec)
    .process(new CustomCoProcessFunction)
    .print()

  env.execute()
}

/**
 * Input type of the first stream: Obj1
 * Input type of the second stream: Record
 * Output type: String
 */
class CustomCoProcessFunction extends CoProcessFunction[Obj1, Record, String] {
  // Called for each element of the first stream
  override def processElement1(value: Obj1, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
    out.collect(s"processElement1:${value.name},${value.getClass}")
  }

  // Called for each element of the second stream
  override def processElement2(value: Record, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
    out.collect(s"processElement2:${value.name},${value.getClass}")
  }
}

5. ProcessJoinFunction

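ProcessJoinFunction is an abstract class that defines the function applied to matching element pairs when two keyed streams are inner-joined with an interval join. It is called once per matching pair, receiving one element from each stream; its Context exposes the timestamps of both elements, and it can emit zero or more results.

It is used with the interval join API (intervalJoin(...).between(...).process(...)), which joins two streams only when a time condition holds: two elements with the same key are joined only if the right element's timestamp lies within the specified interval relative to the left element's timestamp.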
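Concretely, with between(lowerBound, upperBound) a left element with timestamp l joins a right element with the same key and timestamp r when l + lowerBound <= r <= l + upperBound; lowerBoundExclusive() and upperBoundExclusive() turn the corresponding <= into <. The plain-Scala sketch below evaluates that condition and performs a naive nested-loop join over two small in-memory lists. `IntervalJoinModel` is a hypothetical helper, not Flink's implementation, which instead buffers elements in state and cleans them up as the watermark advances.

```scala
// Hypothetical model of interval-join semantics on (key, timestamp) records.
object IntervalJoinModel {
  // True if the right timestamp r lies in [l + lower, l + upper] (bounds optionally exclusive).
  def inInterval(l: Long, r: Long, lower: Long, upper: Long,
                 lowerExclusive: Boolean = false, upperExclusive: Boolean = false): Boolean = {
    val lo = l + lower
    val hi = l + upper
    val aboveLower = if (lowerExclusive) r > lo else r >= lo
    val belowUpper = if (upperExclusive) r < hi else r <= hi
    aboveLower && belowUpper
  }

  // Naive inner interval join: pairs with equal keys whose timestamps satisfy inInterval.
  def join(left: Seq[(String, Long)], right: Seq[(String, Long)],
           lower: Long, upper: Long): Seq[((String, Long), (String, Long))] =
    for {
      a <- left
      b <- right
      if a._1 == b._1 && inInterval(a._2, b._2, lower, upper)
    } yield (a, b)
}
```

With between(Time.minutes(-10), Time.seconds(0)), as in the example below, lower is -600000 ms and upper is 0, so a right element must be at most 10 minutes older than the left element and not newer than it.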

leftKeyedStream
  // intervalJoin currently only supports event time
  .intervalJoin(rightKeyedStream)
  // Time interval: lower bound and upper bound
  .between(Time.minutes(-10), Time.seconds(0))
  // Exclude the lower bound
  //.lowerBoundExclusive()
  // Exclude the upper bound
  //.upperBoundExclusive()
  // A custom ProcessJoinFunction processes each joined pair
  .process(new CustomProcessJoinFunction)

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object TestProcessJoinFunction extends App {

  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Interval joins only work in event time (before Flink 1.12 the default was processing time)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  // The time field is in seconds: convert it to milliseconds and tolerate 3 s of out-of-orderness
  private val stream1Obj: DataStream[Obj1] = stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
      override def extractTimestamp(element: Obj1): Long = element.time * 1000
    })

  val stream1Obj2: DataStream[Obj1] = stream2.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1): Long = element.time * 1000
  })

  private val value: KeyedStream[Obj1, String]#IntervalJoined[Obj1, Obj1, String] = stream1Obj
    .keyBy(_.name)
    // Join elements with the same key that fall within a time interval
    .intervalJoin(stream1Obj2.keyBy(_.name))
    // A right element joins if its event time lies in [left event time - 10 min, left event time]
    .between(Time.minutes(-10), Time.seconds(0))

  value.process(new CustomProcessJoinFunction).print("TestProcessJoinFunction")

  env.execute()
}

// Emits (name, left element, right element) for every joined pair
class CustomProcessJoinFunction extends ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)] {
  override def processElement(obj: Obj1,
                              obj2: Obj1,
                              ctx: ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)]#Context,
                              out: Collector[(String, Obj1, Obj1)]): Unit = {
    out.collect((obj.name, obj, obj2))
  }
}

6. BroadcastProcessFunction

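BroadcastProcessFunction processes a non-keyed stream that has been connected to a broadcast stream, playing a role similar to CoProcessFunction for this kind of connection. Every element of the broadcast stream is sent to all parallel instances of the operator, where processBroadcastElement() can store it in broadcast state; processElement() handles the elements of the regular stream and has read-only access to that state.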
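The sketch below models that data flow in plain Scala: a broadcast element is applied to the state of every parallel instance, while each regular element reaches only one instance and can only read that instance's state. It also shows a case worth handling in processElement: Flink gives no ordering guarantee between the broadcast input and the regular input, so a regular element can arrive before the state it needs, in which case the Flink lookup returns null (None here). `BroadcastModel` and its members are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of broadcast state across `parallelism` operator instances.
final class BroadcastModel(parallelism: Int) {
  // One copy of the broadcast state (classId -> className) per parallel instance.
  private val states = Vector.fill(parallelism)(mutable.Map.empty[String, String])

  // Like processBroadcastElement: every instance receives and stores the element.
  def broadcast(classId: String, className: String): Unit =
    states.foreach(_.update(classId, className))

  // Like processElement: a regular element reaches one instance (chosen here by hash)
  // and only reads that instance's copy of the state.
  def lookup(studentId: String, classId: String): Option[String] = {
    val instance = math.abs(studentId.hashCode % parallelism)
    states(instance).get(classId)
  }
}
```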

import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestBroadcastProcessFunction extends App {
  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  private val studentStream: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })

  // Descriptor of the broadcast state: classId -> className
  val descriptor = new MapStateDescriptor[String, String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val classStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })
  val classBroadcastStream: BroadcastStream[Class] = classStream.broadcast(descriptor)

  studentStream
    .connect(classBroadcastStream)
    // Pass the descriptor in: fields of an object extending App are initialized only when
    // its main method runs, so they may be null in tasks running in another JVM
    .process(new CustomBroadcastProcessFunction(descriptor))
    .print("TestBroadcastProcessFunction")

  env.execute()
}

/**
 * Type parameters:
 * Student: type of the non-broadcast stream
 * Class:   type of the broadcast stream
 * String:  output type
 */
class CustomBroadcastProcessFunction(descriptor: MapStateDescriptor[String, String])
  extends BroadcastProcessFunction[Student, Class, String] {

  override def processElement(value: Student, ctx: BroadcastProcessFunction[Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {
    // Read-only access to the broadcast state
    val classInfo = ctx.getBroadcastState(descriptor)
    // null if this class has not been broadcast yet
    val className: String = classInfo.get(value.classId)
    out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:${className}")
  }

  override def processBroadcastElement(value: Class, ctx: BroadcastProcessFunction[Student, Class, String]#Context, out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(descriptor)
    println("updating state")
    classInfo.put(value.id, value.name)
  }
}

7. KeyedBroadcastProcessFunction

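KeyedBroadcastProcessFunction processes a keyed stream connected to a broadcast stream. It is the keyed counterpart of BroadcastProcessFunction (both extend BaseBroadcastProcessFunction rather than one extending the other): in addition to the broadcast state, processElement() can access the current key, keyed state, and timers.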

import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedBroadcastProcessFunction extends App {
  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost", 8888)

  private val studentStream: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })

  // Descriptor of the broadcast state: classId -> className
  val descriptor = new MapStateDescriptor[String, String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val classStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })
  val classBroadcastStream: BroadcastStream[Class] = classStream.broadcast(descriptor)

  // Key the student stream by class id before connecting it to the broadcast stream
  studentStream.keyBy(_.classId)
    .connect(classBroadcastStream)
    // Pass the descriptor in: fields of an object extending App may be null in other JVMs
    .process(new CustomKeyedBroadcastProcessFunction(descriptor))
    .print("TestKeyedBroadcastProcessFunction")

  env.execute()
}

/**
 * Type parameters:
 * String:  key type
 * Student: type of the non-broadcast (keyed) stream
 * Class:   type of the broadcast stream
 * String:  output type
 */
class CustomKeyedBroadcastProcessFunction(descriptor: MapStateDescriptor[String, String])
  extends KeyedBroadcastProcessFunction[String, Student, Class, String] {

  override def processElement(value: Student, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {
    // The current key is available on the keyed side
    println(s"processElement.key = ${ctx.getCurrentKey}")

    val classInfo = ctx.getBroadcastState(descriptor)
    // null if this class has not been broadcast yet
    val className: String = classInfo.get(value.classId)

    out.collect(s"stuId:${value.id} stuName:${value.name} stuClassName:${className}")
  }

  override def processBroadcastElement(value: Class, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#Context, out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(descriptor)
    println("updating state")
    classInfo.put(value.id, value.name)
  }
}

8. ProcessWindowFunction

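ProcessWindowFunction receives an Iterable over all elements of the window, plus a Context object that exposes time and state information. This makes it more flexible than the other window functions, but it costs performance and resources: the elements cannot be aggregated incrementally, so Flink must buffer all elements of the window internally until the window fires.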
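Two details of this can be checked outside Flink. First, for a tumbling window of size `size`, the window start for a timestamp `t` is computed as in Flink's TimeWindow.getWindowStartWithOffset, t - (t - offset + size) % size, and maxTimestamp() is end - 1. Second, the sketch contrasts buffering every element (what a ProcessWindowFunction on its own requires) with keeping only a running count (what incremental aggregation allows). Flink also accepts a ReduceFunction or AggregateFunction together with a ProcessWindowFunction, so that only the pre-aggregated result is buffered. `WindowModel` is a hypothetical plain-Scala helper.

```scala
// Hypothetical plain-Scala model of tumbling windows; not Flink's window operator.
object WindowModel {
  // Start of the tumbling window containing `timestamp` (same arithmetic as
  // Flink's TimeWindow.getWindowStartWithOffset).
  def windowStart(timestamp: Long, size: Long, offset: Long = 0L): Long =
    timestamp - (timestamp - offset + size) % size

  // Window as (start, end, maxTimestamp); end is exclusive, so maxTimestamp = end - 1.
  def windowFor(timestamp: Long, size: Long): (Long, Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size, start + size - 1)
  }

  // Full-window style: every element is kept per window, then counted on "fire".
  def bufferedCounts(timestamps: Seq[Long], size: Long): Map[Long, Int] =
    timestamps.groupBy(t => windowStart(t, size)).map { case (start, elems) => start -> elems.size }

  // Incremental style: only a running count per window is stored.
  def incrementalCounts(timestamps: Seq[Long], size: Long): Map[Long, Int] =
    timestamps.foldLeft(Map.empty[Long, Int]) { (acc, t) =>
      val start = windowStart(t, size)
      acc.updated(start, acc.getOrElse(start, 0) + 1)
    }
}
```

Both styles yield the same counts; the difference is memory, since the buffered version keeps every element until the window fires.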

import java.text.SimpleDateFormat
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestProcessWindowFunction extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  // Tumbling processing-time window of 5 seconds
  stream2
    .keyBy(_.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .process(new CustomProcessWindowFunction)
    .print("TestProcessWindowFunction")

  environment.execute()

}

/**
 * IN:  type of the input elements
 * OUT: type of the output elements
 * KEY: type of the key
 * W:   type of the window
 */
class CustomProcessWindowFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[String]): Unit = {
    var count = 0
    val sdf = new SimpleDateFormat("HH:mm:ss")

    println(
      s"""
         |window key: ${key},
         |start: ${sdf.format(context.window.getStart)},
         |end: ${sdf.format(context.window.getEnd)},
         |maxTime: ${sdf.format(context.window.maxTimestamp())}
         |""".stripMargin)

    // Iterate over all elements of the window
    for (obj <- elements) {
      println(obj.toString)
      count += 1
    }
    out.collect(s"Window ${context.window} , count : ${count}")
  }
}

9. ProcessAllWindowFunction

Similar to ProcessWindowFunction, but applied to an AllWindowedStream, that is, a non-keyed window created with windowAll.
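Because an all-window is not keyed, every element of the window ends up in one process() call regardless of its id, and Flink runs a windowAll operator with parallelism 1. The plain-Scala sketch below reproduces what CustomProcessAllWindowFunction in the following example emits: the window start and the comma-separated names of all elements in that window. It assigns windows from explicit timestamps rather than the processing-time clock, and the `Named` record type and `AllWindowModel` object are hypothetical stand-ins for Obj1 and the Flink operator.

```scala
// Hypothetical stand-in for Obj1: only the fields the example uses.
final case class Named(id: String, name: String, timestamp: Long)

object AllWindowModel {
  // Emits (windowStart, "name1,name2,...") per tumbling window of `size` ms, ignoring ids;
  // names keep their arrival order, like the ArrayBuffer in the example.
  def allWindowNames(elements: Seq[Named], size: Long): Seq[(Long, String)] = {
    val byWindow = elements.groupBy(e => e.timestamp - (e.timestamp + size) % size)
    byWindow.toSeq.sortBy(_._1).map { case (start, es) => start -> es.map(_.name).mkString(",") }
  }
}
```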

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

object TestProcessWindowAllFunction extends App {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost", 9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  })

  // A 5-second tumbling processing-time window over the whole stream;
  // no keyBy is needed because windowAll ignores keys
  private val allValueStream: AllWindowedStream[Obj1, TimeWindow] = stream2
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

  allValueStream
    .process(new CustomProcessAllWindowFunction)
    .print("TestProcessWindowAllFunction")

  environment.execute()
}


/**
 * A ProcessAllWindowFunction that joins the names of all elements in the window
 * into one comma-separated string.
 * Input type: Obj1
 * Output type: tuple (window start, names)
 * Window type: TimeWindow
 */
class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long, String), TimeWindow] {
  override def process(context: Context, elements: Iterable[Obj1], out: Collector[(Long, String)]): Unit = {

    println(s"start:${context.window.getStart}")
    println(s"end:${context.window.getEnd}")
    println(s"maxTimestamp:${context.window.maxTimestamp()}")
    val key = context.window.getStart
    val value = new ArrayBuffer[String]()

    for (obj1 <- elements) {
      value.append(obj1.name)
    }
    // Join all names in the window into one string separated by ","
    out.collect((key, value.mkString(",")))
  }
}

10. SideOutput (side output streams)

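Most DataStream API operators have a single output, i.e. a stream of one data type. The split operator can divide a stream into several streams, but these all have the same data type (split has since been deprecated in favor of side outputs). The side output feature of process functions can produce multiple streams, and these streams may have different data types. A side output is defined as an OutputTag[X] object, where X is the data type of the output stream. A process function can emit an event to one or more side outputs through its Context object.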
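The sketch below models that routing in plain Scala: a typed tag plays the role of OutputTag[X], the main output and each side output are collected separately, and a side output can have a different element type from the main stream (here a String side output for bad records). `Tag`, `SideOutputModel`, `SideOutputExample`, and the "invalid" output are hypothetical; in Flink the equivalent calls are ctx.output(tag, value) inside the process function and getSideOutput(tag) on the resulting stream.

```scala
import scala.collection.mutable

// Hypothetical typed tag, standing in for Flink's OutputTag[X].
final case class Tag[X](id: String)

final case class Student(id: String, name: String, classId: String)

// Collects the main output and any number of typed side outputs.
final class SideOutputModel[O] {
  val main = mutable.ListBuffer.empty[O]
  private val sides = mutable.Map.empty[String, mutable.ListBuffer[Any]]

  def collect(value: O): Unit = main += value

  def output[X](tag: Tag[X], value: X): Unit =
    sides.getOrElseUpdate(tag.id, mutable.ListBuffer.empty[Any]) += value

  // The cast is safe as long as each tag id is always used with the same type.
  def getSideOutput[X](tag: Tag[X]): List[X] =
    sides.get(tag.id).map(_.toList.map(_.asInstanceOf[X])).getOrElse(Nil)
}

object SideOutputExample {
  val class2Tag: Tag[Student] = Tag[Student]("class2")
  val invalidTag: Tag[String] = Tag[String]("invalid") // side output of a different type

  // Class 2 goes to one side output, records without a class to another, the rest to main.
  def route(students: Seq[Student]): SideOutputModel[Student] = {
    val out = new SideOutputModel[Student]
    students.foreach { s =>
      if (s.classId.isEmpty) out.output(invalidTag, s"missing classId: ${s.id}")
      else if (s.classId == "2") out.output(class2Tag, s)
      else out.collect(s)
    }
    out
  }
}
```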


import com.hnbian.flink.common.Student
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @Author haonan.bian
 * @Description Side output example: students of class 2 are sent to a side output
 * @Date 2021/1/21 11:38
 **/
object TestSideOutput extends App {
  // Create the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val stream2: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })

  val studentStream: DataStream[Student] = stream2.process(new ProcessFunction[Student, Student] {
    // Create the tag once per function instance rather than once per element
    private val class2Tag: OutputTag[Student] = new OutputTag[Student]("class2")

    override def processElement(value: Student, ctx: ProcessFunction[Student, Student]#Context, out: Collector[Student]): Unit = {
      if (value.classId == "2") {
        // Students of class 2 go to the side output
        ctx.output(class2Tag, value)
      } else {
        // All other students go to the main stream
        out.collect(value)
      }
    }
  })

  // A tag with the same id and type refers to the same side output
  studentStream
    .getSideOutput(new OutputTag[Student]("class2"))
    .print("outPutTag")

  studentStream.print("StudentStream")

  env.execute()
}

Author: hnbian
Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit hnbian as the source when reposting.