Flink CEP 代码案例
登录告警系统: 一堆的登录日志从,匹配一个恶意登录的模式(如果一个用户连续失败三次,则是恶意登录),从而找到哪些用户名是用于恶意 登录
登录失败CEP模型:
支付失败CEP模型:
什么是复杂事件处理CEP
CEP是做复杂事件处理的.如果你碰到一个业务需求非常之复杂,而且他的条件也是非常复杂的.比如说处理某一个,符合某一个条件,做什么事情.这个条件是非常复杂的.那么处理复杂的事件,Flink专门有一套API.这套API的名字叫CEP. 而spark是没有CEP的.
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
特征:
Ø 目标:从有序的简单事件流中发现一些高阶特征(无序的就得加延时操作)
Ø 输入:一个或多个由简单事件构成的事件流
Ø 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
Ø 输出:满足规则的复杂事件
如上图所示: 如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。
这个恶意登录监控有两种解决办法. 一.状态编程. 二.CEP编程.
比如同一个用户(可以是不同ip)在2秒内连续2次登录失败,我们就触发报警.打一个告警信息. 我们可以引入一个listState. 将上一次用户登录的数据放到这个列表中. 后面还出现相同数据,继续放到列表中. 列表的长度等于2了或者大于2了. 就开始发出一个告警. 不过用状态编程会有问题. 会造成内存的容量的增加.还可能造成一些精准的业务无法做到. 比如说我不要连续两秒内两次登录了. 改成两秒内连续5次登录失败.才将这样的用户找出来. 这种需求改起来特别复杂.因为中间很有可能穿插一些其他东西. 如上图: 比如我要删选长方形和紧跟着圆形的数据. 后来需求改成长方形后面有圆形的数据了. 这两个之间很可能穿插了一些其他的三角形的一些东西. 那这些穿插的需不需要考虑放到listState中去. 而且还要考虑,这个时间是否到达5秒. 所以用状态编程不适合做恶意登录风险.所以当前业务我们应该使用Flink的CEP的库来做.
CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
看起来很简单,但是它有很多不同的功能:
Ø 输入的流数据,尽快产生结果
Ø 在2个event流上,基于时间进行聚合类的计算
Ø 提供实时/准实时的警告和通知
Ø 在多样的数据源中产生关联并分析模式
Ø 高吞吐、低延迟的处理
市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。
Flink CEP
Flink为CEP提供了专门的Flink CEP library,它包含如下组件:
FlinkCEP的步骤就四个. 一是准备好数据.该分组分组.改设置waterMark的设置Watermark.该过滤过滤. 因为CEP本质上就是一个窗口函数.它里面封装了开窗.因为一般情况下,我们会设置在某一个时间内满足这个条件的.如果说没有时间限制的话.那这个符合这个规则的数据找到一堆. 或者找不到. 比如找到一本矩形开头的,但是后面数据源源不断的来.因为他是一个无线的流.所以会造成他匹配不成功.所以一般来说我们一定要限制一个时间范围.当你限制一个时间范围的话,实际上就是开窗了.所以前面你要定义时间语义,定义是否有水位线之类的.
第二个就是定义我们的模式. 即定义我们的pattern. 定义好了之后,我们的pattern会帮我们检测.模式就是我们的规则. 检测实际上会出现两种情况.一种情况符合这个规则. 还有种情况是不符合这个规则.其实就是一种匹配成功了.一种没有匹配成功. 但是一般情况下我们只是处理匹配成功的. 匹配成功的数据怎么拿出来呢? 就是生成Alert. 这里检测的时候为什么要说能够帮我检测根据规则去匹配成功的,也可以匹配哪些不成功的呢?因为我们以后可能遇到一些用排除法做模式匹配. 如果不用排除法发现一些业务太复杂了,我们不好加一些条件去设置规则. 到时候发现这个业务可以用排除法的话.而且反证的条件是容易定义的.那我就把反证的模式定义好.定义好之后我只需要找那些没有匹配上的.没有匹配上的就是我所需要的.
Ø Event Stream
Ø pattern定义
Ø pattern检测
Ø 生成Alert
Flink CEP 四步骤:
定义模式,只要匹配订单在创建之后15分钟内有支付的事件.单一模式
模式检测
生产Alter. 找到所有的在15分钟内支付的订单事件, 并且还要找到没有支付的订单事件
notFollowedBy的意思就是不想让某一个是事件在两个事件之间发生.
因为notFollowedBy()绝对不可能放在最后面. 他只会放在两个个体模式之间. 既然是在两个个体模式的意思.就意味着表示在两个个体不同的事件之间不出现一个什么样的事件. 所以称之为叫不想要某个事件在两个事件之间发生.这个模式也是可以指定时间的. 用within里面指定时间.
超时事件的提取: 就是提取匹配的反的模式
超时事件的提取. 假设我有一个窗口里面有许多数据, 我要匹配 :
矩形 followBy 圆形 的. 我只要是矩形开头圆形结尾就可以匹配到.
但是有这么一种情况就是: 我在窗口最后位置有一个矩形,后面就没数据了. 就是在这个5分钟窗口内找不到矩形开头后面紧跟着圆圈的了.像这样的一个数据事件我们也可以在后面选取的时候将他选着出来.所以他有两种事件的选取. 一种是匹配事件的选取. 一种是超时事件的提取. 超时事件提取的话,会用到另外一个函数类叫patternTimeOutFunction
首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。
为了使用Flink CEP,我们需要导入依赖:
1 | <dependency> |
Event Streams
以登陆事件流为例:
1 | case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String) |
Pattern API
每个Pattern都应该包含几个步骤,或者叫做state。从一个state到另一个state,通常我们需要定义一些条件,例如下列的代码:
1 | val loginFailPattern = Pattern.begin[LoginEvent]("begin") |
每个state都应该有一个标示:
1 | 例如 .begin[LoginEvent]("begin")中的"begin" |
每个state都需要有一个唯一的名字,而且需要一个filter来过滤条件,这个过滤条件定义事件需要符合的条件,例如:
1 | .where(_.eventType.equals("fail")) |
我们也可以通过subtype来限制event的子类型:
1 | start.subtype(SubEvent.class).where(...); |
事实上,你可以多次调用subtype和where方法;而且如果where条件是不相关的,你可以通过or来指定一个单独的filter函数:
1 | pattern.where(...).or(...); |
之后,我们可以在此条件基础上,通过next或者followedBy方法切换到下一个state,next的意思是说上一步符合条件的元素之后紧挨着的元素;而followedBy并不要求一定是挨着的元素。这两者分别称为严格近邻和非严格近邻。
1 | val strictNext = start.next("middle") |
最后,我们可以将所有的Pattern的条件限定在一定的时间范围内:
1 | next.within(Time.seconds(10)) |
这个时间可以是Processing Time,也可以是Event Time。
Pattern 检测
通过一个input DataStream以及刚刚我们定义的Pattern,我们可以创建一个PatternStream:
1 | val input = ... |
一旦获得PatternStream,我们就可以通过select或flatSelect,从一个Map序列找到我们需要的警告信息。
select
select方法需要实现一个PatternSelectFunction,通过select方法来输出需要的警告。它接受一个Map对,包含string/event,其中key为state的名字,event则为真实的Event。
1 | val loginFailDataStream = patternStream |
其返回值仅为1条记录。
flatSelect
通过实现PatternFlatSelectFunction,实现与select相似的功能。唯一的区别就是flatSelect方法可以返回多条记录,它通过一个Collector[OUT]类型的参数来将要输出的数据传递到下游。
超时事件的处理
通过within方法,我们的parttern规则将匹配的事件限定在一定的窗口范围内。当有超过窗口时间之后到达的event,我们可以通过在select或flatSelect中,实现PatternTimeoutFunction和PatternFlatTimeoutFunction来处理这种情况。
1 | val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) |
附录:Flink的双流join
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Flink-原理与实现-Flink-CEP复杂事件处理/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/