Spark SQL 1. 常见概念与基本操作


1. SparkSQL 概述

1.1 Shark

  • Shark 是 Databricks 开发出专门针对于spark的构建大规模数据仓库系统的一个框架

  • Shark 与 Hive 兼容,同时也依赖于Spark版本

  • Shark是把sql语句解析成了Spark任务,Hivesql 底层把 sql 解析成了 mapreduce 程序

  • 随着对 Shark 性能优化的上限,以及集成SQL的一些复杂的分析功能,发现Hive的MapReduce思想限制了Shark的发展

  • 最后 Databricks 公司终止对 Shark 的开发,决定单独开发一个框架,不在依赖hive,把重点转移到了 SparkSQL 这个框架上

1.2 SparkSQL 是什么

Spark SQL is Apache Spark’s module for working with structured data.

SparkSQL是apache Spark用来处理结构化数据的一个模块

1.3 SparkSQL 的特性

  1. 易整合
  • SparkSQL将SQL查询与Spark程序无缝混合
  • SparkSQL 可以使用java、Scala、Python、R 等不同的语言进行代码开发

  1. 统一的数据源访问
1
2
// 以相同的方式连接到任何数据源
val dataFrame = sparkSession.read.文件格式的方法名("该文件格式的路径")

  1. 兼容hive

SparkSQL 可以支持 HiveSql 这种语法

  1. 支持标准的数据库连接

SparkSQL支持标准的数据库连接JDBC或者ODBC

2. DataFrame

2.1 DataFrame的由来

  • DataFrame 前身是 schemaRDD,schemaRDD 是RDD的一个实现类,直接继承自 RDD
  • 在spark1.3.0之后把 schemaRDD 改名为 DataFrame,它不在继承自RDD,而是自己实现RDD上的一些功能
  • DataFrame也可以调用rdd方法将其转换成一个rdd
1
val rdd1=dataFrame.rdd

2.2 DataFrame是什么

  • DataFrame 是Spark中一种 以RDD为基础的分布式数据集,类似于传统数据库的表格
  • DataFrame 带有 Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
  • DataFrame 可以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表
  • DataFrame 可以把内部是一个Row对象,它表示一行一行的数据(RDD可以把它内部元素看成是一个java对象)
  • DataFrame 相比于rdd来说,多了对数据的描述的schema元信息

2.3 DataFrame和RDD的对比

2.3.1 RDD

  • 优点
  1. 编译时类型安全开发会进行类型检查

  2. 在编译的时候及时发现错误 具有面向对象编程的风格

  • 缺点
  1. 构建大量的java对象占用了大量heap堆空间,导致频繁的GC

由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC),程序在进行垃圾回收的过程中,所有的任务都是暂停。影响程序执行的效率

  1. 数据的序列化和反序列性能开销很大

在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输,然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象

2.3.2 DataFrame

  • 优点
  1. DataFrame 引入off-heap(堆外内存) ,大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存,这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高,它是解决了RDD构建大量的java对象占用了大量heap堆空间,导致频繁的GC这个缺点。
  2. DataFrame 引入了schema元信息,就是数据结构的描述信息,后期spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉。这样一来数据网络传输的数据量是有所减少,数据的序列化和反序列性能开销就不是很大了。它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
  • 缺点

DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点

  1. 编译时类型不安全,编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
  2. 不在具有面向对象编程的风格

2.4 构建DataFrame

前提工作

1
2
3
4
5
6
7
8
9
10
11
12
13
# 1. 将数据文件传到服务器上
scp dataFile userName@serverNode:/path

# dataFile 数据文件
# username 登录到服务器的用户名
# serverNode 将数据上传到哪台服务器
# path 保存数据的路径

# 2. 进入 spark Home 目录
cd /opt/spark-2.3.3-bin-hadoop2.7/

# 3. 启动 spark shell
bin/spark-shell --master local[2] --jars /opt/hadoop-2.6.0-cdh5.14.2/share/hadoop/common/hadoop-lzo-0.4.20.jar

2.4.1 通过文本创建 DataFrame

1
2
3
4
5
6
7
8
9
10

val personDF=spark.read.text("file:///sparkdatas/person.txt")
//org.apache.spark.sql.DataFrame = [value: string]

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

  • 将文本转换成对象,再转换成 DataFrame
1
2
3
4
5
6
7
8
9
10
11
12
13
//加载数据
val rdd1=sc.textFile("file:///sparkdatas/person.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema
//展示数据
personDF.show

2.4.2 通过JSON文件创建 DataFrame

1
2
3
4
5
6
7
val df1=spark.read.json("file:///sparkdatas/person.json")

//打印schema信息
df1.printSchema

//展示数据
df1.show

2.4.3 通过Parquet创建 DataFrame

1
2
3
4
5
6
7
val df2=spark.read.parquet("file:///sparkdatas/person.parquet")

//打印schema信息
df2.printSchema

//展示数据
df2.show

2.5 DataFrame API

2.5.1 引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.6.0-mr1-cdh5.14.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.3</version>
</dependency>
</dependencies>

2.5.2 常用 API 示例

  • DataFrame API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

//定义 Person 样例类
case class Person(id:String,name:String,age:Int)

object SparkDSL {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkDSL")

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("WARN")
//加载数据
val rdd1=sc.textFile("file:///D:\\datas/person.txt").map(x=>x.split(" "))

//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))

// 隐式转换
import sparkSession.implicits._

//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

//查询指定的字段
personDF.select("name").show
personDF.select($"name").show

//实现age+1
personDF.select($"name",$"age",$"age"+1).show()

//实现age大于30过滤
personDF.filter($"age" > 30).show

//按照age分组统计次数
personDF.groupBy("age").count.show

//按照age分组统计次数降序
personDF.groupBy("age").count().sort($"count".desc).show

sparkSession.stop()
sc.stop()
}
}
  • SQL 风格语法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

//定义 Person 样例类
case class Person(id:String,name:String,age:Int)

//可以把DataFrame注册成一张表,然后通过 sparkSession.sql(sql语句) 操作
object SparkDSL {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkDSL")

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("WARN")
//加载数据
val rdd1=sc.textFile("file:///D:\\datas/person.txt").map(x=>x.split(" "))

//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))

// 隐式转换
import sparkSession.implicits._

//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

//DataFrame注册成表
personDF.createTempView("person")

//使用SparkSession调用sql方法统计查询
spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select name,age from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select age,count(*) from person group by age").show
spark.sql("select age,count(*) as count from person group by age").show
spark.sql("select * from person order by age desc").show

sparkSession.stop()
sc.stop()
}
}

3. DataSet

3.1 DataSet是什么

  • DataSet 是分布式的数据集合,Dataset提供了 强类型支持,也是在RDD的每行数据加了类型约束。
  • DataSet 是在Spark1.6中添加的新的接口,它集中了RDD的优点,强类型和可以用强大lambda函数以及使用了Spark SQL优化的执行引擎。
  • DataSet 包含了 DataFrame 的功能,Spark2.0中两者统一
  • DataFrame 表示为DataSet[Row],即DataSet的子集
  • DataSet可以在编译时检查类型,并且是面向对象的编程接口。

3.2 构建 DataSet

  1. 通过sparkSession调用createDataset方法
1
2
val ds = spark.createDataset(1 to 10) //scala集合
val ds = spark.createDataset(sc.textFile("/person.txt")) //rdd
  1. 使用scala集合和rdd调用toDS方法
1
2
sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS
  1. 把一个DataFrame转换成DataSet
1
val dataSet=dataFrame.as[强类型]
  1. 通过一个DataSet转换生成一个新的DataSet
1
List(1,2,3,4,5).toDS.map(x=>x*10)

3.2 RDD,DataFrame,DataSet的区别与联系

Spark RDD、DataFrame和DataSet是Spark的三类API,DataFrame是spark1.3.0版本提出来的,spark1.6.0版本又引入了DateSet的,但是在spark2.0版本中,DataFrame和DataSet合并为DataSet。

3.2.1 RDD

  • 优点
  1. 相比于传统的MapReduce框架,Spark在RDD中内置很多函数操作,group,map,filter等,方便处理结构化或非结构化数据
  2. 面向对象编程,直接存储的java对象,类型转化也安全
  • 缺点
  1. 由于它基本和hadoop一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于sql来比非常麻烦
  2. 默认采用的是java序列号方式,序列化结果比较大,而且数据存储在java堆内存中,导致gc比较频繁

3.2.2DataFrame

  • 优点
  1. 结构化数据处理非常方便,支持Avro, CSV, elastic search, and Cassandra等kv数据,也支持HIVE tables, MySQL等传统数据表

  2. 有针对性的优化,如采用Kryo序列化,由于数据结构元信息spark已经保存,序列化时不需要带上元信息,大大的减少了序列化大小,而且数据保存在堆外内存中,减少了gc次数,所以运行更快。

  3. hive兼容,支持hql、udf等

  • 缺点
  1. 编译时不能类型转化安全检查,运行时才能确定是否有问题
  2. 对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象

3.2.3 DateSet

  • 优点
  1. DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据
  2. 和RDD一样,支持自定义对象存储
  3. 和DataFrame一样,支持结构化数据的sql查询
  4. 采用堆外内存存储,gc友好
  5. 类型转化安全,代码友好

3.3 RDD,DataFrame,DataSet数据结构

4. DataFrame,DataSet转换

4.1 介绍转换方式

  1. 把一个DataFrame转换成DataSet
1
val dataSet = dataFrame.as[强类型]
  1. 把一个DataSet转换成DataFrame
1
val dataFrame=dataSet.toDF
  1. 可以从dataFrame和dataSet获取得到rdd
1
2
3
val rdd1=dataFrame.rdd

val rdd2=dataSet.rdd

4.2 测试代码

4.2.1 利用反射机制

通常应用在在开发代码之前,是可以先确定好DataFrame的schema元信息的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// 定义一个样例类,后期直接映射成DataFrame的schema信息
case class Person(id:String,name:String,age:Int)

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}

//todo:利用反射机制实现把rdd转成dataFrame
case class Person(id:String,name:String,age:Int)

object CaseClassSchema {
def main(args: Array[String]): Unit = {

//1、构建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()

//2、获取sparkContext对象
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("warn")

//3、读取文件数据
val data: RDD[Array[String]] = sc.textFile("file:///D:\\datas").map(x=>x.split(" "))

//4、定义一个样例类

//5、将rdd与样例类进行关联
val personRDD: RDD[Person] = data.map(x=>Person(x(0),x(1),x(2).toInt))

//6、将rdd转换成dataFrame
//需要手动导入隐式转换
import spark.implicits._
val personDF: DataFrame = personRDD.toDF

//7、对dataFrame进行相应的语法操作
//todo:----------------- DSL风格语法-----------------start
//打印schema
personDF.printSchema()
//展示数据
personDF.show()

//获取第一行数据
val first: Row = personDF.first()
println("first:"+first)

//取出前3位数据
val top3: Array[Row] = personDF.head(3)
top3.foreach(println)

//获取name字段
personDF.select("name").show()
personDF.select($"name").show()
personDF.select(new Column("name")).show()
personDF.select("name","age").show()

//实现age +1
personDF.select($"name",$"age",$"age"+1).show()

//按照age过滤
personDF.filter($"age" >30).show()
val count: Long = personDF.filter($"age" >30).count()
println("count:"+count)

//分组
personDF.groupBy("age").count().show()

personDF.show()
personDF.foreach(row => println(row))

//使用foreach获取每一个row对象中的name字段
personDF.foreach(row =>println(row.getAs[String]("name")))
personDF.foreach(row =>println(row.get(1)))
personDF.foreach(row =>println(row.getString(1)))
personDF.foreach(row =>println(row.getAs[String](1)))
//todo:----------------- DSL风格语法--------------------end


//todo:----------------- SQL风格语法-----------------start
personDF.createTempView("person")
//使用SparkSession调用sql方法统计查询
spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select name,age from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select age,count(*) from person group by age").show
spark.sql("select age,count(*) as count from person group by age").show
spark.sql("select * from person order by age desc").show
//todo:----------------- SQL风格语法----------------------end

//关闭sparkSession对象
spark.stop()
}
}

4.2.2 通过StructType动态指定Schema

通常应用在,在开发代码之前,是无法确定需要的DataFrame对应的schema元信息的情况。需要在开发代码的过程中动态指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

//todo;通过动态指定dataFrame对应的schema信息将rdd转换成dataFrame
object StructTypeSchema {

def main(args: Array[String]): Unit = {
//1、构建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()

//2、获取sparkContext对象
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("warn")

//3、读取文件数据
val data: RDD[Array[String]] = sc.textFile("file:///D:\\datas").map(x=>x.split(" "))

//4、将rdd与Row对象进行关联
val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))

//5、指定dataFrame的schema信息
//这里指定的字段个数和类型必须要跟Row对象保持一致
val schema=StructType(
StructField("id",StringType)::
StructField("name",StringType)::
StructField("age",IntegerType)::Nil
)

val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)
dataFrame.printSchema()
dataFrame.show()

dataFrame.createTempView("user")
spark.sql("select * from user").show()
spark.stop()
}
}

5. SparkSQL 读取与写入数据示例

5.1 SparkSQL读取sql数据

spark sql可以通过 JDBC 从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中

  • 添加mysql连接驱动jar包
1
2
3
4
5
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
  • 代码开发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:利用sparksql加载mysql表中的数据
object DataFromMysql {

def main(args: Array[String]): Unit = {
//1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")

//2、创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

//3、读取mysql表的数据
//3.1 指定mysql连接地址
val url="jdbc:mysql://localhost:3306/mydb?characterEncoding=UTF-8"
//3.2 指定要加载的表名
val tableName="jobdetail"
// 3.3 配置连接数据库的相关属性
val properties = new Properties()

//用户名
properties.setProperty("user","root")
//密码
properties.setProperty("password","123456")

val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)

//打印schema信息
mysqlDF.printSchema()

//展示数据
mysqlDF.show()

//把dataFrame注册成表
mysqlDF.createTempView("job_detail")

spark.sql("select * from job_detail where city = '广东' ").show()

spark.stop()
}
}

5.2 sparkSQL读取CSV文建写入MySQL

使用spark程序读取CSV文件,然后将读取到的数据内容,保存到mysql里面去,注意csv文件的换行问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object CSVOperate {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("sparkCSV")

val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
session.sparkContext.setLogLevel("WARN")
val frame: DataFrame = session
.read
.format("csv")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("header", "true")
.option("multiLine", true)
.load("file:///D:\\datas")

frame.createOrReplaceTempView("job_detail")
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "123456")

frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8", "mydb.jobdetail_copy", prop)

}
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录