Spark SQL 5.调优


1. 数据缓存

性能调优主要是将数据放入内存中操作,spark缓存注册表的方法

  • 缓存spark表
spark.catalog.cacheTable("tableName")
  • 释放缓存表
spark.catalog.uncacheTable("tableName")

2. 性能优化相关参数

Sparksql仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力。

属性 默认值 描述
spark.sql.inMemoryColumnarStorage.compressed true Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。
spark.sql.files.maxPartitionBytes 128 MB 读取文件时单个分区可容纳的最大字节数(不过不推荐手动修改,可能在后续版本自动的自适应修改)
spark.sql.files.openCostInBytes 4M 打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。

3. 表数据广播

在进行表join的时候,将小表广播可以提高性能,spark2.+ 中可以调整以下参数:

属性 默认值 描述
spark.sql.broadcastTimeout 300 广播等待超时时间,单位秒
spark.sql.autoBroadcastJoinThreshold 10M 用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。
注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

4. 分区数的控制

spark任务并行度的设置中,spark有两个参数可以设置

属性 默认值 描述
spark.sql.shuffle.partitions 200 用于配置 join 或aggregate shuffle数据时使用的分区数。
spark.default.parallelism 对于分布式shuffle操作像reduceByKey和join,父RDD中分区的最大数目。对于无父RDD的并行化等操作,它取决于群集管理器:
-本地模式:本地计算机上的核心数-Mesos fine grained mode:8-其他:
所有执行节点上的核心总数或2,以较大者为准
分布式shuffle操作的分区数

看起来它们的定义似乎也很相似,但在实际测试中,

  • spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。
  • spark.sql.shuffle.partitions则是对sparks SQL专用的设置

5. 文件与分区

这个总共有两个参数可以调整:

  • 读取文件的时候一个分区接受多少数据;
  • 文件打开的开销,通俗理解就是小文件合并的阈值。

文件打开是有开销的,开销的衡量,Spark 采用了一个比较好的方式就是打开文件的开销用,相同时间能扫描的数据的字节数来衡量。

参数介绍如下:

属性 默认值 描述
spark.sql.files.maxPartitionBytes 134217728
(128 MB)
打包传入一个分区的最大字节,在读取文件的时候
spark.sql.files.openCostInBytes 4194304
(4 MB)
用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度)

spark.sql.files.maxPartitionBytes 该值的调整要结合你想要的并发度及内存的大小来进行。

spark.sql.files.openCostInBytes 合并小文件的阈值,小于这个阈值的文件将会合并

6. 数据的本地性

分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。

先来看看一个 stage 里所有 task 运行的一些性能指标,其中的一些说明:

  • Scheduler Delay : spark 分配 task 所花费的时间
  • Executor Computing Time : executor 执行 task 所花费的时间
  • Getting Result Time : 获取 task 执行结果所花费的时间
  • Result Serialization Time : task 执行结果序列化时间
  • Task Deserialization Time : task 反序列化时间
  • Shuffle Write Time : shuffle 写数据时间
  • Shuffle Read Time : shuffle 读数据所花费时间

​ 下面是spark webUI监控Stage的一个图:

  • PROCESS_LOCAL是指读取缓存在本地节点的数据
  • NODE_LOCAL是指读取本地节点硬盘数据
  • ANY是指读取非本地节点数据
  • 通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。

7. sparkSQL参数调优总结

  1. 下列Hive参数对Spark同样起作用
# 是否允许动态生成分区
set hive.exec.dynamic.partition=true; 
# 是否容忍指定分区全部动态生成
set hive.exec.dynamic.partition.mode=nonstrict; 
# 动态生成的最多分区数
set hive.exec.max.dynamic.partitions = 100; 
  1. 运行行为
# 大表 JOIN 小表,小表做广播的阈值
set spark.sql.autoBroadcastJoinThreshold; 
# 开启动态资源分配
set spark.dynamicAllocation.enabled; 
# 开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.maxExecutors; 
# 开启动态资源分配后,最少可分配的Executor数
set spark.dynamicAllocation.minExecutors; 
# 需要shuffle是mapper端写出的partition个数
set spark.sql.shuffle.partitions; 
# 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.enabled; 
# 开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; 
# 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.sql.adaptive.minNumPostShufflePartitions; 
# 当几个stripe的大小大于该值时,会合并到一个task中处理
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; 
  1. executor能力
# executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.executor.memory; 
# Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.yarn.executor.memoryOverhead; 
# 当用户的SQL中包含窗口函数时,不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.sql.windowExec.buffer.spill.threshold; 
# 单个executor上可以同时运行的task数
set spark.executor.cores; 

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark SQL 6.资源动态划分 Spark SQL 6.资源动态划分
1. spark的动态资源划分 动态资源划分,是指在spark当中用于对计算的时候资源,如果不够或者资源剩余的情况下进行动态的资源划分,以求资源的利用率达到最大 Spark中的资源单位一般指的是executors,和Yarn中的Conta
2018-06-23
下一篇 
Spark SQL 4.架构介绍 Spark SQL 4.架构介绍
1. SparkSQL架构介绍 SparkSQL 是spark技术栈当中又一非常实用的模块, SparkSQL 通过引入SQL的支持,大大降低了学习成本,让我们开发人员直接使用SQL的方式就能够实现大数据的开发, SparkSQL 同时支持
2018-06-05
  目录