一、Spark SQL 概述
1.1 什么是Spark SQL
Spark SQL
是 Spark
用来处理结构化数据的一个模块,它提供了2个编程抽象:
DataFrame
和DataSet
,并且作为分布式 SQL 查询引擎的作用。
以 Hive
作为对比,Hive
是将 Hive SQL
转换成 MapReduce
然后提交到集群上执行,大大简化了编写 MapReduce
的程序的复杂性,由于 MapReduce
这种计算模型执行效率比较慢。所有Spark SQL
的应运而生,它是将 Spark SQL
转换成 RDD
,然后提交到集群执行,执行效率非常快。
1.2 Spark SQL 的特点
这里引用 :
① 易整合
② 统一的数据访问方式
③ 兼容Hive
④ 标准的数据连接
1.3 什么是 DataFrame
在 Spark
中,DataFrame
是一种以 RDD
为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame
与 RDD
的主要区别在于,前者带有 schema
元信息,即 DataFrame
所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL
得以洞察更多的结构信息,从而对藏于DataFrame
背后的数据源以及作用于 DataFrame
之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
反观 RDD
,由于无从得知所存二维数据的内部结构,Spark Core
只能在 stage
层面进行简单、通用的流水线优化。
图示
DataFrame
也是懒执行的,但性能上比 RDD
要高,主要原因:
优化的执行计划,即查询计划通过 Spark Catalyst Optimiser
进行优化。比如下面一个例子:
看看 Spark Core
和 Spark SQL
模块对这个计划的执行步骤:
1.4 什么是 DataSet
DataSet
也是分布式数据集合。
DataSet
是 Spark 1.6
中添加的一个新抽象,是 DataFrame
的一个扩展。它提供了 RDD
的优势(强类型,使用强大的 Lambda
函数的能力)以及 Spark SQL
优化执行引擎的优点,DataSet
也可以使用功能性的转换(操作 map
,flatMap
,filter
等等)。
具体的优势如下:
1)是 DataFrame API
的一个扩展,SparkSQL
最新的数据抽象;
2)用户友好的 API
风格,既具有类型安全检查也具有 DataFrame
的查询优化特性;
3)用样例类来对 DataSet
中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet
中的字段名称;
4)DataSet
是强类型的。比如可以有 DataSet[Car]
,DataSet[Person]
二、Spark SQL 编程
2.1 SparkSession
在老的版本中,Spark SQL
提供两种 SQL
查询起始点:一个叫 SQLContext
,用于 Spark
自己提供的 SQL
查询;一个叫 HiveContext
,用于连接 Hive
的查询。
SparkSession
是 Spark
最新的 SQL
查询起始点,实质上是 SQLContext
和 HiveContext
的组合,所以在 SQLContex
和 HiveContext
上可用的 API
在 SparkSession
上同样是可以使用的。
SparkSession
内部封装了 SparkContext
,所以计算实际上是由 SparkContext
完成的。
2.2 DataFrame
1. 创建
在 Spark SQL
中 SparkSession
是创建 DataFrame
和执行 SQL
的入口,创建 DataFrame
有三种方式
- 通过
Spark
的数据源进行创建; - 从一个存在的
RDD
进行转换; - 还可以从
Hive Table
进行查询返回
通过 Spark 的数据源进行创建
- 查看
Spark
数据源进行创建的文件格式
- 读取官网提供的
json
文件创建DataFrame
- 从
RDD
转换(详见 2.5 节) - 从
Hive Table
转换(详见 3.3节)
2. SQL 风格语法(主要)
直接通过 SQL
语句对 DataFrame
的数据进行操作
- 创建一个
DataFrame
- 对
DataFrame
创建一个临时表
创建临时表的三种方式
- 通过
SQL
语句实现查询全表
注意:普通临时表是 Session
范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
- 对于
DataFrame
创建一个全局表
df.createGlobalTempView("people")复制代码
- 通过
SQL
语句实现查询全表
spark.sql("SELECT * FROM global_temp.people").show()复制代码
spark.newSession().sql("SELECT * FROM global_temp.people").show()复制代码
以上两行代码的执行效果一致~
3. DSL 风格语法(次要)
使用更为简洁的语法对 DataFrame
的数据操作
-
创建一个
DataFrame
(同上) -
查看
DataFrame
的Schema
信息
- 只查看
name
列数据
- 查看
name
列数据以及age+1
数据
- 查看
age
大于21
的数据
- 按照
age
分组,查看数据条数
个人感觉简单的操作可以使用 DSL
,复杂查询再使用 SQL
是一个很不错的方案
注意:DSL
方法由 DataFrame
调用,而 SQL
由 SparkSession
调用
4. RDD 转换为 DateFrame
注意:如果需要 RDD
与 DF
或者 DS
之间操作,那么都需要引入 import spark.implicits._
【spark不是包名,而是 SparkSession
对象的名称】
前置条件:导入隐式转换并创建一个 RDD
- 通过手动确定转换
-
通过反射确定(需要用到样例类)
- 创建一个样例类
case class People(name:String, age:Int)复制代码
- 根据样例类将
RDD
转换为DataFrame
- 通过编程方式(了解)
- 导入所需的类型
import org.apache.spark.sql.types._复制代码
- 创建
Schema
val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)复制代码
- 导入所需的类型
import org.apache.spark.sql.Row复制代码
- 根据给定的类型创建二元组
RDD
val data = rdd.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}复制代码
- 根据数据及给定的
schema
创建DataFrame
val dataFrame = spark.createDataFrame(data, structType)复制代码
5. DateFrame 转换为 RDD
2.3 DataSet
DataSet
是具有强类型的数据集合,需要提供对应的类型信息。
DataSet
的创建可以直接使用 Seq
静态方法创建 或者 RDD
转换 或者 DataFrame
转换
1. 创建
- 创建一个样例类
case class Person(name: String, age: Long)复制代码
- 创建
DataSet
2. RDD 转换为 DataSet
Spark SQL
能够自动将包含有 case
类的 RDD
转换成 DataFrame
,case
类定义了 table
的结构,case
类属性通过反射变成了表的列名。case
类可以包含诸如 Seqs
或者 Array
等复杂的结构。
- 创建一个
RDD
- 创建一个样例类
case class Person(name: String, age: Int)复制代码
- 将
RDD
转化为DataSet
3. DataSet 转换为 RDD
2.4 DataFrame与DataSet的互操作
1. DataFrame 转 Dataset
- 创建一个
DateFrame
- 创建一个样例类并转换
2. Dataset 转 DataFrame
- 创建一个样例类(同上)
- 创建
DataSet
val ds = Seq(Person("Andy", 32)).toDS()复制代码
- 将
DataSet
转化为DataFrame
val df = ds.toDF复制代码
使用 as
方法,转成 Dataset
,这在数据类型是 DataFrame
又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._
不然 toDF
、toDS
无法使用。
2.5 RDD,DataFrame,DataSet
在 Spark SQL
中 Spark
为我们提供了两个新的抽象,分别是 DataFrame
和 DataSet
。他们和 RDD
有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark
版本中,DataSet
有可能会逐步取代 RDD
和 DataFrame
成为唯一的 API
接口。
1. 三者的共性
(1)RDD
、DataFrame
、Dataset
全都是 spark
平台下的分布式弹性数据集,为处理超大型数据提供便利;
(2)三者都有惰性机制,在进行创建、转换,如 map
方法时,不会立即执行,只有在遇到 Action
如 foreach
时,三者才会开始遍历运算;
(3)三者有许多共同的函数,如 filter
,排序
等;
(4)在对 DataFrame
和 Dataset
进行操作许多操作都需要这个包:import spark.implicits._
(在创建好 SparkSession
对象后尽量直接导入)
这里给出关于这三者讲解比较深入的
2. 三者的转换
2.6 IDEA 创建 Spark SQL 程序
通过一个简单的案例快速入手如何在 IDEA
上开发 Spark SQL
程序
导入以下依赖
复制代码 org.apache.spark spark-sql_2.11 2.1.1
代码实现
object Main2 { def main(args: Array[String]): Unit = { val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate() import session.implicits._ val dataFrame: DataFrame = session.read.json("/home/cris/people.json") //打印 dataFrame.show() //DSL风格:查询年龄在21岁以上的 dataFrame.filter($"age" > 21).show() //创建临时表 dataFrame.createOrReplaceTempView("persons") //SQL风格:查询年龄在21岁以上的 session.sql("SELECT * FROM persons where age > 21").show() //关闭连接 session.stop() }}复制代码
无法找到主类
如果在执行 Scala
或者是 java
程序中,报无法找到主类执行的异常,可能是项目的结构有问题,将父模块直接移除掉,然后重新导入父模块即可
2.7 用户自定义函数
1. 用户自定义 UDF 函数
object MyFunc { def main(args: Array[String]): Unit = { val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate() val dataFrame: DataFrame = session.read.json("/home/cris/people.json") /*用户自定义 UDF 函数*/ session.udf.register("addName", (x: String) => { "cool:" + x }) dataFrame.createOrReplaceTempView("people") session.sql("select addName(name),age from people").show() session.stop() }}复制代码
结果如下
2. 用户自定义 UDAF 函数
强类型的 Dataset
和弱类型的 DataFrame
都提供了相关的聚合函数, 如 count()
,countDistinct()
,avg()
,max()
,min()
。
除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction
来实现用户自定义聚合函数
/** * 定义自己的 UDAF 函数 * * @author cris * @version 1.0 **/object MyFunc extends UserDefinedAggregateFunction { // 聚合函数输入参数的数据类型 override def inputSchema: StructType = StructType(StructField("inputField", LongType) :: Nil) // 聚合缓冲区中值得数据类型 override def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // 返回值的数据类型 override def dataType: DataType = DoubleType // 对于相同的输入是否一直返回相同的输出 override def deterministic: Boolean = true // 初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { // 工资的总额 buffer(0) = 0L // 员工人数 buffer(1) = 0L } // 相同 Executor 间的数据合并 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } // 不同 Executor 间的数据合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 最终函数计算的返回值 override def evaluate(buffer: Row): Double = { buffer.getLong(0).toDouble / buffer.getLong(1) }}复制代码
测试代码
object MyFuncTest2 { def main(args: Array[String]): Unit = { val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate() val dataFrame: DataFrame = session.read.json("/home/cris/employees.json") session.udf.register("avg", MyFunc) dataFrame.createTempView("emp") session.sql("select avg(salary) as avg_sal from emp").show() session.stop() }}复制代码
测试如下
三、Spark SQL 数据的加载与保存
3.1 通用加载/保存方法
1. 加载数据
- 通过
read
方法直接加载数据
scala> spark.read.csv jdbc json orc parquet textFile… …复制代码
注意:加载数据的相关参数需写到上述方法中。如:textFile
需传入加载数据的路径,jdbc
需传入 JDBC
相关参数
format
方法(了解)
scala> spark.read.format("…")[.option("…")].load("…")复制代码
用法详解:
(1)format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
2. 保存数据
write
直接保存数据
scala> df.write.csv jdbc json orc parquet textFile… …复制代码
注意:保存数据的相关参数需写到上述方法中。如:textFile
需传入加载数据的路径,jdbc
需传入 JDBC
相关参数
format
指定保存数据类型(了解)
scala> df.write.format("…")[.option("…")].save("…")复制代码
用法详解:
(1)format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
3. 最佳示例代码
object Main2 { def main(args: Array[String]): Unit = { val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate() val dataFrame: DataFrame = session.read.json("/home/cris/people.json") //创建临时表 dataFrame.createOrReplaceTempView("persons") //SQL风格:查询年龄在21岁以上的 val frame: DataFrame = session.sql("SELECT * FROM persons where age > 21") frame.show() frame.write.json("/home/cris/output") //关闭连接 session.stop() }}复制代码
执行效果
4. 文件保存选项
可以采用SaveMode
执行存储操作,SaveMode
定义了对数据的处理模式。SaveMode
是一个枚举类,其中的常量包括:
(1)Append
:当保存路径或者表已存在时,追加内容;
(2)Overwrite
: 当保存路径或者表已存在时,覆写内容;
(3)ErrorIfExists
:当保存路径或者表已存在时,报错;
(4)Ignore
:当保存路径或者表已存在时,忽略当前的保存操作
使用如下
df.write.mode(SaveMode.Append).save("… …")复制代码
记得保存选项放在 save
操作之前执行
5. 默认数据源
Spark SQL
的默认数据源为 Parquet
格式。数据源为 Parquet
文件时,Spark SQL
可以方便的执行所有的操作。修改配置项 spark.sql.sources.default
,可修改默认数据源格式。
- 加载数据
val df = spark.read.load("./examples/src/main/resources/users.parquet")复制代码
- 保存数据
df.select("name", " color").write.save("user.parquet")复制代码
3.2 JSON 文件
Spark SQL
能够自动推测 JSON
数据集的结构,并将它加载为一个 Dataset[Row]
. 可以通过 SparkSession.read.json()
去加载一个 一个 JSON
文件。
注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式如下:
{ "name":"Michael"}{ "name":"Andy", "age":30}{ "name":"Justin", "age":19}复制代码
Spark-Shell
操作如下:
- 导入隐式转换
import spark.implicits._复制代码
- 加载
JSON
文件
val path = "examples/src/main/resources/people.json"val peopleDF = spark.read.json(path)复制代码
- 创建临时表
peopleDF.createOrReplaceTempView("people")复制代码
- 数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()+------+| name|+------+|Justin|+------+复制代码
3.3 MySQL
Spark SQL
可以通过 JDBC
从关系型数据库中读取数据的方式创建 DataFrame
,通过对 DataFrame
一系列的计算后,还可以将数据再写回关系型数据库中。
可在启动 shell
时指定相关的数据库驱动路径,或者将相关的数据库驱动放到 Spark
的类路径下(推荐)。
以 Spark-Shell 为例
- 启动
Spark-Shell
[cris@hadoop101 spark-local]$ bin/spark-shell --master spark://hadoop101:7077 [--jars mysql-connector-java-5.1.27-bin.jar]复制代码
建议将 MySQL
的驱动直接放入到 Spark
的类(jars
)路径下,就不用每次进入 Spark-Shell
带上 --jar
参数了
- 定义
JDBC
相关参数配置信息
val connectionProperties = new Properties()connectionProperties.put("user", "root")connectionProperties.put("password", "000000")复制代码
- 使用
read.jdbc
加载参数
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)复制代码
- 或者使用
format
形式加载配置参数(不推荐)
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark").option("dbtable", " person").option("user", "root").option("password", "000000").load()复制代码
- 使用
write.jdbc
保存数据(可以使用文件保存选项)
jdbcDF2.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)复制代码
- 或者使用
format
形式保存数据(不推荐)
jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark").option("dbtable", "person").option("user", "root").option("password", "000000").save()复制代码
以 IDEA 操作为例
pom.xml
导入MySQL
驱动依赖
复制代码 mysql mysql-connector-java 5.1.47
MySQL
表数据
IDEA
操作代码如下
/** * IDEA 测试 Spark SQL 连接远程的 MySQL 获取数据和写入数据 * * @author cris * @version 1.0 **/object MysqlTest { def main(args: Array[String]): Unit = { // 获取 SparkSession val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate() // 设置配置参数 val properties = new Properties() properties.put("user", "root") properties.put("password", "000000") // 从 MySQL 获取数据,show() 方法实际调用的是 show(20),默认显示 20 行数据 val dataFrame: DataFrame = session.read.jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties) dataFrame.show() // 修改并保存数据到 MySQL dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties) session.stop() }}复制代码
注意:防止中文乱码,url
加上 ?characterEncoding=UTF-8
;写入数据最好指定保存模式 SaveMode
测试如下:
3.4 Hive
Apache Hive
是 Hadoop
上的 SQL
引擎,Spark SQL
编译时可以包含 Hive
支持,也可以不包含。包含 Hive
支持的 Spark SQL
可以支持 Hive
表访问、UDF
(用户自定义函数)以及 Hive
查询语言(HQL
)等。Spark-Shell
默认是Hive
支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
内置 Hive (了解)
如果要使用内嵌的 Hive
,直接用就可以了。
- 简单创建表
指定路径下就会生成该表的文件夹
- 导入文件为表数据
在当前 Spark-local
路径下,创建文件 bb
12345复制代码
然后创建表,导入数据
查询也没有问题
对应目录下也生成了 bb
表的文件夹
外置 Hive(重要)
如果想连接外部已经部署好的 Hive
,需要通过以下几个步骤:
- 将
Hive
中的hive-site.xml
拷贝或者软连接到Spark
安装目录下的conf
目录下
[cris@hadoop101 spark-local]$ cp /opt/module/hive-1.2.1/conf/hive-site.xml ./conf/复制代码
- 将
JDBC
的驱动包放置在Spark
的.jars
目录下,启动Spark-Shell
[cris@hadoop101 spark-local]$ cp /opt/module/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./jars/复制代码
可以通过 Hive
的客户端创建一张表 users
hive> create table users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';复制代码
并导入数据
hive> load data local inpath './user.txt' into users;复制代码
此时 HDFS
显示数据导入成功
在 Spark-Shell
窗口查看
执行 Hive
的查询语句
可以在 Spark-Shell
执行所有的 Hive
语句,并且执行流程走的是 Spark
,而不是 MapReduce
运行Spark SQL CLI
Spark SQL CLI
可以很方便的在本地运行 Hive
元数据服务以及从命令行执行查询任务。在 Spark
目录下执行如下命令启动 Spark SQL CLI
,直接执行 SQL
语句,类似一个 Hive
窗口。
[cris@hadoop101 spark-local]$ bin/spark-sql复制代码
如果使用这种模式进行测试,最好将 log4j
的日志级别设置为 error
,否则会有很多 info
级别的日志输出
IDEA 测试 Spark 和 Hive 配合(重要)
首先 pom.xml
引入 Hive
依赖
org.apache.spark spark-hive_2.11 2.1.1 复制代码 org.apache.hive hive-exec 1.2.1
然后将 Hive
的配置文件 hive-site.xml
放入 resource
路径下
hive-site.xml
复制代码 javax.jdo.option.ConnectionURL jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true JDBC connect string for a JDBC metastore javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver Driver class name for a JDBC metastore javax.jdo.option.ConnectionUserName root username to use against metastore database javax.jdo.option.ConnectionPassword 000000 password to use against metastore database hive.cli.print.header true hive.cli.print.current.db true hive.zookeeper.quorum hadoop101,hadoop102,hadoop103 The list of ZooKeeper servers to talk to. This is only needed for read/write locks. hive.zookeeper.client.port 2181 The port of ZooKeeper servers to talk to. This is only needed for read/write locks.
具体的配置介绍这里不再赘述,可以参考我的 Hive 笔记
测试代码如下:
/** * IDEA 测试 Spark SQL 和 Hive 的联动 * * @author cris * @version 1.0 **/object HiveTest { def main(args: Array[String]): Unit = { // 注意开启 enableHiveSupport val session: SparkSession = SparkSession.builder().enableHiveSupport().appName("spark sql").master("local[*]") .getOrCreate() session.sql("show tables").show() // 注意关闭 session 连接 session.stop() }}复制代码
执行结果如下
正好就是刚才创建的 Hive
表
IDEA 自动换行设置
Cris
的 IDEA
设置一行字数最多 120
,否则就自动换行,大大提高阅读的舒适感和编码的规范性
Deepin 的 Terminal 右键复制
因为 Cris
使用的是 Linux
桌面系统 Deepin
,所以经常使用自带的 Terminal
连接远程服务器,这里给出快速右键复制 Terminal
内容的设置
Typora 的快捷键自定义设置
因为 Cris
之前使用的是 MacBook
,输入法搜狗会很智能的为输入的英文进行前后空格分割,换了 Deepin
后,自带的虽然也是搜狗输入法,但是没有对英文自动空格分割的功能了,后面想想办法,看怎么解决~
因为要对英文和重要内容进行突出显示,Typora
中设置 code
的快捷键默认为 Ctrl+Shift+`,比较麻烦,网上找了找自定义快捷键的设置,最后设置成 Ctrl+C