Flink底层API概述
ProcessFunction API是属于Flink三层API中最底层的一层API.最底层的API意味着什么呢?
顾名思义就是我们可以做任何你想要做的任何事情.你可以理解为底层的API我们想要处理一些细腻化的操作,或者要处理一些特殊的业务.我如果使用高级的API ,DataStream API已经搞定不了的情况下.最后呢,我们就可以是用ProcessFunction API. ProcessFunction API是属于我们Flink里面最底层的转换算子.在这个最底层的转换算子中.我们可以有很多的工作可以做.或者有很多的功能可以完成.可以访问时间戳.那我们访问的时间戳是什么时间戳呢?是处理的时间戳.也可以是当前进来的那条数据的时间戳.那么当前进来的时间戳和处理的时间戳我们不是可以使用system.currenttimemillis()吗?但是这个时候我们使用这个是不准确的.你要想访问准确的时间的话.我们就可以使用底层的API.他就给我们提供了一个方法.我们可以访问watermark,还可以查看当前的水位线.水位线实际上是这样的,就是他每个两百毫秒调用一次.是需要更新这个水位线的.因为水位线有两种.
一种是周期性的水位线.
还有一种是间断性的水位线.
周期性的水位线就是说我每隔多长时间来更新一下或者设置一下这个新的水位线.就是这个意思.那么当前最新的水位线是多少我们都可以通过在底层API中去拿到.甚至我们还可以注册定时的事件.
什么叫注册定时的事件呢?
比如说:我们到达某一个条件之后我想触发一个事件.触发一个新的事件.这个事件我规定你在三秒钟之后你给我执行.那么在三秒钟还没到的时候我就有必要把这个事件注册一下.注册之后,从现在开始到三秒钟之后他会自动触发.所以你可以理解为定义,我们定义一个马上过一段时间要触发的一个事件.
还可以输出一个特定的一些事件.比如说超时的一些事件.什么意思呢?比如说我们现在正在处理数据.假设处理数据的时候我们发现可能某一个条件还没成熟.那么没关系,我们可以在几秒钟之后设置一个超时时间.在几秒钟之后我们再进行处理.这就是所谓的超时时间.
实际上几秒钟之后再处理的话,也是需要注册一个定时事件的.因为你要注册一个处理的时间,多长时间之后再进行处理.这些都是原来我们的API所无法做到的.那我们就可以通过ProcessFunction API这个底层的API来进行处理.
Flink给我们提供了8中ProcessFunction API.
这些窗口函数说白了就是一个个的对象,看着像函数,实际上他是对象.或者说就是一个类.那我们要用的话是声明一个匿名方法吗?不是的,是声明一个匿名函数吗?不是的.而是你需要写一个类去继承这8个ProcessFunction.他是我们真正的窗口函数类.如果你要把前面的知识点串联起来的话.
Flink里面我们提供了三种函数类
匿名函数不好
一个是普通的函数类.
第二个是富函数类.
第三个是底层处理的函数类.
所以我们有三种函数类.这块儿函数类有一个特点,他不是说你要声明写一个匿名函数就可以了.而是你必须要写一个类去继承他.
这里最常用的是ProcessFunction和KeyedProcessFunction.
下面以KeyedProcessFunction举例子.
我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level(底层的)转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
Flink提供了8个Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction
这里以KeyedProcessFunction举例,因为他用的最多.上面的这8个ProcessFunction API即底层API,都是按照下面连个案例去操作的.
以下例子说明,就是你可以通过这个Process Function中,拿到你想要拿到的任何东西,包括你对侧输出流的管理,包括你对当前的运行时间还有当前的watermark,当前的TimerService(时间服务),TimerService里面又可以注册一个触发器,并且还可以删除一个触发器.总之一句话,就是底层的ProcessFunction提供了所有你想要拿到的东西.都可以在底层的ProcessFunction API里面拿
KeyedProcessFunction是专门用来操作KeyedStream的.只会操作KeyedStream.
KeyedProcessFunction他会处理我们流的每一个元素.可以输出多个元素.也可以输出0个,也可以输出1个.这个我们之前说的DataStream API里面的处理函数或者转换函数不一样的.比如我们在DataStream中说的FlatMap这个处理函数(注意:处理函数就是所谓的转换函数),这里FlatMap他是把一条数据变成n条数据,即把一个元素转换成多个元素.还有一个,比如说map算子,map算子就是把一条数据通过map输出一条数据.我们可不可以用map算子来把一条数输出0条数据呢?(0个元素就代表不输出的意思.) 请问可以这样做吗?不可以的.那为什么我们后面不能输出null呢?输出null不就是代表不输出了呢?因为null也是等于0的呀!因为我们的map是要申明类型的,一个输入类型,一个输出类型.那我们写代码的时候输出一个null不就行了嘛?你什么代码也不写,这是不行的,因为这个代码是必须有返回值的.你这个方法声明的返回值你却没有返回任何东西的话,你的语法就通不过,然后他的编译也通不过.那什么都不输出的话编译不通过,那我就输出个null不行吗?不行,输出null的话,在执行过程中是会报错的.为什么?因为他要把输出的数据还要反序列化一下.在我们Flink中,流的处理,尤其在后面写到Sink里面他要进行序列化的.也就是说,他会造成后面的算子报错的现象.可能你这个map算子不会报错,但是会造成后面的算子报错.因为后面的算子他要反序列化你这个map算子输出的对象.那这个时候因为你为null,那他就会报一个空指针异常的错误.所以说,他做不到.
但是我们当前的底层API即KeyedProcessFunction这个底层API他就可以做到,他可以不输出.因为它里面的方法是没有返回值的.他既然没有返回值,我不输出,我不写代码就可以了.他可以输出1个元素也可以输出0个元素.他是非常灵活的.这就是所谓的底层API.当然,灵活归灵活,只不过代码写的稍微多一点.以前是调一个函数就ok了.现在我们用底层API我们还得写一个类.去继承这个父类.然后重写父类的所有抽象方法.而且我们所有的ProcessFunction都继承自RichFunction接口.RichFunction接口的特点是,有生命周期的管理.你可以知道我们这个subtask,就是我们这算子所对应的subtask在什么时候初始化,什么时候关闭.什么时候初始化的回调方法就是Open()方法.什么时候关闭的回调方法就是close()方法.这就是所谓的声明周期.还可以的到当前运行的上下文环境getRuntimeContext().这个上下文环境以后是要用的.为什么呢?因为我们前面说,我们的底层API可以访问时间,可以访问watermark(水位线),可以注册事件.那么可以通过谁来注册事件呢?就是通过我们运行时的上下环境,就是getRuntimeContext()来进行注册事件.
归纳总结: 我们整个Flink学了三种函数类.普通的函数类.富函数类和底层处理的函数类.那我们是不是可以认为底层处理的函数类可以做前面两种所有能做的事情.除此之外,我们的底层处理的函数还能做额外的一些事情,比如说注册新的事件,访问时间戳等等.或者说底层处理的函数类,就是我们所谓的写代码过程中发现一个业务数据做不到的我们就可以放大招了,使用我们底层处理的函数类来完成我们想要的需求.那如果底层处理的函数类都搞定不了的话,那就是我们技术选型有问题了.我们就不该选用Flink了.
时间服务 (TimerService )和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。
下面举个例子说明KeyedProcessFunction如何操作KeyedStream。
需求:监控温度传感器的温度值,如果温度值在一秒钟之内(processing time)连续上升,则报警。
1 | val warnings = readings |
看一下TempIncreaseAlertFunction如何实现, 程序中使用了ValueState这样一个状态变量。
1 | class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] { |
侧输出流(SideOutput)【注意:测输出流是用来替代Split算子的。Split算子已经过时了。】
侧输出流是用来替代Split算子的,split算子实际上已经过时了.侧输出流的意思就是说,我们把一条流分成多个流.这个侧输出流可以这样子的.就是一条主流两个测流.或者一条主流,三个测流.这就根据你自己的意思来决定.但是有一个特点就是你的类型必须相同.那么,如果你输出侧输出流的话.你必须需要用到我们的底层,也需要用到底层的function才行的.所以这个侧输出流他也是属于我们的ProcessFunction API里面的.就是底层API里面的.
由于我们可以输出多条测流,那么每一个测流呢,我们可以定义一个OutputTag[X]对象来区分.其中的X就是他每条测流里面的每个元素的类型.所以需要在前面就得定义一个OutputTag[X].有多少个侧输出流,你就需要定义多少个OutputTag[X].实际上你可以理解为就是我当前测流的一个标记.或者我当前测流的一个标签.到时候你去拿我这个测流的时候,你也是根据这个标签去拿.
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
下面是一个示例程序:
1 | val monitoredReadings: DataStream[SensorReading] = readings |
接下来我们实现FreezingMonitor函数,用来监控传感器温度值,将温度值低于32F的温度输出到side output。
1 | class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] { |
CoProcessFunction(这是将两条流汇成一个流的)
CoProcessFunction,这个是将两个流汇成一个流的.CoProcessFunction和我们之前讲的两个底层的函数不一样.它里面有两个需要实现的方法.如果你需要把两条流汇成一个流的话.你就要分别处理,所以要重写CoProcessFunction里面的两个方法:ProcessElement1()和ProcessElement2().那如果我想要将他汇成一个流的话,我只需要在这两个方法中用同样的代码去删除就可以了.
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。
类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Flink-原理与实现-Flink的ProcessFunction-API(底层API)/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/