Spark SQL编程
注意: 建表一定是数据集,对数据集进行建表即用df. 而执行SQL是spark.sql()
SparkSession新的起始点
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
现在是使用的是SparkSession了.实际上就是将两个结合了.所以你很方便的去查寻一个json文件,也可以查询一个hive数据.统一的数据入口
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口
DataFrame和dataset可以相互转换. DataFrame是Dataset里面的一种特殊形式
ResultSet():这个方法很恶心,就是编译期不做类型校验,但是你一运行就会报类型转换异常
DataFrame
创建
1 | 在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,所以DataFrame的创建是来自于sparkSession.从sparkSession中去找.而sparkSession是依赖于sparkContext来构建的.它里面放了一个sparkcontext对象,你可以拿出来. 你也可 以直接通过SparkConf来创建一个SparkSession.但是这种方式它里面也是在构建sparkSession之前构建了SparkContext. 因为SparkContext是用于跟Spark集群连接的. |
1)从Spark数据源进行创建
(1)查看Spark数据源进行创建的文件格式
1 | scala> spark.read. |
(2)读取json文件创建DataFrame
1 | scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") |
(3)展示结果
1 | scala> df.show 这里面用的比较多的行动算子,我们之前写sparkCore是用collect的这种方式展现的是有结构信息的 |
2)从RDD进行转换
后面讨论
3)从Hive Table进行查询返回
后面讨论
SQL风格语法(主要)
1)创建一个DataFrame(注意在DataSet里面定义的一些函数,我DataFrame也是可以用的)
1 | scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") |
2)对DataFrame创建一个临时表
View(视图),视图和table(表)有什么区别呢?视图使用来查的, 而表是用来增删改查的.因为当前的分布式数据集RDD具有不可变性.
1 | scala> df.createOrReplaceTempView("people") //参数是视图名 |
3)通过SQL语句实现查询全表
1 | scala> val sqlDF = spark.sql("SELECT * FROM people") |
4)结果展示
1 | scala> sqlDF.show |
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
5)对于DataFrame创建一个全局表
1 | scala> spark.sql("SELECT * FROM global_temp.people").show() |
DSL风格语法(次要) (DSL风格叫领域特定语言)
DSL风格叫领域特定语言,就是说,他只能在sparkSQL当中能用,换一个地方就不能用了.很恶心.
DSL语言风格就是使用select,filter,map等这些函数.
1)创建一个DataFrame
1 | scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") |
2)查看DataFrame的Schema信息
1 | scala> df.printSchema |
3)只查看”name”列数据
1 | scala> df.select("name").show() |
4)查看”name”列数据以及”age+1”数据
1 | scala> df.select($"name", $"age" + 1).show() |
5)查看”age”大于”21”的数据
1 | scala> df.filter($"age" > 21).show() |
6)按照”age”分组,查看数据条数
1 | scala> df.groupBy("age").count().show() |
创建DataFrame有三种方式:
1 | 4.通过Spark的数据源进行创建; |
RDD转换为DataFrame
注意:如果需要RDD与DF(DataFrame)或者DS(DataSet)之间操作,那么都需要引入 import spark.implicits._ (spark不是包名,而是sparkSession对象的名称)
前置条件:导入隐式转换并创建一个RDD
1 | scala> import spark.implicits._ |
1)通过手动确定转换
1 | scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age") |
2)通过反射确定(需要用到样例类)
(1)创建一个样例类,样例类其实就是和Java中的类一样
1 | scala> case class People(name:String, age:Int) |
(2)根据样例类将RDD转换为DataFrame
1 | scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF |
3)通过编程的方式(了解)
(1)导入所需的类型
1 | scala> import org.apache.spark.sql.types._ |
(2)创建Schema
1 | scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil) |
(3)导入所需的类型
1 | scala> import org.apache.spark.sql.Row |
(4)根据给定的类型创建二元组RDD
1 | scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)} |
(5)根据数据及给定的schema创建DataFrame
1 | scala> val dataFrame = spark.createDataFrame(data, structType) |
DataFrame转换为RDD
直接调用rdd即可
1)创建一个DataFrame
1 | scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") |
2)将DataFrame转换为RDD
1 | scala> val dfToRDD = df.rdd |
3)打印RDD
1 | scala> dfToRDD.collect |
DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
创建
1)创建一个样例类
1 | scala> case class Person(name: String, age: Long) |
2)创建DataSet
1 | scala> val caseClassDS = Seq(Person("Andy", 32)).toDS() |
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。
1)创建一个RDD
1 | scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt") |
2)创建一个样例类
1 | scala> case class Person(name: String, age: Long) |
3)将RDD转化为DataSet
1 | scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS |
DataSet转换为RDD
调用rdd方法即可。
1)创建一个DataSet
1 | scala> val DS = Seq(Person("Andy", 32)).toDS() |
2)将DataSet转换为RDD
1 | scala> DS.rdd |
DataFrame与DataSet的互操作
DataFrame转DataSet
1)创建一个DateFrame
1 | scala> val df = spark.read.json("examples/src/main/resources/people.json") |
2)创建一个样例类
1 | scala> case class Person(name: String, age: Long) |
3)将DataFrame转化为DataSet
1 | scala> df.as[Person] |
Dataset转DataFrame
1)创建一个样例类
1 | scala> case class Person(name: String, age: Long) |
2)创建DataSet
1 | scala> val ds = Seq(Person("Andy", 32)).toDS() |
3)将DataSet转化为DataFrame
1 | scala> val df = ds.toDF |
4)展示
1 | scala> df.show |
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。
RDD、DataFrame和DataSet
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
1 | RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6) |
三者的共性
1 | (1)RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利; |
三者的互相转化
1 | RDD到DataFrame或者到dataSet,以及DataFrame到DataSet,这个过程我们认为是由简单到复杂,需要加东西的过程. 这一套过程都需要用到一个样例类. |
IDEA创建SparkSQL程序
1)添加依赖
1 | <dependency> |
2)代码实现
1 | import org.apache.spark.sql.SparkSession |
用户自定义函数
在Shell窗口中可以通过spark.udf功能用户可以自定义函数。
UDF
1)创建DataFrame
1 | scala> val df = spark.read.json("examples/src/main/resources/people.json") |
2)打印数据
1 | scala> df.show() |
3)注册UDF,功能为在数据前添加字符串
因为自定义函数,最终是在SQL里面去用,所以你得有函数名addName, 后面要有函数如何添加? 相当于你在hive当中自定义的函数.
1 | scala> spark.udf.register("addName", (x:String)=> "Name:"+x) |
4)创建临时表
1 | scala> df.createOrReplaceTempView("people") |
5)应用UDF
1 | scala> spark.sql("Select addName(name), age from people").show() |
UDAF
1 | 自定义UDAF函数,多进一出,聚合函数. |
1)需求:实现求平均工资的自定义聚合函数。
2)代码实现
1 | import org.apache.spark.sql.expressions.MutableAggregationBuffer |
3)函数使用
1 | // 注册函数 |
- 本文作者: xubatian
- 本文链接: http://xubatian.cn/Spark原理与实现-Spark-SQL编程/
- 版权声明: 本博客所有文章除特别声明外均为原创,采用 CC BY 4.0 CN协议 许可协议。转载请注明出处:https://www.xubatian.cn/