Flink的一个优势,是其他计算引擎所做不到的.或者说能做到,但是代码特别麻烦,我们的Flink中每一个算子,他都给你提供了一个函数对象作为参数
好记心烂笔头:
为什么我们不用匿名函数(lambda function)去写Flink代码呢?
1 | 因为Flink中花括号里面的代码都是运行在slot中的,但是那么多slot,每个slot里面都可能执行,那数据库就被搞死了.所以我们就不用匿名函数的方式写代码了.而是使用函数类的方式. |
函数类(Function Classes)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction做转换的, FilterFunction做过滤的, ProcessFunction不知道做转换,做过滤还是做其他的,但是我肯定要处理数据,则使用processFunction,他是没有限制的,你想做什么你就在这里面写什么就好了等等。
下面例子实现了FilterFunction接口:
1 | class FilterFilter extends FilterFunction[String] { |
还可以将函数实现成匿名类
1 | val flinkTweets = tweets.filter( |
我们filter的字符串”flink”还可以当作参数传进去。
1 | val tweets: DataStream[String] = ... |
匿名函数(Lambda Functions)
1 | val tweets: DataStream[String] = ... |
富函数(Rich Functions)
富函数和函数类是一样的,不过他有一个特点.他增加了生命周期的管理.什么叫生命周期?就是 XXXFunction()什么时候初始化.什么时候销毁.什么时候执行你们的代码等就是所谓的生命周期的管理.XXXFunction()本身是没有生命周期的管理的.如果你需要增加生命周期的管理,你需要继承RichMapFunction()
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function有一个生命周期的概念。典型的生命周期方法有:
- open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
1 | class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] { |
案例代码
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Flink-原理与实现-Flink实现UDF函数——更细粒度的控制流/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/