时间语义与Wartermark(水位线)阐述:
用到了时间语义和watermark,所以Flink和sparkStreaming是有一定的区别的.
Flink中的时间语义有三种:
1 | Event Time(事件触发的时间或者是事件创建的时间), |
在生产环境中,大多数我们都关心Event Time,我们可能会把Event Time来作为我们开窗的时间参数.就是说我们任何一个开窗都会有一个开窗时间,我们的开窗时间是基于这三个时间的哪一种呢?他是基于Event Time这一种的.但是默认情况下,他的时间语义是基于Processing Time这种的.这是所谓的Flink中的时间语义.所以,如果我们不使用Flink默认的时间语义的话,而是使用Event Time的话,我首先还得想清楚,如何去引入这个Event Time.引入Event Time的两行代码,第一行时先声明,告诉我们的Flink,我不用processing Time,我想要使用Event Time. 如下所示:
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
第二行代码就是说,到底我们的每一条数据中的哪一个属性,数据中的哪一个小段,数据中的哪一个字段,他是属于Event Time的,或者说,具体的Event Time的值是多少,你得告诉我.所以第二行代码,如上图所示.
但是具体的Event Time的值是多少呢? 这个时候我还要你考虑一下,是否我们的数据严格按照我的Event Time的顺序进来呢,还是按照我的Event Time无序(乱序)的进来.所以我们必须考虑这两种情况.我们的开窗函数事实上在以后的业务中很有可能会考虑这两种情况.无论你是按照Event Time还是按照Processing Time,我们很有可能都需要考虑这两种情况.第一种情况就是我的数据是严格按照某一个时间语义升序进来的.还有一种情况就是按照某一个时间语义乱序进来的.升序进来的话就比较简单,但是如果是乱序进来的话.就需要考虑一个延迟的问题了.为了保证我们延迟的时候,可以再固定的时间触发.或者说在某一个条件下去触发,那我们就需要引入一个叫watermark(水位线)
watermark的作用其实只有两个作用: 一个是规定我们延迟之后(因为我们每一个窗口都要做延迟,为何要延迟?因为数据是乱序的)我到底在什么条件下去触发呢?那是由watermark来决定的.第二个作用是,watermark还可以确定,我们在触发的时候有哪些时间的数据已经进到我们的窗口里面来了.这就是watermark的两个具体的作用.
Flink中的时间语义
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示(Flink时间概念):
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入Flink的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
一个例子——电影《星球大战》:
例如,一条日志进入Flink的时间为2017-11-12 10:00:00.123,到达Window的系统时间为2017-11-12 10:00:01.234,日志的内容如下:
1 | 2017-11-02 18:37:15.624 INFO Fail over to rm2 |
对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。
EventTime的引入
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
Watermark
Watermark翻译成中文叫做水位线.他就是用来规定,什么样的数据算是迟到的数据.他就是定义一个标准,低于这个标准了,我们就认为是迟到的数据.这个就是所谓的水位线.这个水位线一般来说就是处理乱序的数据,这里我们得清楚,是基于哪一个时间乱序.在第七章我们探讨的是基于Event Time乱序.因为处理时间是不肯定乱序的.因为我是实时处理,所以我们严格按照时间的顺序来进行处理的.但是我进来的数据,他是有可能根据我们事件产生的时间产生一个乱序的行为.这个乱序我们如何处理呢?这就需要定义一个watermark.不但要定义一个Watermark,还要让这个watermark实时的,或者隔一段时间或者达到什么样的条件.他的水位线要跟着变化.但是一般情况下这个水位线是单调递增还是单调递减呢?还是保持不变呢?
你想一下,数据源源不断的从一个管道里面进来,这个数据总体上我们一般认为他是有序的.他不可能总体上无序,比如说我要处理1999年到2020年的数据,最后发现,1999年的数据最后再来.等我全部处理完成之后再来.那这个总体上就无序了.那如果我要处理1999年到2020年数据,这数据有一点乱序的,什么叫有一点乱序呀?就是1999年的某一天,假设是1月1号的某一天.有可能是10:05分的数据来了,05分的数据来了之后,又来了03分的数据.那发现这个数据就不对了,顺序变了.这种情况不止一处的话,说明总体上还是有序的,细节上还是不对的.
但是我的数据总体上完全乱序的,这个就不在我们的watermark的处理范围之内.
Watermark的具体的机制是:Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
我们设置watermark的一个延时的时长为T,这个T实际上是我们根据数据的特点来设置的.什么叫数据的特点呢?就是数据是进入窗口是乱序进来的,你这数据乱到什么程度.根据你数据乱到什么程度来确定我的T该设置为多少.比如说:本来按道理说是进来的是9:05分的数据了,可是你还进来的是8:05分的数据.这就很明显延迟了1个小时.那么,既然我的数据延迟了一个小时了,所以我的T也应该是要延迟一个小时.这就是所谓的根据你数据的乱序程度.
但是如果我数据应该进来了9:05分,可是呢.我进来的确是9:00的数据,那只是延迟5秒就可以了.所以这个T就要设置为5秒.实际上T是根据你数据乱序的特点来确定我T设置为多少.当你确定好T之后,我们的系统会校验已经到达的数据中最大的Event Time,然后认定我们的eventTime是否小于maxEventTime - t,如果我们的Evnet Time小于我们的maxEventTime-t,那这就说明我们所有的数据都已经到达.这个时候我们就要开始触发了.触发的时候需要判断我们窗口的停止时间是不是小于等于maxEventTime-t,窗口的停止时间就是窗口的结束时间.
我们窗口的起始时间和结束时间是有一个时间范围的.只要是范围的话我们就要考虑一个问题.就是到底是左闭右开,还是左开右闭,或者左闭右也是闭合的,或者包头不包尾的呢?
其实我们的时间范围就是包头不包尾的.啥意思呢?就是包括了起始时间,不包括结束时间.比如说的起始时间是9:00分到9:05分,那结束时间就是9:05分,比如说我现在有一条数据,他就是属于9:00的,那么请问这条数据是不是属于我当前这个窗口的呢?是的.那如果我这条数据是9:05分的数据呢?那他就不属于我们当前这个窗口的了,因为包头不包尾.他应该是属于下一个窗口的.所以我的窗口的起始时间小于等于maxEventTime-t那么我这个窗口就会被触发.
这样的话就有一个watermark这么一个机制来控制我的延迟触发的问题.
基本概念
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
数据的乱序图 示:
1 | 那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。 |
有序流的Watermarker如下图所示:
Watermark设置为0
乱序流的Watermarker如下图所示:
Watermark设置为2
当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于Watermark是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s5s,窗口2是6s10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。
只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
Watermark的引入
水位线有两种:周期性水位线和间断性的水位线
watermark的引入很简单,对于乱序数据,最常见的引用方式如下:
1 | //上面只说了要引入EventTime,但是没说到底什么时候引入eventTime. 这里就告诉你到底什么时候引入Event |
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
MyAssigner有两种类型
AssignerWithPeriodicWatermarks 按照周期来设置分配水位线
AssignerWithPunctuatedWatermarks 根据间断的特点来分配水位线
以上两个接口都继承自TimestampAssigner。
Assigner with periodic watermarks(根据周期来设置分配水位线)
Assigner with periodic watermarks 这是一个接口,一定要自己写一个类去实现这个接口,这是一种复杂的代码写法
如果你想要通过周期性来生成我的watermark的话,实际上你得告诉我周期的间隔时间.默认情况下,周期的时间是200毫秒.也就是说没隔200毫秒会插入水位线.会在数据流中插入水位线.插入一次一定就是调用一个方法去插入.如果我肯定要写一个类去实现AssignerWithPeriodicWatermarks(根据周期来设置分配水位线)这个接口.
当然这个200毫秒我也可以自己设置,在ExecutionConfig.setAutoWatermark
Interval()这个方法中,我可以指定一个时间.那么这个时间就是由我来设置这个周期性.这叫周期性的水位线.就是隔一段时间我给你水位线加上去,隔一段时间我就给你水位线加上去.
水位线有两个作用:
第一个作用是,我可以判断小于等于当前水位线的数据已经来了.
第二个作用是,我可以用他来判断他在什么时候应该触发我们window的function的执行.
周期性的生成watermark:系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
产生watermark的逻辑:每隔5秒钟,Flink会调用
AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
例子,自定义一个周期性的时间戳抽取:
1 | class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] { |
一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。
1 | val stream: DataStream[SensorReading] = ... |
而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间,就可以使用如下代码:
1 | val stream: DataStream[SensorReading] = ... |
Assigner with punctuated watermarks(间断式的生产水位线)
什么叫间断式的生产watermark(水位线)呢?本质上来说就是根据你指定的条件,他不是按照时间来的,而是让你来决定.你觉得可以加上一个watermark(水位线)了,那你就加.就是你写代码到达某一个条件了你就加一个水位线.当然你写代码时也可以判断时间.比如说你到达某一个时间,你加一个水位线.当然也可以不以时间为判断.比如说我判断你的键值对.键值对中的某一个键等于某个值的时候,我给你加个水位线.或者你的值等于什么什么的时候.我给你加一个水位线.所以,这叫健壮式的生产watermark(水位线).说白了这个所谓的健壮式的watermark(水位线),就是不以这个时间周期性来生成的.就是由我自己根据业务查询,自己根据条件去设置.
间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1的传感器的数据流插入watermark:
1 | class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { |
EvnetTime在window中的使用
滚动窗口(TumblingEventTimeWindows)
1 | def main(args: Array[String]): Unit = { |
结果是按照Event Time的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)
滑动窗口(SlidingEventTimeWindows)
1 | def main(args: Array[String]): Unit = { |
会话窗口(EventTimeSessionWindows)
相邻两次数据的EventTime的时间差超过指定的时间间隔就会触发执行。如果加入Watermark, 会在符合窗口触发的情况下进行延迟。到达延迟水位再进行窗口触发。
1 | def main(args: Array[String]): Unit = { |
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Flink-原理与实现-Flink-的时间语义与Wartermark-水位线-机制/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/