Flink系列 17. 复杂事件处理 CEP


1. CEP 是什么

  • CEP 是 Complex Event Processing 三个单词的缩写,表示复杂事件处理。

  • CEP 是 Flink 专门为我们提供的一个基于复杂事件监测处理的库。

  • CEP 通过一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

  • CEP 复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。

  • Flink 基于 DataStrem API 提供了 FlinkCEP 组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。

2. CEP 的特点

  • 目标:从有序的简单事件流中发现一些高阶特征
  • 输入:一个或多个简单事件构成的事件流
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  • 输出:满足规则的复杂事件
使用 CEP 进行事件匹配

3. Pattern 介绍

  • CEP 提供了 Pattern API,定义用于对输入流数据进行复杂事件的规则,用来提取符合规则的事件结果

  • 处理事件的规则,被叫做“模式(Pattern)”

  • 包含四个步骤

    1. 创建输入事件流
    2. 定义 匹配事件 Pattern 的规则,
    3. 将定义的 Pattern 应用在事件流上并对事件流进行检测
    4. 选取符合 Pattern 匹配到的结果
  • 简单的 Pattern API 示例

import java.util

import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

object CEPTest extends App{

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // 1. 获取数据输入流
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
  private val value: DataStream[Record] = stream1
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  // 2. 定义一个 Pattern, 匹配两秒内处理到的年龄都是 20 岁的两个人
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 岁
    .next("next").where(_.age == 20)// 匹配第二个获取到的年龄是 20 岁
    .within(Time.seconds(2)) // 设定约束时间为 2 秒

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](value, pattern)

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.select(new PatternMacthData)
  result.print("CEPTest")

  env.execute()
}

 /**
  * 定义一个Pattern Select Function 对匹配到的数据进行处理
  */
class PatternMacthData extends PatternSelectFunction[Record,String]{
  override def select(pattern: util.Map[String, util.List[Record]]): String = {
    // 从 map 中根据名称获取对应的事件
    val start: Record = pattern.get("start").iterator().next()
    val next: Record = pattern.get("next").iterator().next()

    s"${start.name} 与 ${next.name} 都是 ${start.age} 岁"
  }
}

4. Pattern 的定义与类型

  • 定义 Pattern 可以是单次执行模式,也可以是循环执行模式。

  • 单次执行模式一次只接受 一个事件

  • 循环执行模式可以接收一个或者多个事件

  • 可以通过指定循环次数将单次执行模式变为循环执行模式。

  • 每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过 where 方法进行叠加。

每个 Pattern 都是通过 begin 方法为开始进行Pattern定义的,下一步通过 Pattern.where()方法在 Pattern 上指定匹配条件,只有当 匹配条件满足之后,当前的 Pattern 才会接受事件。

val pattern = Pattern
                .begin[Event]("start") // 指定匹配模式的名称,后续该名称会作为 key 对应匹配到的数据
                .where(_.id >= "100") // 设置匹配条件, 匹配 id 大于 100 的数据

4.1 定义 Pattern 的条件

每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在 FlinkCFP 中通过 pattern.where()、 pattern.or() 及 pattern.until() 方法来为 Pattern 指定条件,且 Pattern 条件有 Simple Conditions 及Combining Conditions 等类型

  • 使用 Pattern API 之前需要导入 CEP 依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

4.1.1 简单条件

  • 简单条件(Simple Conditions):其主要根据事件中的字段信息进行判断,决定是否接受该事件。

  • 通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件

  private val pattern: Pattern[Record, Record] = Pattern
    // 设置一个简单条件,匹配年龄为 20 的记录
    .begin[Record]("start").where(_.age == 20)
  • 简单条件代码示例
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepConditionSimpleTest extends App{
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //##########################################################################
  //##########################################################################
  //##########################################################################
  // 2.定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    // 设置一个简单条件,匹配年龄为 20 的记录
    .begin[Record]("start").where(_.age == 20)

  //##########################################################################
  //##########################################################################
  //##########################################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record,String]{
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        start.toString
      }
    }
  )

  result.print("CepConditionSimpleTest")

  env.execute()
}

4.1.2 组合条件

  • 组合条件(Combining Conditions):是将简单条件进行合并
  • .where() 方法进行条件的组合,表示 AND
  • .or() 方法进行条件的组合,表示 OR
//  把id 大于等于 100 或者年龄大于 30 的事件挑选出来 
val start = Pattern.begin[Event]("start")
                .where(_.id >= "100").or(_.age >30)
  • 组合条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepConditionCombiningTest extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  recordStream.print()
  //#########################################################
  //#########################################################
  //#########################################################

  // 2. 定义一个 Pattern
  private val pattern1: Pattern[Record, Record] = Pattern
    // 匹配年龄为 20 的记录
    .begin[Record]("start")
    .where(_.age == 20) // age == 20


  private val pattern2: Pattern[Record, Record] = Pattern
    // 匹配年龄为 20 或年龄为 18 的记录
    .begin[Record]("start")
    .where(_.age == 20)
    .or( _.age==18 )   // (age==20 || age == 18)



  private val pattern3: Pattern[Record, Record] = Pattern
    // 匹配年龄为 20 或年龄为 18 但 classId 为 1 的记录
    .begin[Record]("start")
    .where(_.age == 20)
    .or( _.age==18 )
    .where(_.classId=="1") // ((age==20 || age == 18)&& classId == "1")


  private val pattern4: Pattern[Record, Record] = Pattern
    // 匹配年龄为 20 或年龄为 18 但 classId 为 1  或 name为小明的记录
    .begin[Record]("start")
    .where(_.age == 20)
    .or( _.age==18 )
    .where(_.classId=="1")
    .or(_.name == "小明") // ((age==20 || age == 18)&& classId == "1") || name== "小明"

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern4)

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record,String]{
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        start.toString
      }
    }
  )

  result.print("CepConditionSimpleTest")

  env.execute()
}

4.1.3 迭代条件

  • 迭代条件:(Iterative Condition)

  • 对前面条件匹配到的数据再次进行匹配

def where(condition: IterativeCondition[F]): Pattern[T, F] = {
    jPattern.where(condition)
    this
  }
  • 迭代条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepConditionIterativeTest  extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  recordStream.print()

  //#########################################################
  //#########################################################
  //#########################################################

  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    // 匹配年龄为 20 或年龄为 18 的记录
    .begin[Record]("start")
    .where(_.age == 20)
    .or( _.age==18 )   // (age==20 || age == 18)
    .where(new MyIterativeCondition())

  // 定义迭代匹配条件
  class MyIterativeCondition extends IterativeCondition[Record]{
    override def filter(value: Record, ctx: IterativeCondition.Context[Record]): Boolean = {
      if(value.name=="小明"){ // 前面匹配到的记录 name 不能为小明
        false
      }else if (value.classId == "1"){ // 前面匹配到的数据 classId 为 1 才会匹配成功
        true
      }else{ // 其他情况 匹配不成功
        false
      }
    }
  }

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record,String]{
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        start.toString
      }
    }
  )

  result.print("CepConditionIterativeTest")

  env.execute()
}

4.1.4 终止条件

  • 终止条件( Stop condition):如果程序中使用了 oneOrMore 或者 oneOrMore().optional() 方法,还可以指定停止条件,否则模式中的规则会一直循环下去,如下终止条件通过 until()方法指定
  • 终止条件测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepConditionStopTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  //#########################################################
  //#########################################################
  //#########################################################

  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    // 匹配所有 age 为 20 的记录,一直到接到 classId 为 2 的记录
    .begin[Record]("start")
    .where(_.age == 20)
    .oneOrMore
    .until(_.classId=="2")

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record,String]{
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        start.toString
      }
    }
  )

  result.print("CepConditionStopTest")

  env.execute()
}

4.2 设置时间约束

可以为模式指定时间约束,用来要求在多长时间内匹配有效

 // 定义一个 Pattern, 匹配五秒内处理到的年龄都是 20 的两个事件
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 的事件
    .next("next").where(_.age == 20)// 匹配第二个获取到的年龄是 20 的事件
    .within(Time.seconds(5)) // 设置时间约束

4.3 设置循环 Pattern

对于已经创建好的 Pattern,可以在一个简单条件后追加量词,也就是指定循环次数,形成循环执行的 Pattern

4.3.1 times

  • 可以通过 times 指定固定的循环执行次数
  • 不会匹配出累积到触发次数最后一条数据,可以想象为数组下标最大值为 数组长度-1
//指定循环触发4次 
start.times(3)

//可以执行触发次数范围,让循环执行次数在该范围之内 
start.times(2, 4)
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepQuantifierTimesTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
/*  
    private val pattern: Pattern[Record, Record] = Pattern
      .begin[Record]("start")
      .where(_.age == 20).times(2)
      .next("next").where(_.classId == "2")
*/

/*
  1,小红1,20
  1,小红2,20
  2,小小小小小小,22 》 Record(1,小红1,20),Record(2,小小小小小小,22)
  1,小明3,20
  1,小红4,20
  1,小红5,20
  2,小小小小小小,22 》Record(1,小红4,20),Record(2,小小小小小小,22)
*/

  // 在 classId 为 2 之前 累积 age 匹配次数为 2或3或4 次时触发
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start")
    .where(_.age == 20).times(2, 4)
    .next("next").where(_.classId == "2")

/**
  1,小红0,20 -》触发 1
  2,xx,100 未触发,因为 age==20  只累积一次

  1,小红1,20 -》触发 1
  1,小红2,20 -》触发2
  2,xx,200 触发1 》 Record(1,小红1,20),Record(2,xx,200)
                  Record(1,小红0,20),Record(2,xx,200)

  1,小红3,20 -》触发2
  1,小红4,20 -》触发2
  1,小红5,20 -》触发 3 前累积5 次 超过最大值的 4次,故丢弃该数据
  2,xx,300 触发2 》 Record(1,小红3,20),Record(2,xx,300)
                  Record(1,小红2,20),Record(2,xx,300)
                  Record(1,小红4,20),Record(2,xx,300)

  1,小红6,20 触发3
  1,小红7,20 触发3
  1,小红8,20 触发3
  1,小红9,20 -》触发 3 前累积7 次 超过最大值的 4次,故丢弃该数据
  2,xx,400 触发3 》Record(1,小红6,20),Record(2,xx,400)
                  Record(1,小红8,20),Record(2,xx,400)
                  Record(1,小红7,20),Record(2,xx,400)

  1,小红10,20 -》触发 3 前累积7 次 超过最大值的 4次,故丢弃该数据
  1,小红11,20 触发4
  1,小红12,20 触发4
  1,小红13,20 触发4
  1,小红14,20 未触发
  2,xx,400 触发4 》Record(1,小红11,20),Record(2,xx,400)
                  Record(1,小红12,20),Record(2,xx,400)
                  Record(1,小红13,20),Record(2,xx,400)
  2,xx,500 未触发
*/

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        val next: Record = pattern.get("next").iterator().next()
        s"${start.toString},${next.toString}"
      }
    }
  )


  result.print("CepQuantifierTimesTest")

  env.execute()
}

4.3.2 optional

  • 可以通过 optional 关键字指定触发条件有两个
  • 当到达触发条件时出现的次数为 0 会触发
  • 当到达触发条件是出现次数达到指定次数触发
  • 注意,当出现次数为 0 触发时,获取对应事件时可能为 null
// 指定条件出现 4 次或 0 次 才会触发
start.times(4).optional


// 指定条件出现 2、3、4 次或 0 次 才会触发
start.times(2, 4).optional
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepQuantifierOptionalTest  extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
    private val pattern: Pattern[Record, Record] = Pattern
      .begin[Record]("start")
      .where(_.age == 20).times(2).optional
      .next("next").where(_.classId == "2")
  /**
    * 2,xx,100 > end:Record(2,xx,100)
    * 1,小红1,20
    * 1,小红2,20
    * 2,xx,300 >
    *             end:Record(2,xx,300)
    *             start:Record(1,小红1,20),end:Record(2,xx,300)
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        //
        val result:StringBuffer = new StringBuffer()
        var start: Record = null

        // 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }
        val next: Record = pattern.get("next").iterator().next()
        result.append("end:").append(next.toString)
        result.toString
      }
    }
  )

  result.print("CepQuantifierOptionalTest")

  env.execute()
}

4.3.3 greedy

  • 可以通过 greedy 将 Pattern 标记为贪婪模式
  • 在 Pattern 匹配成功的前提下,会尽可能多次触发
  • 下一次触发可能会重复匹配前一次触发时匹配到的数据
// 指定条件出现 2、3、4次触发,并且匹配尽可能多的次数
start.times(2, 4).greedy

// 指定条件出现 0、2、3、4次触发,并且匹配尽可能多的次数
start.times(2, 4).optional.greedy
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepQuantifierGreedyTest  extends App {


  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start")
    // 匹配在 classId 为 2 之前,出现 2-7 次 age 为 20 的记录,尽可能多的匹配,会出现一条记录多次批评的记录
    .where(_.age == 20).times(2,7).greedy
    .next("next").where(_.classId == "2")
  /**
  1,小红0,20 触发 1 触发 2
2,xx,100
1,小红1,20 触发 1 触发 2
1,小红2,20 触发 2
2,xx,200 触发 1>
          start:Record(1,小红0,20),end:Record(2,xx,200)
          start:Record(1,小红1,20),end:Record(2,xx,200)

1,小红3,20 触发 2 触发 3
1,小红4,20 触发 2 触发 3
1,小红5,20 触发 3 触发 4
2,xx,300 触发 2>
          start:Record(1,小红0,20),end:Record(2,xx,300)
          start:Record(1,小红2,20),end:Record(2,xx,300)
          start:Record(1,小红1,20),end:Record(2,xx,300)
          start:Record(1,小红4,20),end:Record(2,xx,300)
          start:Record(1,小红3,20),end:Record(2,xx,300)

1,小红6,20 触发 3 触发 4 触发 5
1,小红7,20 触发 3 触发 4 触发 5
1,小红8,20 触发 3 触发 4 触发 5
1,小红9,20 触发 4 触发 5
2,xx,400 触发 3 >
          start:Record(1,小红8,20),end:Record(2,xx,400)
          start:Record(1,小红6,20),end:Record(2,xx,400)
          start:Record(1,小红5,20),end:Record(2,xx,400)
          start:Record(1,小红3,20),end:Record(2,xx,400)
          start:Record(1,小红4,20),end:Record(2,xx,400)
          start:Record(1,小红7,20),end:Record(2,xx,400)

1,小红10,20 触发 4 触发 5
1,小红11,20 触发 5
2,xx,400 触发 4 >
            start:Record(1,小红7,20),end:Record(2,xx,400)
            start:Record(1,小红10,20),end:Record(2,xx,400)
            start:Record(1,小红8,20),end:Record(2,xx,400)
            start:Record(1,小红5,20),end:Record(2,xx,400)
            start:Record(1,小红6,20),end:Record(2,xx,400)
            start:Record(1,小红9,20),end:Record(2,xx,400)

1,小红12,20
2,xx,500 触发 5>
          start:Record(1,小红10,20),end:Record(2,xx,500)
          start:Record(1,小红9,20),end:Record(2,xx,500)
          start:Record(1,小红8,20),end:Record(2,xx,500)
          start:Record(1,小红7,20),end:Record(2,xx,500)
          start:Record(1,小红11,20),end:Record(2,xx,500)
          start:Record(1,小红6,20),end:Record(2,xx,500)

2,xx,600 未触发
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        //
        val result:StringBuffer = new StringBuffer()
        var start: Record = null

        // 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }
        val next: Record = pattern.get("next").iterator().next()
        result.append("end:").append(next.toString)
        result.toString
      }
    }
  )

  result.print("CepQuantifierGreedyTest")

  env.execute()
}

4.3.4 oneOrMore

  • 可以通过 oneOrMore 方法指定触发一次或多次
  • 匹配触发条件前的一次到多次
  • 已经前一次匹配的数据在后一次触发还会被匹配到
  • 与 times 不同的是会匹配到触发事件及其之前的所有数据
// 触发一次或者多次 
start.oneOrMore()

//触发一次或者多次,尽可能重复执行 
start.oneOrMore().greedy() 

// 触发0次或者多次 
start.oneOrMore().optional() 

// 触发0次或者多次,尽可能重复执行 
start.oneOrMore().optional().greedy()
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepQuantifierOneOrMoreTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  // 当 遇见 classId 为 2 之前 累积 age==20 匹配一次或多次时触发
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start")
    .where(_.age == 20).oneOrMore
    .next("next").where(_.classId == "2")

  /**
1,小红0,20 触发 1 触发 2 触发 3
2,xx,100》 触发 1
    Record(1,小红0,20),Record(2,xx,100)
1,小红1,20 触发 2 触发 3
1,小红2,20  触发 2 触发 3
2,xx,200 》 触发 2 触发 3
    Record(1,小红2,20),Record(2,xx,200)
    Record(1,小红0,20),Record(2,xx,200)
    Record(1,小红1,20),Record(2,xx,200)

1,小红3,20 触发 3
1,小红4,20 触发 3
1,小红5,20 触发 3
2,xx,300》 触发 3
    Record(1,小红5,20),Record(2,xx,300)
    Record(1,小红3,20),Record(2,xx,300)
    Record(1,小红0,20),Record(2,xx,300)
    Record(1,小红4,20),Record(2,xx,300)
    Record(1,小红2,20),Record(2,xx,300)
    Record(1,小红1,20),Record(2,xx,300)
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        val next: Record = pattern.get("next").iterator().next()
        s"${start.toString},${next.toString}"
      }
    }
  )

  result.print("CepQuantifierOneOrMoreTest")

  env.execute()
}

4.3.5 timesOrMor

通过 timesOrMore 方法可以指定触发固定次数以上,例如执行两次以上

// 触发两次或者多次 
start.timesOrMore(2);

// 触发两次或者多次,尽可能重复执行 
start.timesOrMore(2).greedy()

// 不触发或者触发两次以上,尽可能重执行 
start.timesOrMore(2).optional().greedy()
  • 测试点代码
package com.hnbian.flink.cep

import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepQuantifierTimesOrMorTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })


  //#########################################################
  //#########################################################
  //#########################################################

  // 2. 定义一个 Pattern
  // 当 遇见 classId 为 2 之前 累积 age==20 匹配两次或多次时触发
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start")
    .where(_.age == 20).timesOrMore(2)
    .next("next").where(_.classId == "2")

/**
1,小红0,20 触发1  触发2
2,xx,100
1,小红1,20 触发1  触发2
1,小红2,20  触发2
2,xx,200 》 触发1
    CepQuantifierTimesOrMorTest:6> Record(1,小红0,20),Record(2,xx,200)
    CepQuantifierTimesOrMorTest:7> Record(1,小红1,20),Record(2,xx,200)
1,小红3,20  触发2
1,小红4,20  触发2
1,小红5,20 
2,xx,300 》 触发2
  CepQuantifierTimesOrMorTest:9> Record(1,小红1,20),Record(2,xx,300)
  CepQuantifierTimesOrMorTest:12> Record(1,小红4,20),Record(2,xx,300)
  CepQuantifierTimesOrMorTest:10> Record(1,小红2,20),Record(2,xx,300)
  CepQuantifierTimesOrMorTest:11> Record(1,小红3,20),Record(2,xx,300)
  CepQuantifierTimesOrMorTest:8> Record(1,小红0,20),Record(2,xx,300)
*/

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()
        val next: Record = pattern.get("next").iterator().next()
        s"${start.toString},${next.toString}"
      }
    }
  )

  result.print("CepQuantifierTimesOrMorTest")

  env.execute()
}

4.3.6 设置条件注意项

  1. 模式序列不能以 .notFollowedBy() 结束

  2. not ” 类型的模式不能被 optional 所修饰

4.4 组合模式

  • 将相互独立的模式进行组合然后形成模式序列。

  • 模式序列基本的编写方式和独立模式一 致,各个模式之间通过邻近条件进行连接即可,

  • 其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。

  • 模式序列必须是以一个“begin”开始:val start = Pattern.begin(“start”)

4.4.1 严格近邻

  • 严格近邻:Strict Contiguity

  • 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定

  • 例如对于模式 “a next b “ 事件序列 【a,c,b,d】则没有匹配

严格近邻
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepContiguityStrictTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20)
        // 定义严格近邻模式,age 为 20 的记录后必须紧跟 classId 为 2 的记录才会匹配
    .next("next").where(_.classId == "2")
/**
1,小红0,20 》 匹配 1
2,xx,100 》 匹配 1
            CepContiguityStrictTest:6> Record(1,小红0,20),Record(2,xx,100)
1,小红1,20 》未匹配
1,小红2,20 》未匹配
3,xx,200 》未匹配
1,小红3,20 》未匹配
1,小红4,20 》未匹配
1,小红5,20 》 匹配 2
2,xx,300 》 匹配 2
        CepContiguityStrictTest:7> Record(1,小红5,20),Record(2,xx,300)
*/

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val start: Record = pattern.get("start").iterator().next()

        val next: Record = pattern.get("next").iterator().next()

        s"${start.toString},${next.toString}"

      }
    }
  )

  result.print("CepContiguityStrictTest")
  env.execute()
}

4.4.2 宽松近邻

  • 宽松近邻(Relaxed Contiguity)

  • 允许中间出现不匹配的时间,由.followBy()指定

  • 例如对于模式 “a followedBy b “ 事件序列 【a,c,b,d】匹配为{a,b}

宽松近邻
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepContiguityRelaxedTest  extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20)
    // 定义宽松近邻模式,匹配 age 为 20 的记录后 classId 为 2 的记录
    .followedBy("followed").where(_.classId == "2")
  /**
1,小红0,20 匹配 1
2,xx,100 匹配 1
    CepContiguityRelaxedTest:10> start:Record(1,小红0,20),followed:Record(2,xx,100),

1,小红1,20 匹配 2
1,小红2,20 匹配 2
2,xx,200 匹配 2
    CepContiguityRelaxedTest:11> start:Record(1,小红1,20),followed:Record(2,xx,200),
    CepContiguityRelaxedTest:12> start:Record(1,小红2,20),followed:Record(2,xx,200),

1,小红3,20 匹配 3
3,xx,100
1,小红4,20 匹配 3
3,xx,100
2,xx,300 匹配 3
    CepContiguityRelaxedTest:2> start:Record(1,小红4,20),followed:Record(2,xx,300),
    CepContiguityRelaxedTest:1> start:Record(1,小红3,20),followed:Record(2,xx,300),
*/
  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val result:StringBuffer = new StringBuffer()
        var start: Record = null
        var followed: Record = null

        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }

        if (null != pattern.get("followed")){
          followed = pattern.get("followed").iterator().next()
          result
            .append("followed:")
            .append(followed.toString)
            .append(",")
        }

        result.toString

      }
    }
  )

  result.print("CepContiguityRelaxedTest")
  env.execute()
}

4.4.3 非确定性宽松近邻

  • 非确定性宽松近邻:Non-Deterministic Relaxed Contiguity

  • 进一步放宽条件,之前已经匹配过得事件也可以再次使用,由.followedByAny()指定

  • 例如对于模式 “a followedByAny b “ 事件序列 【a,c,b1,b2】匹配为{a,b1},{a,b2}

非确定性宽松近邻
  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepContiguityRelaxedNonDeterministicTest  extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start")
    .where(_.age == 20)
    // 定义非确定性宽松近邻
    .followedByAny("followedByAny").where(_.classId == "2")
  /**
1,小红0,20 触发 1 触发 2  触发 3 触发 4 触发 5
2,xx,100 触发 1
    CepContiguityRelaxedNonDeterministicTest:2> start:Record(1,小红0,20),followed:Record(2,xx,100),
2,xx,200 触发 2
    CepContiguityRelaxedNonDeterministicTest:3> start:Record(1,小红0,20),followed:Record(2,xx,200),
1,小红1,20  触发 3 触发 4 触发 5
1,小红2,20  触发 3 触发 4 触发 5
2,xx,300 触发 3
    CepContiguityRelaxedNonDeterministicTest:4> start:Record(1,小红0,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:5> start:Record(1,小红1,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:6> start:Record(1,小红2,20),followed:Record(2,xx,300),
1,小红3,20 触发 4 触发 5
1,小红4,20 触发 4 触发 5
1,小红5,20 触发 4 触发 5
3,xx,300
2,xx,300 》 触发 4
    CepContiguityRelaxedNonDeterministicTest:8> start:Record(1,小红1,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:11> start:Record(1,小红4,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:12> start:Record(1,小红5,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:7> start:Record(1,小红0,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:10> start:Record(1,小红3,20),followed:Record(2,xx,300),
    CepContiguityRelaxedNonDeterministicTest:9> start:Record(1,小红2,20),followed:Record(2,xx,300),

2,xx,400 触发 5
  CepContiguityRelaxedNonDeterministicTest:4> start:Record(1,小红3,20),followed:Record(2,xx,400),
  CepContiguityRelaxedNonDeterministicTest:2> start:Record(1,小红1,20),followed:Record(2,xx,400),
  CepContiguityRelaxedNonDeterministicTest:3> start:Record(1,小红2,20),followed:Record(2,xx,400),
  CepContiguityRelaxedNonDeterministicTest:5> start:Record(1,小红4,20),followed:Record(2,xx,400),
  CepContiguityRelaxedNonDeterministicTest:1> start:Record(1,小红0,20),followed:Record(2,xx,400),
  CepContiguityRelaxedNonDeterministicTest:6> start:Record(1,小红5,20),followed:Record(2,xx,400),
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val result:StringBuffer = new StringBuffer()
        var start: Record = null
        var followed: Record = null

        // 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }

        if (null != pattern.get("followedByAny")){
          followed = pattern.get("followedByAny").iterator().next()
          result
            .append("followed:")
            .append(followed.toString)
            .append(",")
        }

        result.toString

      }
    }
  )

  result.print("CepContiguityRelaxedNonDeterministicTest")

  env.execute()
}

4.4.4 严禁近邻

  • 严禁近邻:Never Strict Contiguity

  • 所有事件按照严格的顺序出现,A 事件后不能紧跟随 B 事件

  • 由A .notNext B 指定

  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object CepContiguityStrictNeverTest extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################

  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20)
    // 定义严格近邻模式,age 为 20 的记录后跟随 classId 不为 3 的记录才会匹配
    .notNext("next").where(_.classId =="3")
  /**
1,小红0,20
2,xx,100 > 匹配 1
    CepContiguityStrictNeverTest:12> start:Record(1,小红0,20),
1,小红1,20
4,xx,200 >未匹配
2,xx,300 >未匹配
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val result:StringBuffer = new StringBuffer()
        var start: Record = null
        var next: Record = null

        // 当只有 next 匹配到的时候也会触发,这是 start 未匹配到为空,需要进行判断
        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }

        println(pattern.get("next"))
        if (null != pattern.get("next")){
          next = pattern.get("next").iterator().next()
          result
            .append("next:")
            .append(next.toString)
            .append(",")
        }

        result.toString
      }
    }
  )

  result.print("CepContiguityStrictNeverTest")
  env.execute()
}

4.4.5 非中间时间

.notFollowedBy :不想让某个事件在两个事件之间发生

  • 测试代码
import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._


object CepContiguityNotFollowedByTest extends App{

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  //#########################################################
  //#########################################################
  //#########################################################


  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20)
    // 定义严格近邻模式,age 为 20 的记录后跟随 classId 不为 3 的记录才会匹配
    .notFollowedBy("notFollowedBy").where(_.classId == "3")
    .next("next").where(_.classId == "2")
  /**
1,小红0,20 触发 1
2,xx,100 触发 1 》
    CepContiguityStrictNeverTest:9> start:Record(1,小红0,20),next:Record(2,xx,100),

1,小红1,20
3,xx,200
2,xx,300
1,小红2,20 触发 2
2,xx,400 》 触发 2
    CepContiguityStrictNeverTest:10> start:Record(1,小红2,20),next:Record(2,xx,400),
1,小红3,20
4,小红3,20 触发 3
2,xx,20 > 触发 3
    CepContiguityStrictNeverTest:12> start:Record(4,小红3,20),next:Record(2,xx,20),
    */

  //#########################################################
  //#########################################################
  //#########################################################

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 4. 获取事件序列,得到匹配到的数据

  private val result: DataStream[String] = patternStream.select(
    new PatternSelectFunction[Record, String] {
      override def select(pattern: util.Map[String, util.List[Record]]): String = {
        // 从 map 中根据名称获取对应的事件
        val result:StringBuffer = new StringBuffer()
        var start: Record = null
        var notFollowedBy: Record = null
        var next: Record = null

        if (null != pattern.get("start")){
          start = pattern.get("start").iterator().next()
          result
            .append("start:")
            .append(start.toString)
            .append(",")
        }

        println("notFollowedBy=="+pattern.get("notFollowedBy"))
        if (null != pattern.get("notFollowedBy")){
          notFollowedBy = pattern.get("notFollowedBy").iterator().next()
          result
            .append("notFollowedBy:")
            .append(notFollowedBy.toString)
            .append(",")
        }

        if (null != pattern.get("next")){
          next = pattern.get("next").iterator().next()
          result
            .append("next:")
            .append(next.toString)
            .append(",")
        }

        result.toString

      }
    }
  )

  result.print("CepContiguityNotFollowedByTest")
  env.execute()
}

5. Pattern 在事件流上检测

  • 指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
  • 调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
 // 定义一个 Pattern, 匹配两秒内处理到的年龄都是 20 岁的两个人
 private val pattern: Pattern[Record, Record] = Pattern
    .begin[Record]("start").where(_.age == 20) // 匹配第一个获取到的年龄是 20 岁
    //.next("next").where(_.age == 20) // 匹配第二个获取到的年龄是 20 岁
    .within(Time.seconds(2)) // 设定约束时间为 2 秒

  import org.apache.flink.cep.scala.CEP
  // 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](value, pattern)

6. 选取结果

  • 创建 PatternStream 之后,就可以应用 select 或者 flatselect方法,从检测到的事件序列中提取事件
  • select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它
  • select() 以一个 Map[String,Iterable[N]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所接收到的事件的 Iterable 类型

6.1 Select Funciton

  • 可以通过在 PatternStream 的 Select 方法中传入自定义 Select Funciton 完成对匹配 事件的转换与输出。

  • Select Funciton 的输入参数为 Map[String, Iterable[IN]],Map 中的 key 为模式序列中的 Pattern 名称,Value 为对应 Pattern 所接受的事件集合,格式为输入事件的数据类型。

6.1.1 Select Funciton抽取正常事件

  import java.util

  import com.hnbian.flink.common.Record
  import org.apache.flink.cep.PatternSelectFunction
  import org.apache.flink.cep.scala.{CEP, PatternStream}
  import org.apache.flink.cep.scala.pattern.Pattern
  import org.apache.flink.streaming.api.scala._

  object CepSelectFuncitonTest  extends App {

    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 1. 获取数据输入流
    val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

    private val recordStream: DataStream[Record] = socketStream
      .map(data => {
        val arr = data.split(",")
        Record(arr(0), arr(1), arr(2).toInt)
      })

    // 2. 定义一个 Pattern
    private val pattern: Pattern[Record, Record] = Pattern
      // 设置一个简单条件,匹配年龄为 20 的记录
      .begin[Record]("start").where(_.age == 20)

    // 3. 将创建好的 Pattern 应用到输入事件流上
    private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

    // 4. 获取事件序列,得到匹配到的数据
    private val result: DataStream[String] = patternStream.select(new selectFunction)

    result.print("CepSelectFuncitonTest")

    env.execute()
  }

  /**
    * 定义 PatternSelectFunction
    */
  class selectFunction extends PatternSelectFunction[Record,String]{
    override def select(pattern: util.Map[String, util.List[Record]]): String = {
      // 从 map 中根据名称获取对应的事件
      val start: Record = pattern.get("start").iterator().next()
      start.toString
    }
  }

6.1.2 Select Funciton抽取超时事件

如果模式中有 within(time),那么就很有可能有超时的数据存在,通过 PatternStream,Select 方法分别获取超时事件和正常事件。首先需要创建 OutputTag 来标记超时事件,然后在 PatternStream.select 方法中使用 OutputTag,就可以将超时事件从 PatternStream中抽取出来。

import java.util
import com.hnbian.flink.common.Record
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object CepSelectFunctionOutTimeTest  extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    // 设置一个简单条件,匹配年龄为 20 的记录
    .begin[Record]("start").where(_.age == 20)
    .next("next").where(_.classId == "2" )
    .within(Time.seconds(2))

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  //创建OutputTag,并命名为timeout-output
  val timeoutTag = OutputTag[String]("timeout-output")

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = 
  patternStream.select(timeoutTag, new OutTimeSelectFunction2, new SelectFunction2)

  // 输出正常事件
  result.print("CepSelectFunctionOutTimeTest")
  // 输出超时事件
  result.getSideOutput(timeoutTag).print("timeout-output")

  env.execute()
}

// 自定义正常事件序列处理函数
class SelectFunction2() extends PatternSelectFunction[Record,String] {
  override def select(pattern: util.Map[String, util.List[Record]]): String = {
    val start: Record = pattern.get("start").iterator().next()
    s"${start.toString},success"
  }
}

// 自定义超时事件序列处理函数
class OutTimeSelectFunction2() extends PatternTimeoutFunction[Record,String]{
  override def timeout(pattern: util.Map[String, util.List[Record]], timeoutTimestamp: Long): String = {
    val start: Record = pattern.get("start").iterator().next()
    s"${start.toString},outTime"
  }
}

6.2 Flat Select Funciton

  • Flat Select Funciton 和 Select Function 相似,不过 Flat Select Funciton 在每次调用可以返回任意数量的结果。

  • Flat Select Funciton 使用 Collector 作为返回结果的容器,可以将需要输出的事件都放置在 Collector 中返回。

6.2.1 Flat Select Funciton 抽取正常事件

  import java.util
  import com.hnbian.flink.common.Record
  import org.apache.flink.cep.PatternFlatSelectFunction
  import org.apache.flink.cep.scala.{CEP, PatternStream}
  import org.apache.flink.cep.scala.pattern.Pattern
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.util.Collector

  object CepSelectFuncitonFlatTest  extends App {

    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 1. 获取数据输入流
    val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

    private val recordStream: DataStream[Record] = socketStream
      .map(data => {
        val arr = data.split(",")
        Record(arr(0), arr(1), arr(2).toInt)
      })

    // 2. 定义一个 Pattern
    private val pattern: Pattern[Record, Record] = Pattern
      // 设置一个简单条件,匹配年龄为 20 的记录
      .begin[Record]("start").where(_.age == 20)
      .next("next").where(_.classId == "2")

    // 3. 将创建好的 Pattern 应用到输入事件流上
    private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

    // 4. 获取事件序列,得到匹配到的数据
    private val result: DataStream[String] = patternStream.flatSelect(new flatSelectFunction)

    result.print("CepSelectFuncitonFlatTest")

    env.execute()

  }

  /**
    * 定义一个 PatternFlatSelectFunction
    */
  class flatSelectFunction extends PatternFlatSelectFunction[Record,String]{

    override def flatSelect(pattern: util.Map[String, util.List[Record]], out: Collector[String]): Unit = {

      out.collect(
        s"""
           |flatSelectFunction:
           |  ${pattern.get("start").toString},
           |  ${pattern.get("next").toString}
           |""".stripMargin)
    }
  }

6.2.1 Flat Select Funciton 抽取超时事件

import java.util

import com.hnbian.flink.common.Record
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * @Author haonan.bian
  * @Description //TODO
  * @Date 2021/4/7 14:06 
  * */
object CepSelectFunctionFlatOutTimeTest extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // 1. 获取数据输入流
  val socketStream: DataStream[String] = env.socketTextStream("localhost",9999)

  private val recordStream: DataStream[Record] = socketStream
    .map(data => {
      val arr = data.split(",")
      Record(arr(0), arr(1), arr(2).toInt)
    })

  // 2. 定义一个 Pattern
  private val pattern: Pattern[Record, Record] = Pattern
    // 设置一个简单条件,匹配年龄为 20 的记录
    .begin[Record]("start").where(_.age == 20)
    .next("next").where(_.classId == "2")
    .within(Time.seconds(2))

  // 3. 将创建好的 Pattern 应用到输入事件流上
  private val patternStream: PatternStream[Record] = CEP.pattern[Record](recordStream, pattern)

  // 创建OutputTag,并命名为timeout-output
  val timeoutTag = OutputTag[String]("timeout-output")

  // 4. 获取事件序列,得到匹配到的数据
  private val result: DataStream[String] = patternStream.flatSelect(timeoutTag,new FlatSelectTimeoutFunction,new FlatSelectFunction)

  result.print("CepSelectFunctionFlatOutTimeTest")
  result.getSideOutput(timeoutTag).print("time out")

  env.execute()

}

// 自定义正常事件序列处理函数
class FlatSelectFunction extends PatternFlatSelectFunction[Record,String]{

  override def flatSelect(pattern: util.Map[String, util.List[Record]], out: Collector[String]): Unit = {

    out.collect(
      s"""
         |flatSelectFunction:
         |  ${pattern.get("start").toString},
         |  ${pattern.get("next").toString}
         |""".stripMargin)
  }
}

// 自定义超时事件序列处理函数
class FlatSelectTimeoutFunction extends PatternFlatTimeoutFunction[Record,String]{
  override def timeout(pattern: util.Map[String, util.List[Record]], timeoutTimestamp: Long, out: Collector[String]): Unit = {
    out.collect(
      s"""
         |flatSelectFunction:
         |  ${pattern.get("start").toString}
         |""".stripMargin)
  }
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink CEP 的使用场景与示例 Flink CEP 的使用场景与示例
1. 检测登录用户的 IP 变化 使用场景 在我们操作某些银行APP的时候,经常会发现,如果上一个操作与下一个操作IP变换了例如上一个操作使用的流量操作,下一个操作我连接上了wifi去操作,这时IP就会发生变化,那么APP就要求我们重新进行
2020-11-07
下一篇 
Flink系列 16. 介绍Flink中状态一致性的保证 Flink系列 16. 介绍Flink中状态一致性的保证
1. 一致性1.1 介绍状态一致性 有状态的流处理,内部每个算子任务都可以有自己的状态 对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的要保证计算结果准确,一条数据有也不丢失,也不会重复计算数据 在程序遇到故障时可以恢复任务状态,
2020-10-13
  目录