1. CEP 是什么
CEP 是 Complex Event Processing 三个单词的缩写,表示复杂事件处理。
CEP 是 Flink 专门为我们提供的一个基于复杂事件监测处理的库。
CEP 通过一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
CEP 复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。
Flink 基于 DataStrem API 提供了 FlinkCEP 组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
2. CEP 的特点
- 目标:从有序的简单事件流中发现一些高阶特征
- 输入:一个或多个简单事件构成的事件流
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
- 输出:满足规则的复杂事件
3. Pattern 介绍
CEP 提供了 Pattern API,定义用于对输入流数据进行复杂事件的规则,用来提取符合规则的事件结果
处理事件的规则,被叫做“模式(Pattern)”
包含四个步骤
- 创建输入事件流
- 定义 匹配事件 Pattern 的规则,
- 将定义的 Pattern 应用在事件流上并对事件流进行检测
- 选取符合 Pattern 匹配到的结果
简单的 Pattern API 示例
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time
object CEPTest extends App{
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
private val value: DataStream[Record] = stream1
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 2. 定义一个 Pattern, 匹配两秒内处理到的年龄都是 20 岁的两个人
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 岁
.next("next").where(_.age == 20)// 匹配第二个获取到的年龄是 20 岁
.within(Time.seconds(2)) // 设定约束时间为 2 秒
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](value, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(new PatternMacthData)
result.print("CEPTest")
env.execute()
}
/**
* 定义一个Pattern Select Function 对匹配到的数据进行处理
*/
class PatternMacthData extends PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
val next: Record = pattern.get("next").iterator().next()
s"${start.name} 与 ${next.name} 都是 ${start.age} 岁"
}
}
4. Pattern 的定义与类型
定义 Pattern 可以是单次执行模式,也可以是循环执行模式。
单次执行模式一次只接受 一个事件
循环执行模式可以接收一个或者多个事件
可以通过指定循环次数将单次执行模式变为循环执行模式。
每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过 where 方法进行叠加。
每个 Pattern 都是通过 begin 方法为开始进行Pattern定义的,下一步通过 Pattern.where()方法在 Pattern 上指定匹配条件,只有当 匹配条件满足之后,当前的 Pattern 才会接受事件。
val pattern = Pattern
.begin[Event]("start") // 指定匹配模式的名称,后续该名称会作为 key 对应匹配到的数据
.where(_.id >= "100") // 设置匹配条件, 匹配 id 大于 100 的数据
4.1 定义 Pattern 的条件
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在 FlinkCFP 中通过 pattern.where()、 pattern.or() 及 pattern.until() 方法来为 Pattern 指定条件,且 Pattern 条件有 Simple Conditions 及Combining Conditions 等类型
- 使用 Pattern API 之前需要导入 CEP 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
4.1.1 简单条件
简单条件(Simple Conditions):其主要根据事件中的字段信息进行判断,决定是否接受该事件。
通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
- 简单条件代码示例
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepConditionSimpleTest extends App{
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//##########################################################################
//##########################################################################
//##########################################################################
// 2.定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
//##########################################################################
//##########################################################################
//##########################################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
start.toString
}
}
)
result.print("CepConditionSimpleTest")
env.execute()
}
4.1.2 组合条件
- 组合条件(Combining Conditions):是将简单条件进行合并
- .where() 方法进行条件的组合,表示 AND
- .or() 方法进行条件的组合,表示 OR
// 把id 大于等于 100 或者年龄大于 30 的事件挑选出来
val start = Pattern.begin[Event]("start")
.where(_.id >= "100").or(_.age >30)
- 组合条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepConditionCombiningTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
recordStream.print()
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern1: Pattern[Record, Record] = Pattern
// 匹配年龄为 20 的记录
.begin[Record]("start")
.where(_.age == 20) // age == 20
private val pattern2: Pattern[Record, Record] = Pattern
// 匹配年龄为 20 或年龄为 18 的记录
.begin[Record]("start")
.where(_.age == 20)
.or( _.age==18 ) // (age==20 || age == 18)
private val pattern3: Pattern[Record, Record] = Pattern
// 匹配年龄为 20 或年龄为 18 但 classId 为 1 的记录
.begin[Record]("start")
.where(_.age == 20)
.or( _.age==18 )
.where(_.classId=="1") // ((age==20 || age == 18)&& classId == "1")
private val pattern4: Pattern[Record, Record] = Pattern
// 匹配年龄为 20 或年龄为 18 但 classId 为 1 或 name为小明的记录
.begin[Record]("start")
.where(_.age == 20)
.or( _.age==18 )
.where(_.classId=="1")
.or(_.name == "小明") // ((age==20 || age == 18)&& classId == "1") || name== "小明"
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern4)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
start.toString
}
}
)
result.print("CepConditionSimpleTest")
env.execute()
}
4.1.3 迭代条件
迭代条件:(Iterative Condition)
对前面条件匹配到的数据再次进行匹配
def where(condition: IterativeCondition[F]): Pattern[T, F] = {
jPattern.where(condition)
this
}
- 迭代条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepConditionIterativeTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
recordStream.print()
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 匹配年龄为 20 或年龄为 18 的记录
.begin[Record]("start")
.where(_.age == 20)
.or( _.age==18 ) // (age==20 || age == 18)
.where(new MyIterativeCondition())
// 定义迭代匹配条件
class MyIterativeCondition extends IterativeCondition[Record]{
override def filter(value: Record, ctx: IterativeCondition.Context[Record]): Boolean = {
if(value.name=="小明"){ // 前面匹配到的记录 name 不能为小明
false
}else if (value.classId == "1"){ // 前面匹配到的数据 classId 为 1 才会匹配成功
true
}else{ // 其他情况 匹配不成功
false
}
}
}
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
start.toString
}
}
)
result.print("CepConditionIterativeTest")
env.execute()
}
4.1.4 终止条件
- 终止条件( Stop condition):如果程序中使用了 oneOrMore 或者 oneOrMore().optional() 方法,还可以指定停止条件,否则模式中的规则会一直循环下去,如下终止条件通过 until()方法指定
- 终止条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepConditionStopTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 匹配所有 age 为 20 的记录,一直到接到 classId 为 2 的记录
.begin[Record]("start")
.where(_.age == 20)
.oneOrMore
.until(_.classId=="2")
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
start.toString
}
}
)
result.print("CepConditionStopTest")
env.execute()
}
4.2 设置时间约束
可以为模式指定时间约束,用来要求在多长时间内匹配有效
// 定义一个 Pattern, 匹配五秒内处理到的年龄都是 20 的两个事件
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 的事件
.next("next").where(_.age == 20)// 匹配第二个获取到的年龄是 20 的事件
.within(Time.seconds(5)) // 设置时间约束
4.3 设置循环 Pattern
对于已经创建好的 Pattern,可以在一个简单条件后追加量词,也就是指定循环次数,形成循环执行的 Pattern
4.3.1 times
- 可以通过 times 指定固定的循环执行次数
- 不会匹配出累积到触发次数最后一条数据,可以想象为数组下标最大值为 数组长度-1
//指定循环触发4次
start.times(3)
//可以执行触发次数范围,让循环执行次数在该范围之内
start.times(2, 4)
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepQuantifierTimesTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
/*
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20).times(2)
.next("next").where(_.classId == "2")
*/
/*
1,小红1,20
1,小红2,20
2,小小小小小小,22 》 Record(1,小红1,20),Record(2,小小小小小小,22)
1,小明3,20
1,小红4,20
1,小红5,20
2,小小小小小小,22 》Record(1,小红4,20),Record(2,小小小小小小,22)
*/
// 在 classId 为 2 之前 累积 age 匹配次数为 2或3或4 次时触发
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20).times(2, 4)
.next("next").where(_.classId == "2")
/**
1,小红0,20 -》触发 1
2,xx,100 未触发,因为 age==20 只累积一次
1,小红1,20 -》触发 1
1,小红2,20 -》触发2
2,xx,200 触发1 》 Record(1,小红1,20),Record(2,xx,200)
Record(1,小红0,20),Record(2,xx,200)
1,小红3,20 -》触发2
1,小红4,20 -》触发2
1,小红5,20 -》触发 3 前累积5 次 超过最大值的 4次,故丢弃该数据
2,xx,300 触发2 》 Record(1,小红3,20),Record(2,xx,300)
Record(1,小红2,20),Record(2,xx,300)
Record(1,小红4,20),Record(2,xx,300)
1,小红6,20 触发3
1,小红7,20 触发3
1,小红8,20 触发3
1,小红9,20 -》触发 3 前累积7 次 超过最大值的 4次,故丢弃该数据
2,xx,400 触发3 》Record(1,小红6,20),Record(2,xx,400)
Record(1,小红8,20),Record(2,xx,400)
Record(1,小红7,20),Record(2,xx,400)
1,小红10,20 -》触发 3 前累积7 次 超过最大值的 4次,故丢弃该数据
1,小红11,20 触发4
1,小红12,20 触发4
1,小红13,20 触发4
1,小红14,20 未触发
2,xx,400 触发4 》Record(1,小红11,20),Record(2,xx,400)
Record(1,小红12,20),Record(2,xx,400)
Record(1,小红13,20),Record(2,xx,400)
2,xx,500 未触发
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
val next: Record = pattern.get("next").iterator().next()
s"${start.toString},${next.toString}"
}
}
)
result.print("CepQuantifierTimesTest")
env.execute()
}
4.3.2 optional
- 可以通过 optional 关键字指定触发条件有两个
- 当到达触发条件时出现的次数为 0 会触发
- 当到达触发条件是出现次数达到指定次数触发
- 注意,当出现次数为 0 触发时,获取对应事件时可能为 null
// 指定条件出现 4 次或 0 次 才会触发
start.times(4).optional
// 指定条件出现 2、3、4 次或 0 次 才会触发
start.times(2, 4).optional
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepQuantifierOptionalTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20).times(2).optional
.next("next").where(_.classId == "2")
/**
* 2,xx,100 > end:Record(2,xx,100)
* 1,小红1,20
* 1,小红2,20
* 2,xx,300 >
* end:Record(2,xx,300)
* start:Record(1,小红1,20),end:Record(2,xx,300)
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
//
val result:StringBuffer = new StringBuffer()
var start: Record = null
// 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
val next: Record = pattern.get("next").iterator().next()
result.append("end:").append(next.toString)
result.toString
}
}
)
result.print("CepQuantifierOptionalTest")
env.execute()
}
4.3.3 greedy
- 可以通过 greedy 将 Pattern 标记为贪婪模式
- 在 Pattern 匹配成功的前提下,会尽可能多次触发
- 下一次触发可能会重复匹配前一次触发时匹配到的数据
// 指定条件出现 2、3、4次触发,并且匹配尽可能多的次数
start.times(2, 4).greedy
// 指定条件出现 0、2、3、4次触发,并且匹配尽可能多的次数
start.times(2, 4).optional.greedy
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepQuantifierGreedyTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
// 匹配在 classId 为 2 之前,出现 2-7 次 age 为 20 的记录,尽可能多的匹配,会出现一条记录多次批评的记录
.where(_.age == 20).times(2,7).greedy
.next("next").where(_.classId == "2")
/**
1,小红0,20 触发 1 触发 2
2,xx,100
1,小红1,20 触发 1 触发 2
1,小红2,20 触发 2
2,xx,200 触发 1>
start:Record(1,小红0,20),end:Record(2,xx,200)
start:Record(1,小红1,20),end:Record(2,xx,200)
1,小红3,20 触发 2 触发 3
1,小红4,20 触发 2 触发 3
1,小红5,20 触发 3 触发 4
2,xx,300 触发 2>
start:Record(1,小红0,20),end:Record(2,xx,300)
start:Record(1,小红2,20),end:Record(2,xx,300)
start:Record(1,小红1,20),end:Record(2,xx,300)
start:Record(1,小红4,20),end:Record(2,xx,300)
start:Record(1,小红3,20),end:Record(2,xx,300)
1,小红6,20 触发 3 触发 4 触发 5
1,小红7,20 触发 3 触发 4 触发 5
1,小红8,20 触发 3 触发 4 触发 5
1,小红9,20 触发 4 触发 5
2,xx,400 触发 3 >
start:Record(1,小红8,20),end:Record(2,xx,400)
start:Record(1,小红6,20),end:Record(2,xx,400)
start:Record(1,小红5,20),end:Record(2,xx,400)
start:Record(1,小红3,20),end:Record(2,xx,400)
start:Record(1,小红4,20),end:Record(2,xx,400)
start:Record(1,小红7,20),end:Record(2,xx,400)
1,小红10,20 触发 4 触发 5
1,小红11,20 触发 5
2,xx,400 触发 4 >
start:Record(1,小红7,20),end:Record(2,xx,400)
start:Record(1,小红10,20),end:Record(2,xx,400)
start:Record(1,小红8,20),end:Record(2,xx,400)
start:Record(1,小红5,20),end:Record(2,xx,400)
start:Record(1,小红6,20),end:Record(2,xx,400)
start:Record(1,小红9,20),end:Record(2,xx,400)
1,小红12,20
2,xx,500 触发 5>
start:Record(1,小红10,20),end:Record(2,xx,500)
start:Record(1,小红9,20),end:Record(2,xx,500)
start:Record(1,小红8,20),end:Record(2,xx,500)
start:Record(1,小红7,20),end:Record(2,xx,500)
start:Record(1,小红11,20),end:Record(2,xx,500)
start:Record(1,小红6,20),end:Record(2,xx,500)
2,xx,600 未触发
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
//
val result:StringBuffer = new StringBuffer()
var start: Record = null
// 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
val next: Record = pattern.get("next").iterator().next()
result.append("end:").append(next.toString)
result.toString
}
}
)
result.print("CepQuantifierGreedyTest")
env.execute()
}
4.3.4 oneOrMore
- 可以通过 oneOrMore 方法指定触发一次或多次
- 匹配触发条件前的一次到多次
- 已经前一次匹配的数据在后一次触发还会被匹配到
- 与 times 不同的是会匹配到触发事件及其之前的所有数据
// 触发一次或者多次
start.oneOrMore()
//触发一次或者多次,尽可能重复执行
start.oneOrMore().greedy()
// 触发0次或者多次
start.oneOrMore().optional()
// 触发0次或者多次,尽可能重复执行
start.oneOrMore().optional().greedy()
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepQuantifierOneOrMoreTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
// 当 遇见 classId 为 2 之前 累积 age==20 匹配一次或多次时触发
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20).oneOrMore
.next("next").where(_.classId == "2")
/**
1,小红0,20 触发 1 触发 2 触发 3
2,xx,100》 触发 1
Record(1,小红0,20),Record(2,xx,100)
1,小红1,20 触发 2 触发 3
1,小红2,20 触发 2 触发 3
2,xx,200 》 触发 2 触发 3
Record(1,小红2,20),Record(2,xx,200)
Record(1,小红0,20),Record(2,xx,200)
Record(1,小红1,20),Record(2,xx,200)
1,小红3,20 触发 3
1,小红4,20 触发 3
1,小红5,20 触发 3
2,xx,300》 触发 3
Record(1,小红5,20),Record(2,xx,300)
Record(1,小红3,20),Record(2,xx,300)
Record(1,小红0,20),Record(2,xx,300)
Record(1,小红4,20),Record(2,xx,300)
Record(1,小红2,20),Record(2,xx,300)
Record(1,小红1,20),Record(2,xx,300)
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
val next: Record = pattern.get("next").iterator().next()
s"${start.toString},${next.toString}"
}
}
)
result.print("CepQuantifierOneOrMoreTest")
env.execute()
}
4.3.5 timesOrMor
通过 timesOrMore 方法可以指定触发固定次数以上,例如执行两次以上
// 触发两次或者多次
start.timesOrMore(2);
// 触发两次或者多次,尽可能重复执行
start.timesOrMore(2).greedy()
// 不触发或者触发两次以上,尽可能重执行
start.timesOrMore(2).optional().greedy()
- 测试点代码
package com.hnbian.flink.cep
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepQuantifierTimesOrMorTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
// 当 遇见 classId 为 2 之前 累积 age==20 匹配两次或多次时触发
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20).timesOrMore(2)
.next("next").where(_.classId == "2")
/**
1,小红0,20 触发1 触发2
2,xx,100
1,小红1,20 触发1 触发2
1,小红2,20 触发2
2,xx,200 》 触发1
CepQuantifierTimesOrMorTest:6> Record(1,小红0,20),Record(2,xx,200)
CepQuantifierTimesOrMorTest:7> Record(1,小红1,20),Record(2,xx,200)
1,小红3,20 触发2
1,小红4,20 触发2
1,小红5,20
2,xx,300 》 触发2
CepQuantifierTimesOrMorTest:9> Record(1,小红1,20),Record(2,xx,300)
CepQuantifierTimesOrMorTest:12> Record(1,小红4,20),Record(2,xx,300)
CepQuantifierTimesOrMorTest:10> Record(1,小红2,20),Record(2,xx,300)
CepQuantifierTimesOrMorTest:11> Record(1,小红3,20),Record(2,xx,300)
CepQuantifierTimesOrMorTest:8> Record(1,小红0,20),Record(2,xx,300)
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
val next: Record = pattern.get("next").iterator().next()
s"${start.toString},${next.toString}"
}
}
)
result.print("CepQuantifierTimesOrMorTest")
env.execute()
}
4.3.6 设置条件注意项
模式序列不能以 .notFollowedBy() 结束
“ not ” 类型的模式不能被 optional 所修饰
4.4 组合模式
将相互独立的模式进行组合然后形成模式序列。
模式序列基本的编写方式和独立模式一 致,各个模式之间通过邻近条件进行连接即可,
其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
模式序列必须是以一个“begin”开始:val start = Pattern.begin(“start”)
4.4.1 严格近邻
严格近邻:Strict Contiguity
所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定
例如对于模式 “a next b “ 事件序列 【a,c,b,d】则没有匹配
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepContiguityStrictTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20)
// 定义严格近邻模式,age 为 20 的记录后必须紧跟 classId 为 2 的记录才会匹配
.next("next").where(_.classId == "2")
/**
1,小红0,20 》 匹配 1
2,xx,100 》 匹配 1
CepContiguityStrictTest:6> Record(1,小红0,20),Record(2,xx,100)
1,小红1,20 》未匹配
1,小红2,20 》未匹配
3,xx,200 》未匹配
1,小红3,20 》未匹配
1,小红4,20 》未匹配
1,小红5,20 》 匹配 2
2,xx,300 》 匹配 2
CepContiguityStrictTest:7> Record(1,小红5,20),Record(2,xx,300)
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
val next: Record = pattern.get("next").iterator().next()
s"${start.toString},${next.toString}"
}
}
)
result.print("CepContiguityStrictTest")
env.execute()
}
4.4.2 宽松近邻
宽松近邻(Relaxed Contiguity)
允许中间出现不匹配的时间,由.followBy()指定
例如对于模式 “a followedBy b “ 事件序列 【a,c,b,d】匹配为{a,b}
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepContiguityRelaxedTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20)
// 定义宽松近邻模式,匹配 age 为 20 的记录后 classId 为 2 的记录
.followedBy("followed").where(_.classId == "2")
/**
1,小红0,20 匹配 1
2,xx,100 匹配 1
CepContiguityRelaxedTest:10> start:Record(1,小红0,20),followed:Record(2,xx,100),
1,小红1,20 匹配 2
1,小红2,20 匹配 2
2,xx,200 匹配 2
CepContiguityRelaxedTest:11> start:Record(1,小红1,20),followed:Record(2,xx,200),
CepContiguityRelaxedTest:12> start:Record(1,小红2,20),followed:Record(2,xx,200),
1,小红3,20 匹配 3
3,xx,100
1,小红4,20 匹配 3
3,xx,100
2,xx,300 匹配 3
CepContiguityRelaxedTest:2> start:Record(1,小红4,20),followed:Record(2,xx,300),
CepContiguityRelaxedTest:1> start:Record(1,小红3,20),followed:Record(2,xx,300),
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val result:StringBuffer = new StringBuffer()
var start: Record = null
var followed: Record = null
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
if (null != pattern.get("followed")){
followed = pattern.get("followed").iterator().next()
result
.append("followed:")
.append(followed.toString)
.append(",")
}
result.toString
}
}
)
result.print("CepContiguityRelaxedTest")
env.execute()
}
4.4.3 非确定性宽松近邻
非确定性宽松近邻:Non-Deterministic Relaxed Contiguity
进一步放宽条件,之前已经匹配过得事件也可以再次使用,由.followedByAny()指定
例如对于模式 “a followedByAny b “ 事件序列 【a,c,b1,b2】匹配为{a,b1},{a,b2}
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepContiguityRelaxedNonDeterministicTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start")
.where(_.age == 20)
// 定义非确定性宽松近邻
.followedByAny("followedByAny").where(_.classId == "2")
/**
1,小红0,20 触发 1 触发 2 触发 3 触发 4 触发 5
2,xx,100 触发 1
CepContiguityRelaxedNonDeterministicTest:2> start:Record(1,小红0,20),followed:Record(2,xx,100),
2,xx,200 触发 2
CepContiguityRelaxedNonDeterministicTest:3> start:Record(1,小红0,20),followed:Record(2,xx,200),
1,小红1,20 触发 3 触发 4 触发 5
1,小红2,20 触发 3 触发 4 触发 5
2,xx,300 触发 3
CepContiguityRelaxedNonDeterministicTest:4> start:Record(1,小红0,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:5> start:Record(1,小红1,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:6> start:Record(1,小红2,20),followed:Record(2,xx,300),
1,小红3,20 触发 4 触发 5
1,小红4,20 触发 4 触发 5
1,小红5,20 触发 4 触发 5
3,xx,300
2,xx,300 》 触发 4
CepContiguityRelaxedNonDeterministicTest:8> start:Record(1,小红1,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:11> start:Record(1,小红4,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:12> start:Record(1,小红5,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:7> start:Record(1,小红0,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:10> start:Record(1,小红3,20),followed:Record(2,xx,300),
CepContiguityRelaxedNonDeterministicTest:9> start:Record(1,小红2,20),followed:Record(2,xx,300),
2,xx,400 触发 5
CepContiguityRelaxedNonDeterministicTest:4> start:Record(1,小红3,20),followed:Record(2,xx,400),
CepContiguityRelaxedNonDeterministicTest:2> start:Record(1,小红1,20),followed:Record(2,xx,400),
CepContiguityRelaxedNonDeterministicTest:3> start:Record(1,小红2,20),followed:Record(2,xx,400),
CepContiguityRelaxedNonDeterministicTest:5> start:Record(1,小红4,20),followed:Record(2,xx,400),
CepContiguityRelaxedNonDeterministicTest:1> start:Record(1,小红0,20),followed:Record(2,xx,400),
CepContiguityRelaxedNonDeterministicTest:6> start:Record(1,小红5,20),followed:Record(2,xx,400),
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val result:StringBuffer = new StringBuffer()
var start: Record = null
var followed: Record = null
// 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
if (null != pattern.get("followedByAny")){
followed = pattern.get("followedByAny").iterator().next()
result
.append("followed:")
.append(followed.toString)
.append(",")
}
result.toString
}
}
)
result.print("CepContiguityRelaxedNonDeterministicTest")
env.execute()
}
4.4.4 严禁近邻
严禁近邻:Never Strict Contiguity
所有事件按照严格的顺序出现,A 事件后不能紧跟随 B 事件
由A .notNext B 指定
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepContiguityStrictNeverTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20)
// 定义严格近邻模式,age 为 20 的记录后跟随 classId 不为 3 的记录才会匹配
.notNext("next").where(_.classId =="3")
/**
1,小红0,20
2,xx,100 > 匹配 1
CepContiguityStrictNeverTest:12> start:Record(1,小红0,20),
1,小红1,20
4,xx,200 >未匹配
2,xx,300 >未匹配
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val result:StringBuffer = new StringBuffer()
var start: Record = null
var next: Record = null
// 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
println(pattern.get("next"))
if (null != pattern.get("next")){
next = pattern.get("next").iterator().next()
result
.append("next:")
.append(next.toString)
.append(",")
}
result.toString
}
}
)
result.print("CepContiguityStrictNeverTest")
env.execute()
}
4.4.5 非中间时间
.notFollowedBy :不想让某个事件在两个事件之间发生
- 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepContiguityNotFollowedByTest extends App{
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
//#########################################################
//#########################################################
//#########################################################
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20)
// 定义严格近邻模式,age 为 20 的记录后跟随 classId 不为 3 的记录才会匹配
.notFollowedBy("notFollowedBy").where(_.classId == "3")
.next("next").where(_.classId == "2")
/**
1,小红0,20 触发 1
2,xx,100 触发 1 》
CepContiguityStrictNeverTest:9> start:Record(1,小红0,20),next:Record(2,xx,100),
1,小红1,20
3,xx,200
2,xx,300
1,小红2,20 触发 2
2,xx,400 》 触发 2
CepContiguityStrictNeverTest:10> start:Record(1,小红2,20),next:Record(2,xx,400),
1,小红3,20
4,小红3,20 触发 3
2,xx,20 > 触发 3
CepContiguityStrictNeverTest:12> start:Record(4,小红3,20),next:Record(2,xx,20),
*/
//#########################################################
//#########################################################
//#########################################################
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(
new PatternSelectFunction[Record, String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val result:StringBuffer = new StringBuffer()
var start: Record = null
var notFollowedBy: Record = null
var next: Record = null
if (null != pattern.get("start")){
start = pattern.get("start").iterator().next()
result
.append("start:")
.append(start.toString)
.append(",")
}
println("notFollowedBy=="+pattern.get("notFollowedBy"))
if (null != pattern.get("notFollowedBy")){
notFollowedBy = pattern.get("notFollowedBy").iterator().next()
result
.append("notFollowedBy:")
.append(notFollowedBy.toString)
.append(",")
}
if (null != pattern.get("next")){
next = pattern.get("next").iterator().next()
result
.append("next:")
.append(next.toString)
.append(",")
}
result.toString
}
}
)
result.print("CepContiguityNotFollowedByTest")
env.execute()
}
5. Pattern 在事件流上检测
- 指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
- 调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
// 定义一个 Pattern, 匹配两秒内处理到的年龄都是 20 岁的两个人
private val pattern: Pattern[Record, Record] = Pattern
.begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 岁
//.next("next").where(_.age == 20) // 匹配第二个获取到的年龄是 20 岁
.within(Time.seconds(2)) // 设定约束时间为 2 秒
import org.apache.flink.cep.scala.CEP
// 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](value, pattern)
6. 选取结果
- 创建 PatternStream 之后,就可以应用 select 或者 flatselect方法,从检测到的事件序列中提取事件
- select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它
- select() 以一个 Map[String,Iterable[N]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所接收到的事件的 Iterable 类型
6.1 Select Funciton
可以通过在 PatternStream 的 Select 方法中传入自定义 Select Funciton 完成对匹配 事件的转换与输出。
Select Funciton 的输入参数为 Map[String, Iterable[IN]],Map 中的 key 为模式序列中的 Pattern 名称,Value 为对应 Pattern 所接受的事件集合,格式为输入事件的数据类型。
6.1.1 Select Funciton抽取正常事件
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object CepSelectFuncitonTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.select(new selectFunction)
result.print("CepSelectFuncitonTest")
env.execute()
}
/**
* 定义 PatternSelectFunction
*/
class selectFunction extends PatternSelectFunction[Record,String]{
override def select(pattern: util.Map[String, util.List[Record]]): String = {
// 从 map 中根据名称获取对应的事件
val start: Record = pattern.get("start").iterator().next()
start.toString
}
}
6.1.2 Select Funciton抽取超时事件
如果模式中有 within(time),那么就很有可能有超时的数据存在,通过 PatternStream,Select 方法分别获取超时事件和正常事件。首先需要创建 OutputTag 来标记超时事件,然后在 PatternStream.select 方法中使用 OutputTag,就可以将超时事件从 PatternStream中抽取出来。
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object CepSelectFunctionOutTimeTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
.next("next").where(_.classId == "2" )
.within(Time.seconds(2))
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
//创建OutputTag,并命名为timeout-output
val timeoutTag = OutputTag[String]("timeout-output")
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] =
patternStream.select(timeoutTag, new OutTimeSelectFunction2, new SelectFunction2)
// 输出正常事件
result.print("CepSelectFunctionOutTimeTest")
// 输出超时事件
result.getSideOutput(timeoutTag).print("timeout-output")
env.execute()
}
// 自定义正常事件序列处理函数
class SelectFunction2() extends PatternSelectFunction[Record,String] {
override def select(pattern: util.Map[String, util.List[Record]]): String = {
val start: Record = pattern.get("start").iterator().next()
s"${start.toString},success"
}
}
// 自定义超时事件序列处理函数
class OutTimeSelectFunction2() extends PatternTimeoutFunction[Record,String]{
override def timeout(pattern: util.Map[String, util.List[Record]], timeoutTimestamp: Long): String = {
val start: Record = pattern.get("start").iterator().next()
s"${start.toString},outTime"
}
}
6.2 Flat Select Funciton
Flat Select Funciton 和 Select Function 相似,不过 Flat Select Funciton 在每次调用可以返回任意数量的结果。
Flat Select Funciton 使用 Collector 作为返回结果的容器,可以将需要输出的事件都放置在 Collector 中返回。
6.2.1 Flat Select Funciton 抽取正常事件
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternFlatSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object CepSelectFuncitonFlatTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
.next("next").where(_.classId == "2")
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.flatSelect(new flatSelectFunction)
result.print("CepSelectFuncitonFlatTest")
env.execute()
}
/**
* 定义一个 PatternFlatSelectFunction
*/
class flatSelectFunction extends PatternFlatSelectFunction[Record,String]{
override def flatSelect(pattern: util.Map[String, util.List[Record]], out: Collector[String]): Unit = {
out.collect(
s"""
|flatSelectFunction:
| ${pattern.get("start").toString},
| ${pattern.get("next").toString}
|""".stripMargin)
}
}
6.2.1 Flat Select Funciton 抽取超时事件
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* @Author haonan.bian
* @Description //TODO
* @Date 2021/4/7 14:06
* */
object CepSelectFunctionFlatOutTimeTest extends App {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1. 获取数据输入流
val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)
private val recordStream: DataStream[Record] = socketStream
.map(data => {
val arr = data.split(",")
Record(arr(0), arr(1), arr(2).toInt)
})
// 2. 定义一个 Pattern
private val pattern: Pattern[Record, Record] = Pattern
// 设置一个简单条件,匹配年龄为 20 的记录
.begin[Record]("start").where(_.age == 20)
.next("next").where(_.classId == "2")
.within(Time.seconds(2))
// 3. 将创建好的 Pattern 应用到输入事件流上
private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)
// 创建OutputTag,并命名为timeout-output
val timeoutTag = OutputTag[String]("timeout-output")
// 4. 获取事件序列,得到匹配到的数据
private val result: DataStream[String] = patternStream.flatSelect(timeoutTag,new FlatSelectTimeoutFunction,new FlatSelectFunction)
result.print("CepSelectFunctionFlatOutTimeTest")
result.getSideOutput(timeoutTag).print("time out")
env.execute()
}
// 自定义正常事件序列处理函数
class FlatSelectFunction extends PatternFlatSelectFunction[Record,String]{
override def flatSelect(pattern: util.Map[String, util.List[Record]], out: Collector[String]): Unit = {
out.collect(
s"""
|flatSelectFunction:
| ${pattern.get("start").toString},
| ${pattern.get("next").toString}
|""".stripMargin)
}
}
// 自定义超时事件序列处理函数
class FlatSelectTimeoutFunction extends PatternFlatTimeoutFunction[Record,String]{
override def timeout(pattern: util.Map[String, util.List[Record]], timeoutTimestamp: Long, out: Collector[String]): Unit = {
out.collect(
s"""
|flatSelectFunction:
| ${pattern.get("start").toString}
|""".stripMargin)
}
}