Flink CEP 的使用场景与示例


1. 检测登录用户的 IP 变化

  • 使用场景

    在使用某些银行 APP 时,我们经常会发现:如果上一个操作与下一个操作之间 IP 发生了变化(例如上一个操作使用的是移动流量,下一个操作连接上了 WiFi),APP 就会要求我们重新登录,以避免由于 IP 变化而产生的风险操作。

  • 需求

    当同一用户的上一个操作与下一个操作的 IP 不一致时,进行报警

  • 数据格式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
192.168.145.77,sunwukong,https://icbc.com.cn/login.html,2020-02-12 12:23:47
192.168.145.77,sunwukong,https://icbc.com.cn/transfer.html,2020-02-12 12:23:49
192.168.145.77,sunwukong,https://icbc.com.cn/save.html,2020-02-12 12:23:52
192.168.145.77,sunwukong,https://icbc.com.cn/buy.html,2020-02-12 12:23:58
192.168.89.189,sunwukong,https://icbc.com.cn/pay.html,2020-02-12 12:24:05
192.168.89.189,sunwukong,https://icbc.com.cn/login.html,2020-02-12 12:24:07
192.168.89.189,sunwukong,https://icbc.com.cn/pay.html,2020-02-12 12:24:09
192.168.89.189,sunwukong,https://icbc.com.cn/pay.html,2020-02-12 12:24:15

192.168.52.100,zhubajie,https://icbc.com.cn/login.html,2020-02-12 12:23:45
192.168.52.100,zhubajie,https://icbc.com.cn/transfer.html,2020-02-12 12:23:47
192.168.52.100,zhubajie,https://icbc.com.cn/save.html,2020-02-12 12:23:53
192.168.52.100,zhubajie,https://icbc.com.cn/buy.html,2020-02-12 12:23:59
192.168.44.110,zhubajie,https://icbc.com.cn/pay.html,2020-02-12 12:24:03
192.168.44.110,zhubajie,https://icbc.com.cn/login.html,2020-02-12 12:24:04
192.168.44.110,zhubajie,https://icbc.com.cn/pay.html,2020-02-12 12:24:06
192.168.44.110,zhubajie,https://icbc.com.cn/pay.html,2020-02-12 12:24:12

192.168.54.172,tangseng,https://icbc.com.cn/login.html,2020-02-12 12:23:46
192.168.54.172,tangseng,https://icbc.com.cn/transfer.html,2020-02-12 12:23:48
192.168.54.172,tangseng,https://icbc.com.cn/save.html,2020-02-12 12:23:54
192.168.54.172,tangseng,https://icbc.com.cn/buy.html,2020-02-12 12:23:57
192.168.38.135,tangseng,https://icbc.com.cn/pay.html,2020-02-12 12:24:04
192.168.38.135,tangseng,https://icbc.com.cn/login.html,2020-02-12 12:24:08
192.168.38.135,tangseng,https://icbc.com.cn/pay.html,2020-02-12 12:24:10
192.168.38.135,tangseng,https://icbc.com.cn/pay.html,2020-02-12 12:24:13

1.1 使用State编程实现

  • 代码开发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

/**
 * Job that detects IP changes between operations of the same user with keyed state.
 *
 * Reads comma-separated lines of the form `ip,username,operateUrl,time` from a socket
 * on localhost:9999, keys the records by username and passes each one to
 * [[CheckIpChangeProcessFunction]]; the results are printed.
 */
object StateCheckIpChange {

  /**
   * Builds and runs the streaming job.
   *
   * An explicit `main` is used instead of `extends App` so that the job set-up does not
   * depend on the delayed initialisation performed by the `App` trait.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment.setParallelism(1)

    // 1. Socket source
    val sourceStream: DataStream[String] = environment.socketTextStream("localhost", 9999)

    import org.apache.flink.api.scala._
    // 2. Parse, key by username and check for IP changes
    sourceStream
      .map(line => line.split(","))
      // Blank or truncated lines (for example the empty separator lines in the sample
      // data) are dropped instead of failing the job with ArrayIndexOutOfBoundsException.
      .filter(fields => fields.length >= 4)
      .map(fields => (fields(1), UserLogin(fields(0), fields(1), fields(2), fields(3))))
      .keyBy(x => x._1)
      .process(new CheckIpChangeProcessFunction)
      .print()

    environment.execute("checkIpChange")
  }
}


/**
 * Keyed process function that compares each operation of a user with that user's
 * previous operation and reports when the IP address differs.
 *
 * For every input record one `(key, operations)` pair is emitted. `operations` always
 * starts with the current record; when the IP differs from the previous record, the
 * previous record is appended as the second element.
 */
class CheckIpChangeProcessFunction
  extends KeyedProcessFunction[String, (String, UserLogin), (String, ArrayBuffer[UserLogin])] {

  /** Most recent operation seen for the current key. */
  var valueState: ValueState[UserLogin] = _

  /** Registers the keyed state that remembers the previous operation. */
  override def open(parameters: Configuration): Unit = {
    val valueStateDescriptor = new ValueStateDescriptor[UserLogin]("changeIp", Types.of[UserLogin])
    valueState = getRuntimeContext.getState(valueStateDescriptor)
  }

  /**
   * Compares the current operation with the previous one of the same key.
   *
   * @param thisLogin the `(key, operation)` pair being processed
   * @param ctx       process-function context (not used)
   * @param out       collector receiving `(key, operations)`
   */
  override def processElement(
      thisLogin: (String, UserLogin),
      ctx: KeyedProcessFunction[String, (String, UserLogin), (String, ArrayBuffer[UserLogin])]#Context,
      out: Collector[(String, ArrayBuffer[UserLogin])]): Unit = {

    val current: UserLogin = thisLogin._2
    val array = new ArrayBuffer[UserLogin]()
    array.append(current)

    // value() yields null for the first record of a key; Option turns that into None.
    Option(valueState.value()).foreach { prevLogin =>
      if (!prevLogin.ip.equals(current.ip)) {
        println("IP 出现变化,重新登录")
        array.append(prevLogin)
      }
    }

    // Always remember the latest operation. Previously the state was refreshed only on
    // an IP change, so the reported "previous" record was the first one seen with the
    // old IP rather than the operation immediately before the change.
    valueState.update(current)

    out.collect((thisLogin._1, array))
  }
}


/**
 * One user operation record.
 *
 * @param ip         IP address the operation came from
 * @param username   user name
 * @param operateUrl URL that was accessed
 * @param time       access time, kept as the raw string from the input
 */
case class UserLogin(
    ip: String,
    username: String,
    operateUrl: String,
    time: String)

1.2 使用CEP编程实现

  • 导入cep依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
  • 代码开发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ArrayBuffer

/**
 * Job that detects IP changes of a user with Flink CEP.
 *
 * Reads comma-separated lines of the form `ip,username,operateUrl,time` from a socket
 * on localhost:9999, keys them by username and matches a "start" event followed by a
 * "second" event accepted by [[IpChangeIterativeCondition]] within 120 seconds.
 */
object CepCheckIpChange {

  /**
   * Builds and runs the streaming job.
   *
   * An explicit `main` is used instead of `extends App` so that the job set-up does not
   * depend on the delayed initialisation performed by the `App` trait.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 1. Socket source
    val sourceStream: DataStream[String] = environment.socketTextStream("localhost", 9999)

    // 2. Parse the lines and key them by username
    val keyedStream: KeyedStream[(String, UserLoginInfo), String] = sourceStream
      .map(line => line.split(","))
      // Blank or truncated lines (for example the empty separator lines in the sample
      // data) are dropped instead of failing the job with ArrayIndexOutOfBoundsException.
      .filter(fields => fields.length >= 4)
      .map(fields => (fields(1), UserLoginInfo(fields(0), fields(1), fields(2), fields(3))))
      .keyBy(_._1)

    // 3. Define the pattern: conditions and event sequence
    val pattern: Pattern[(String, UserLoginInfo), (String, UserLoginInfo)] =
      Pattern.begin[(String, UserLoginInfo)]("start").where(x => x._2.username != null)
        // Relaxed contiguity; the iterative condition decides whether the IP changed
        .followedBy("second").where(new IpChangeIterativeCondition)
        // The pattern is only valid within this time span
        .within(Time.seconds(120))

    // 4. Apply the pattern to the keyed stream
    val patternStream: PatternStream[(String, UserLoginInfo)] = CEP.pattern(keyedStream, pattern)

    // 5. Select and print the matches
    patternStream.select(new PatternSelectIpChangeDataFunction).print()

    // 6. Start the job
    environment.execute()
  }
}

/**
 * Iterative condition that accepts an event when at least one event already matched
 * by the "start" pattern carries a different IP address.
 */
class IpChangeIterativeCondition extends IterativeCondition[(String, UserLoginInfo)] {

  /**
   * @param thisLogin the candidate event
   * @param ctx       context giving access to the events matched so far
   * @return true if any "start" event has an IP different from the candidate's
   */
  override def filter(
      thisLogin: (String, UserLoginInfo),
      ctx: IterativeCondition.Context[(String, UserLoginInfo)]): Boolean = {
    val candidateIp = thisLogin._2.ip
    // Events previously accepted by the "start" pattern
    val earlier: util.Iterator[(String, UserLoginInfo)] =
      ctx.getEventsForPattern("start").iterator()

    // Walks every remaining event; `seenChange` stays true once a differing IP was found.
    @scala.annotation.tailrec
    def scan(seenChange: Boolean): Boolean =
      if (!earlier.hasNext) seenChange
      else scan(!earlier.next()._2.ip.equals(candidateIp) || seenChange)

    scan(seenChange = false)
  }
}


/**
 * Select function that turns a match into a buffer holding the first "start" event
 * followed by the first "second" event.
 */
class PatternSelectIpChangeDataFunction
  extends PatternSelectFunction[(String, UserLoginInfo), ArrayBuffer[UserLoginInfo]] {

  /**
   * @param map matched events, grouped by pattern name
   * @return buffer containing the "start" record and then the "second" record
   */
  override def select(
      map: util.Map[String, util.List[(String, UserLoginInfo)]]): ArrayBuffer[UserLoginInfo] = {
    // First event matched by the pattern named "start"
    val before: UserLoginInfo = map.get("start").iterator().next()._2
    // First event matched by the pattern named "second"
    val after: UserLoginInfo = map.get("second").iterator().next()._2

    ArrayBuffer[UserLoginInfo](before, after)
  }
}

/**
 * One user operation record used by the CEP job.
 *
 * @param ip         IP address the operation came from
 * @param username   user name
 * @param operateUrl URL that was accessed
 * @param time       access time, kept as the raw string from the input
 */
case class UserLoginInfo(
    ip: String,
    username: String,
    operateUrl: String,
    time: String)

2. 检测设备温度变化

  • 场景介绍

    • 现在日常生活当中有大量的传感设备,用于检测机器的各种指标数据,例如温度、湿度、气压等,并实时上报数据到数据中心。现在需要检测某一个传感器上报的温度数据是否发生异常。
  • 异常的定义

    • 三分钟时间内,出现三次及以上的温度高于40度就算作是异常温度,进行报警输出
  • 收集数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
传感器设备mac地址,  检测机器mac地址,   温度, 湿度,气压, 数据产生时间

00-34-5E-5F-89-A4,00-01-6C-06-A6-29,38,0.52,1.1,2020-03-02 12:20:32
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,47,0.48,1.1,2020-03-02 12:20:35
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,50,0.48,1.1,2020-03-02 12:20:38
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,48,0.48,1.1,2020-03-02 12:20:39
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,52,0.48,1.1,2020-03-02 12:20:41
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,53,0.48,1.1,2020-03-02 12:20:43
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:45
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:46
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:47
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:48
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:49
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:50
  • 代码开发实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.util
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable

/**
 * Job that raises an alarm when a sensor reports three or more readings above
 * 40 degrees within three minutes, using Flink CEP on event time.
 *
 * Input lines are read from a socket on localhost:9999 in the form
 * `sensorMac,deviceMac,temperature,dampness,pressure,date`.
 *
 * Original author: haonan.bian, 2021/4/7.
 */
object CepDeviceTemperatureMonitor {

  // Kept as a field of a plain object rather than of an `App`: fields of an `App` are
  // only initialised when its `main` runs, so a function that reads `format` could see
  // null if this object were loaded in a JVM where `main` never ran.
  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
   * Checks that a split input line can be turned into a usable record.
   *
   * @param fields the comma-separated fields of one input line
   * @return true when there are at least six fields, the temperature is an integer and
   *         the date matches `yyyy-MM-dd HH:mm:ss`
   */
  private def isValidRecord(fields: Array[String]): Boolean =
    fields.length >= 6 &&
      scala.util.Try(fields(2).toInt).isSuccess &&
      scala.util.Try(format.parse(fields(5))).isSuccess

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 1. Use event time
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.setParallelism(1)
    import org.apache.flink.api.scala._

    // 2. Receive the data
    val sourceStream: DataStream[String] = environment.socketTextStream("localhost", 9999)

    val deviceStream: KeyedStream[DeviceDetail, String] = sourceStream
      .map(line => line.split(","))
      // Header, blank or otherwise malformed lines are skipped; they would make
      // `toInt` or the date parser throw further down and fail the job.
      .filter(fields => isValidRecord(fields))
      .map(fields =>
        DeviceDetail(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5)))
      .assignAscendingTimestamps(detail => format.parse(detail.date).getTime)
      .keyBy(detail => detail.sensorMac)

    // 3. Define the pattern: conditions and event sequence
    val pattern: Pattern[DeviceDetail, DeviceDetail] =
      Pattern
        .begin[DeviceDetail]("start")
        // The rule is "higher than 40 degrees", so the comparison is strict.
        .where(x => x.temperature.toInt > 40)
        .timesOrMore(3).greedy
        .within(Time.minutes(3))

    // 4. Apply the pattern to the keyed stream
    val patternResult: PatternStream[DeviceDetail] = CEP.pattern(deviceStream, pattern)

    // 5. Select and print the matches
    patternResult.select(new MyPatternResultFunction).print()

    // 6. Start the job
    environment.execute("CepDeviceTemperatureMonitor")
  }
}

/**
 * Select function that summarises a match as the device MAC of the last matched
 * reading together with a map from reading date to temperature.
 */
class MyPatternResultFunction
  extends PatternSelectFunction[DeviceDetail, (String, mutable.Map[String, String])] {

  /**
   * @param pattern matched events, grouped by pattern name
   * @return `(deviceMac, date -> temperature)` built from the events matched by "start";
   *         the MAC is the empty string when there are no such events
   */
  override def select(
      pattern: util.Map[String, util.List[DeviceDetail]]): (String, mutable.Map[String, String]) = {
    val readings = mutable.Map[String, String]()
    var lastDeviceMac: String = ""

    // Walk the "start" events in order; a later reading with the same date overwrites
    // the earlier one in the map.
    val matched: util.Iterator[DeviceDetail] = pattern.get("start").iterator()
    while (matched.hasNext) {
      val detail = matched.next()
      lastDeviceMac = detail.deviceMac
      readings.put(detail.date, detail.temperature)
    }

    (lastDeviceMac, readings)
  }
}

/**
 * One sensor reading; every field is kept as the raw string from the input line.
 *
 * @param sensorMac   MAC address of the sensor
 * @param deviceMac   MAC address of the monitored machine
 * @param temperature temperature
 * @param dampness    humidity
 * @param pressure    air pressure
 * @param date        time the reading was produced
 */
case class DeviceDetail(
    sensorMac: String,
    deviceMac: String,
    temperature: String,
    dampness: String,
    pressure: String,
    date: String)

3. 检测超时订单

  • 场景介绍

    在电商系统当中,经常会发现有些订单下单之后没有支付,就会有一个倒计时的时间值,提示你在15分钟之内完成支付,如果没有完成支付,那么该订单就会被取消。主要是因为拍下订单就会减库存,如果一直没有支付,就会影响库存商品数量,导致其他人购买的时候买不到。

  • 需求

    • 创建订单之后15分钟之内一定要付款,否则就取消订单
  • 订单数据格式及字段说明如下

    • 订单编号

    • 订单状态

      • 1.创建订单,等待支付
      • 2.支付订单完成
      • 3.取消订单,申请退款
      • 4.已发货
      • 5.确认收货,已经完成
    • 订单创建时间

    • 订单金额

1
2
3
4
5
6
7
8
20160728001511050311389390,1,2016-07-28 00:15:11,295
20160801000227050311955990,1,2016-07-28 00:16:12,165
20160728001511050311389390,2,2016-07-28 00:18:11,295
20160801000227050311955990,2,2016-07-28 00:18:12,165
20160728001511050311389390,3,2016-07-29 08:06:11,295
20160801000227050311955990,4,2016-07-29 12:21:12,165
20160804114043050311618457,1,2016-07-30 00:16:15,132
20160801000227050311955990,5,2016-07-30 18:13:24,165
  • 规则:出现 1(创建订单)之后,需要在 15 分钟之内出现 2(支付订单)操作,两者之间允许有其他操作(宽松近邻,对应 followedBy)
  • 代码开发实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

import java.util
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream, pattern}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Job that separates orders paid within 15 minutes of creation from orders that timed
 * out, using Flink CEP on event time.
 *
 * Input lines are read from a socket on localhost:9999 in the form
 * `orderId,status,time,price`. Paid orders are printed with the prefix "success",
 * timed-out orders are sent to a side output and printed with the prefix "time out".
 */
object CepOrderMonitor {

  // Four-letter year, matching the `2016-07-28 00:15:11` style of the input
  // (the previous pattern was "yyy-MM-dd HH:mm:ss").
  // Kept as a field of a plain object rather than of an `App`: fields of an `App` are
  // only initialised when its `main` runs, so the timestamp extractor could see null
  // if this object were loaded in a JVM where `main` never ran.
  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
   * Checks that a split input line can be turned into a usable order record.
   *
   * @param fields the comma-separated fields of one input line
   * @return true when there are at least four fields, the time matches
   *         `yyyy-MM-dd HH:mm:ss` and the price is a number
   */
  private def isValidRecord(fields: Array[String]): Boolean =
    fields.length >= 4 &&
      scala.util.Try(format.parse(fields(2))).isSuccess &&
      scala.util.Try(fields(3).toDouble).isSuccess

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.setParallelism(1)
    import org.apache.flink.api.scala._
    val sourceStream: DataStream[String] = environment.socketTextStream("localhost", 9999)

    val keyedStream: KeyedStream[OrderDetail, String] = sourceStream
      .map(line => line.split(","))
      // Malformed lines are skipped; they would otherwise make `toDouble` or the
      // timestamp extractor throw and fail the job.
      .filter(fields => isValidRecord(fields))
      .map(fields => OrderDetail(fields(0), fields(1), fields(2), fields(3).toDouble))
      .assignTimestampsAndWatermarks(
        // Tolerate events arriving up to 5 seconds out of order
        new BoundedOutOfOrdernessTimestampExtractor[OrderDetail](Time.seconds(5)) {
          override def extractTimestamp(element: OrderDetail): Long =
            format.parse(element.orderCreateTime).getTime
        }
      )
      .keyBy(x => x.orderId)

    // Pattern: status "1" followed (relaxed contiguity) by status "2" within 15 minutes
    val pattern: Pattern[OrderDetail, OrderDetail] =
      Pattern.begin[OrderDetail]("start").where(_.status.equals("1"))
        .followedBy("second").where(_.status.equals("2"))
        .within(Time.minutes(15))

    // Timed-out partial matches are routed to this side output
    val orderTimeoutOutputTag = new OutputTag[OrderDetail]("orderTimeout")

    val patternStream: PatternStream[OrderDetail] = CEP.pattern(keyedStream, pattern)

    val selectResultStream: DataStream[OrderDetail] =
      patternStream.select(
        orderTimeoutOutputTag,
        new OrderTimeoutPatternFunction,
        new OrderPatternFunction)

    // Orders that were paid in time
    selectResultStream.print("success")

    // Side output: orders still unpaid after 15 minutes
    selectResultStream.getSideOutput(orderTimeoutOutputTag).print("time out")

    environment.execute()
  }
}


/**
 * Timeout function that returns the first event matched by "start" of a partial
 * match that timed out.
 */
class OrderTimeoutPatternFunction extends PatternTimeoutFunction[OrderDetail, OrderDetail] {

  /**
   * @param pattern events matched so far, grouped by pattern name
   * @param l       timestamp passed in by CEP (not used)
   * @return the first "start" event of the timed-out partial match
   */
  override def timeout(pattern: util.Map[String, util.List[OrderDetail]], l: Long): OrderDetail =
    pattern.get("start").iterator().next()
}

/**
 * Select function that returns the first event matched by "second" of a complete match.
 */
class OrderPatternFunction extends PatternSelectFunction[OrderDetail, OrderDetail] {

  /**
   * @param pattern matched events, grouped by pattern name
   * @return the first "second" event of the match
   */
  override def select(pattern: util.Map[String, util.List[OrderDetail]]): OrderDetail =
    pattern.get("second").iterator().next()
}

/**
 * One order status record.
 *
 * @param orderId         order number
 * @param status          order status code, kept as the raw string from the input
 * @param orderCreateTime time string taken from the third input field
 * @param price           order amount
 */
case class OrderDetail(
    orderId: String,
    status: String,
    orderCreateTime: String,
    price: Double)


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录