1. 数据缓存
性能调优主要是将数据放入内存中操作,spark缓存注册表的方法
- 缓存spark表
spark.catalog.cacheTable("tableName")
- 释放缓存表
spark.catalog.uncacheTable("tableName")
2. 性能优化相关参数
Sparksql仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力。
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。 |
spark.sql.files.maxPartitionBytes | 128 MB | 读取文件时单个分区可容纳的最大字节数(不过不推荐手动修改,可能在后续版本自动的自适应修改) |
spark.sql.files.openCostInBytes | 4M | 打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。 |
3. 表数据广播
在进行表join的时候,将小表广播可以提高性能,spark2.+ 中可以调整以下参数:
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.broadcastTimeout | 300 | 广播等待超时时间,单位秒 |
spark.sql.autoBroadcastJoinThreshold | 10M | 用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。 注意,当前数据统计仅支持已经运行了 ANALYZE TABLE |
4. 分区数的控制
spark任务并行度的设置中,spark有两个参数可以设置
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.shuffle.partitions | 200 | 用于配置 join 或aggregate shuffle数据时使用的分区数。 |
spark.default.parallelism | 对于分布式shuffle操作像reduceByKey和join,父RDD中分区的最大数目。对于无父RDD的并行化等操作,它取决于群集管理器: -本地模式:本地计算机上的核心数-Mesos fine grained mode:8-其他: 所有执行节点上的核心总数或2,以较大者为准 |
分布式shuffle操作的分区数 |
看起来它们的定义似乎也很相似,但在实际测试中,
- spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。
- spark.sql.shuffle.partitions则是对sparks SQL专用的设置
5. 文件与分区
这个总共有两个参数可以调整:
- 读取文件的时候一个分区接受多少数据;
- 文件打开的开销,通俗理解就是小文件合并的阈值。
文件打开是有开销的,开销的衡量,Spark 采用了一个比较好的方式就是打开文件的开销用,相同时间能扫描的数据的字节数来衡量。
参数介绍如下:
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) |
打包传入一个分区的最大字节,在读取文件的时候 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) |
用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度) |
spark.sql.files.maxPartitionBytes 该值的调整要结合你想要的并发度及内存的大小来进行。
spark.sql.files.openCostInBytes 合并小文件的阈值,小于这个阈值的文件将会合并
6. 数据的本地性
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
先来看看一个 stage 里所有 task 运行的一些性能指标,其中的一些说明:
Scheduler Delay
: spark 分配 task 所花费的时间Executor Computing Time
: executor 执行 task 所花费的时间Getting Result Time
: 获取 task 执行结果所花费的时间Result Serialization Time
: task 执行结果序列化时间Task Deserialization Time
: task 反序列化时间Shuffle Write Time
: shuffle 写数据时间Shuffle Read Time
: shuffle 读数据所花费时间
下面是spark webUI监控Stage的一个图:
- PROCESS_LOCAL是指读取缓存在本地节点的数据
- NODE_LOCAL是指读取本地节点硬盘数据
- ANY是指读取非本地节点数据
- 通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。
7. sparkSQL参数调优总结
- 下列Hive参数对Spark同样起作用
# 是否允许动态生成分区
set hive.exec.dynamic.partition=true;
# 是否容忍指定分区全部动态生成
set hive.exec.dynamic.partition.mode=nonstrict;
# 动态生成的最多分区数
set hive.exec.max.dynamic.partitions = 100;
- 运行行为
# 大表 JOIN 小表,小表做广播的阈值
set spark.sql.autoBroadcastJoinThreshold;
# 开启动态资源分配
set spark.dynamicAllocation.enabled;
# 开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.maxExecutors;
# 开启动态资源分配后,最少可分配的Executor数
set spark.dynamicAllocation.minExecutors;
# 需要shuffle是mapper端写出的partition个数
set spark.sql.shuffle.partitions;
# 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.enabled;
# 开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize;
# 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.sql.adaptive.minNumPostShufflePartitions;
# 当几个stripe的大小大于该值时,会合并到一个task中处理
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;
- executor能力
# executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.executor.memory;
# Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.yarn.executor.memoryOverhead;
# 当用户的SQL中包含窗口函数时,不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.sql.windowExec.buffer.spill.threshold;
# 单个executor上可以同时运行的task数
set spark.executor.cores;