整体介绍
Flink SQL 源码案例
https://github.com/ShangBaiShuYao/flink-learning-from-zhisheng/blob/main/flink-1.12.0-Demo/src/main/java/com/shangbaishuyao/demo/FlinkDemo10/ 从FlinkDemo10往后都是
什么是 Table API 和 Flink SQL
Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。
目前功能尚未完善,处于活跃的开发阶段。
Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink的SQL支持,基于实现了SQL标准的Apache Calcite(Apache开源SQL解析工具)。
无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果。
Table API和SQL:
实际上就是把我们一个流变成一个关系型的API.
如果我们有关系型的API的话.我们的代码变成就非常方便一些.这个和我们所说的hive是一样的.为什么要用hive呢?本来我们使用SparkCore和MapReduce也是能替代hive的.但是我们为什么用hive呢?因为我们就是不想写复杂的代码呀!当然了,也是为了以后的数据管理和数据分析方便一些.
所以我们把这个流变成关系型的API的话.把Data Stream变成关系型的API后,实际上就是方便以后我们对数据进行分析和处理的.而且他的可读性也强一些.
Table API是流处理和批处理通用的关系型API,Table API可以基于流输入或者批输入来运行而不需要进行任何修改。Table API是SQL语言的超集并专门为Apache Flink设计的,Table API是Scala 和Java语言集成式的API。与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义的,具有IDE支持如:自动完成和语法检测。
需要引入的依赖
Table API和SQL需要引入的依赖有两个:planner和bridge。
1 | <dependency> |
flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner;
flink-table-api-java-bridge:bridge桥接器,主要负责table API和 DataStream/DataSet API的连接支持,按照语言分java和scala。
这里的两个依赖,是IDE环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了。
当然,如果想使用用户自定义函数,或是跟kafka做连接,需要有一个SQL client,这个包含在flink-table-common里。
两种planner(old & blink)的区别
批流统一:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。
因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。
Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog。
旧planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推。
基于字符串的键值配置选项仅适用于Blink planner。
PlannerConfig在两个planner中的实现不同。
Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立。
旧的planner不支持目录统计,而Blink planner支持。
API调用
基本程序结构
Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。
具体操作流程如下:
1 | StreamTableEnvironment tableEnv = ... // 创建表的执行环境 |
创建表环境
创建表环境最简单的方式,就是基于流处理执行环境,调create方法直接创建:
1 | StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); |
表环境(TableEnvironment)是flink中集成Table API & SQL的核心概念。它负责:
1 | 注册catalog |
在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置 TableEnvironment的一些特性。
比如,配置老版本的流式查询(Flink-Streaming-Query):
1 | EnvironmentSettings settings = EnvironmentSettings.newInstance() |
基于老版本的批处理环境(Flink-Batch-Query):
1 | ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment; |
基于blink版本的流处理环境(Blink-Streaming-Query):
1 | EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() |
基于blink版本的批处理环境(Blink-Batch-Query):
1 | EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() |
在Catalog中注册表
表(Table)的概念
TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表。它会维护一个Catalog-Table表之间的map。
表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。
表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果。
连接到文件系统(Csv格式)
连接外部系统在Catalog中注册表,直接调用tableEnv.connect()就可以,里面参数要传入一个ConnectorDescriptor,也就是connector描述器。对于文件系统的connector而言,flink内部已经提供了,就叫做FileSystem()。
代码如下:
1 | tableEnv |
这是旧版本的csv格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被弃用,以后会被一个符合RFC-4180标准的新format描述器取代。新的描述器就叫Csv(),但flink没有直接提供,需要引入依赖flink-csv:
1 | <dependency> |
代码非常类似,只需要把withFormat里的OldCsv改成Csv就可以了。
连接到Kafka
kafka的连接器flink-kafka-connector中,1.10版本的已经提供了Table API的支持。我们可以在 connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor。
1 | tableEnv.connect( |
当然也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的。
表的查询
利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。
Flink给我们提供了两种查询方式:Table API和 SQL。
Table API的调用
Table API是集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。
Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。
代码中的实现如下:
1 | Table sensorTable = tableEnv.from("inputTable"); |
SQL查询
Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。
代码实现如下:
1 | Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'"); |
当然,也可以加上聚合操作,比如我们统计每个sensor温度数据出现的个数,做个count统计:
1 | Table aggResultTable = sensorTable |
SQL的实现:
1 | Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id"); |
这里Table API里指定的字段,前面加了一个单引号’,这是Table API中定义的Expression类型的写法,可以很方便地表示一个表中的字段。
字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。以后的代码中,一般都用后一种形式。
将DataStream 转换成表
Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table。Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了。
代码表达
代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。
这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。
代码具体如下:
1 | DataStream<String> inputStream = env.readTextFile("sensor.txt"); |
数据类型与 Table schema的对应
在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。
基于名称的对应:
1 | Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature"); |
Flink的DataStream和 DataSet API支持多种类型。
组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。
创建临时视图(Temporary View)
创建临时视图的第一种方式,就是直接从DataStream转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。
代码如下:
1 | tableEnv.createTemporaryView("sensorView", dataStream); |
另外,当然还可以基于Table创建视图:
1 | tableEnv.createTemporaryView("sensorView", sensorTable); |
View和Table的Schema完全相同。事实上,在Table API中,可以认为View和Table是等价的。
输出表
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。
具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。
输出到文件
代码如下:
1 | // 注册输出表 |
更新模式(Update Mode)
在流处理过程中,表的处理并不像传统定义的那样简单。
对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。
Flink Table API中的更新模式有以下三种:
输出到Kafka
除了输出到文件,也可以输出到Kafka。我们可以结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出。
代码如下:
1 | // 输出到 kafka |
输出到ElasticSearch
ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,这样就可以使用Query定义的键(key)与外部系统交换UPSERT/DELETE消息。
另外,对于“仅追加”(append-only)的查询,connector还可以在append 模式下操作,这样就可以与外部系统只交换insert消息。
es目前支持的数据格式,只有Json,而flink本身并没有对应的支持,所以还需要引入依赖:
1 | <dependency> |
代码实现如下:
1 | // 输出到es |
输出到MySql
Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖:
1 | <dependency> |
jdbc连接的代码实现比较特殊,因为没有对应的java/scala类实现ConnectorDescriptor,所以不能直接tableEnv.connect()。不过Flink SQL留下了执行DDL的接口:tableEnv.sqlUpdate()。
对于jdbc的创建表操作,天生就适合直接写DDL来实现,所以我们的代码可以这样写:
1 | // 输出到 Mysql |
将表转换成DataStream
代码实现如下:
1 | DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); |
所以,没有经过groupby之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。
Query的解释和执行
1 | String explaination = tableEnv.explain(resultTable); |
Query的解释和执行过程,老planner和blink planner大体是一致的,又有所不同。整体来讲,Query都会表示成一个逻辑查询计划,然后分两步解释:
优化查询计划
解释成 DataStream 或者 DataSet程序
而Blink版本是批流统一的,所以所有的Query,只会被解释成DataStream程序;另外在批处理环境TableEnvironment下,Blink版本要到tableEnv.execute()执行调用才开始解释。
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Flink 原理与实现: FlinkSQL的Table API 与SQL概念/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/