Spark 常用参数解析


1. shuffle 相关参数

1.1 spark.shuffle.manager

  • spark 1.2 官方支持两种shuffle ,即 HashBasedShuffleSortBasedShuffle
  • spark1.0之前仅支持 HashBasedShuffle.
  • spark 1.1的时候引入了sortBasedShuffle.
  • Spark 1.2的默认shuffle机制从hash变成Sort.
  • 如果需要HashBasedShuffle,可以将 spark.shuffle.manager 设置成 “hash” 即可

如果对性能有比较苛刻的要求,那么就要理解这两种不同的shuffle机制的原理,结合具体的应用场景进行选择。

HashBasedShuffle: 就是将数据根据Hash的结果,将各个Reducer Partition的数据写到单独的文件中,写数据时不会有排序的操作,这个问题就是如果Reducer的partition比较多的时候,会产生大量的磁盘文件. 这会带来两个问题,

  1. 同时打开的文件比较多,那么大量的文件句柄和写操作的临时内存会非常大,对于内存的使用和GC都带来了很多的压力,尤其是spark on yarn 的模式下,Executor 分配内存普遍比较小的时候,这个问题会更严重
  2. 从整体来看,这些文件带来大量的随机读操作,读性能可能会遇到瓶颈

SortBasedShuffle: 会根据实际情况对数据采用不同的方式进行sort. 这个排序可能仅仅是按照Reducer的partition 进行排序,保证同一个Shuffle Map Task 对应不同的Reducer 的partition是的数据都可以写到同一个文件,通过一个 OffSet来标记不同的Reducer Partition的分界,因此一个Shuffle map Task 仅会生成一个数据文件(还有一个index索引文件 ),从而避免了 HashBasedShuffle 文件数量过多的问题.

选择HashBasedShuffle 还是 SortBasedShuffle 取决于内存,排序和文件操作等因素的综合影响.

对于不需要进行排序的shuffle而且shuffle产生的文件数量不是特别多,HashBasedShuffle 可能是更好的选择,毕竟SortBasedShuffle至少会按照Reducer的Partition进行排序,

而SortBasedShuffle的优势就在于scalability(可伸缩性),它的出现实际上很大程度上是解决了HashBasedShuffle的scalaBility的问题,由于SortBasedShuffle还在不断演进中,因此SortBasedShuffle 的性能会得到不断的改善.

对选择哪种Shuffle,如果对于性能要求苛刻,最好还是通过实际的场景中测试后再决定,不过选择默认的SortBasedShuffle 可以满足大部分的场景需求

1.2 spark.shuffle.spill

  • 默认:true

在shuffle的过程中,如果涉及到排序、聚合等操作,势必会需要在内存中维护一些数据结构,进而占用额外的内存. 如果内存不够用怎么办,那只有两条路

  1. out of memory 出错了,
  2. 将部分数据临时写入到外部存储中去,最后再合并到最终的shuffle输出文件中去

这里的spark.shuffle.spill 就是决定是否将数据spill 到外部设备(- 默认:打开),如果内存足够使用,或者数据集足够小,可以不需要spill ,毕竟spill 带来了额外的磁盘操作

这个参数的默认值是 true,用于指定shuffle过程中如果内存中的数据超过阈值(参考 Spark.shuffle.memoryFraction的设置 ),那么是否需要将部分数据临时写入外部存储,如果设置为false,那么这个过程就会一直使用内存,会有out of memory 的风险. 因此只有在确定内存足够使用时,才可以将这个选项设置为false.

HashBasedShuffle 的 shuffle write 过程中使用的 org.apache.spark.util.collect.AppendOnlyMap 就是全内存方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap 对 AppendOnlyMap 有了进一步的封装,在内存使用超过阈值时,会将它spill 到外部存储,在最后的时候会将这些临时文件merge.

SortBasedShuffle 使用到的 org.apache.spark.util.collection.ExternalSorter 也会有类似的spill

ShuffleRead如果需要做Aggregate(聚合),也可能在Aggregate 的过程中将数据spill 到外部存储

1.3 spark.shuffle.memoryFraction 和 spark.shuffle.safetyFraction

  • spark.shuffle.memoryFraction:在启用spill的情况下,spark.shuffle.memoryFraction( 1.1后- 默认:为0.2 )决定了当shuffle过程中使用的内存达到多少比例的时候开始spill(就是executor - 默认:只有20% 的内存用来进行该操作,shuffle 操作在进行聚合时,如果发现使用的内存超过了20%的限制,那么多余的数据就会写入到磁盘中) ,通过spark.shuffle.memoryFraction 可以调整Spill的出发条件,即 shuffle占用内存的大小,进而调整spill 和 GC 的行为.

  • 如果spill太过频繁,可以适当增加spark.shuffle.memoryFraction 的大小,增加shuffle过程的可用内存数,进而减少spill的频率,

  • 为了避免内存溢出(OOM) 可能需要减少RDD cache 占用的内存,即 减少 spark.storage.memoryFraction 的值,但是减少RDD cache 所用的内存可能会带来其他的影响,因此需要综合考量

  • spark.shuffle.safetyFraction :在shuffle过程中,shuffle 占用的内存是估算出来的,并不是每次新增的数据项都会计算一次占用内存的大小,(这样做是为了减低时间的开销) ,但是在估算的时候回存在误差,因此存在实际使用内存比估算值大的情况,因此参数spark.shuffle.safetyFranction(- 默认:0.8) 作为一个保险参数减低实际shuffle过程需要的内存值,增加一定的缓冲,降低实际占用内存超过用户配置值的概率

1.4 spark.shuffle.sort.bypassMergeThreshold

  • 默认: 200

这个参数仅适用于SortShuffleManager,SortShuffleManager 在处理不需要排序的shuffle操作时,排序会带来性能的下降,这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager的内部便不适用Merge Sort的方式处理数据,而是使用与HashShuffle类似的方式,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件,并通过一个index索引未见来标记不同的partition的位置信息,这样通过去除Sort步骤来加快处理速度,代价是需要并打打开多个文件,所以内存消耗增加,本质上市相对HashShuffleManager的一个折中的方案,如果GC比较严重或者内存比较紧张可以适当减低这个值

1.5 spark.shuffle.blockTransferService

在Spark 1.2.0,这个配置的- 默认:值是netty,而之前是nio。这个主要是用于在各个Executor之间传输Shuffle数据。Netty的实现更加简洁,但实际上用户不用太关心这个选项。除非是有特殊的需求,否则采用- 默认:配置就可以。

1.6 spark.shuffle.consolidateFiles

  • 默认:false

仅适用于HashShuffleManager的实现,主要是结局在HashBasedShuffle的过程中产生过多的文件的问题,如果配置选项为true,那么对于同一个Core上运行的不同批次的ShuffleMapTask 不会产生新的shuffle文件 而是重用原来shuffle输出文件,但是每个ShuffleMapTask还是需要产生下游Task数量的文件,因为对内存使用量的减少并没有帮助,只是HashSHuffleManager里的一个折中的解决方案

中部分的代码实现原理上尽管很简单,但是设计到底层具体的文件系统的实现和限制等因素,例如在并发访问等方面,需要处理的细节很多,因此一直存在着这样那样的bug,例如在ext3上使用时,特定情况下性能范围可能下降,因此从Spark0.8 的代码开始,一直到Spark1.1 的代码为止也没有被标识为Stable,不是- 默认:采用的方式,此外因为并不减少同时打开的文件输出数量,因为对性能具体能带来多大的改善也取决于具体的文件数量的情况,所以即使你面临着shuffle文件数量巨大的问题,配置这个参数是否使用,在什么版本中可以使用,也最好还是实际测试以后再做决定

1.7 spark.shuffle.service.enabled

  • 默认: false

激活外部shuffle服务。服务维护executor写的文件,因而executor可以被安全移除。需要设置spark.dynamicAllocation.enabled 为true,同事指定外部shuffle服务

1.8 spark.shuffle.compress 和 spark.shuffle.spill.compress

默认:都是true

都是用来设置Shuffle过程中是否使用压缩算法对Shuffle数据进行压缩;

spark.shuffle.compress 针对最终写入本地文件系统的输出文件进行压缩

spark.shuffle.spill.compress 针对在处理过程中需要spill到外部存储的中间数据进行压缩

如何设置spark.shuffle.compress?

理论上说,spark.shuffle.compress设置为true通常是合理的,因为如果使用千兆以下的王克,网络带宽往往最容易成为瓶颈,如果下游的task通过网络获取上游shuffle map task 的结果的网络io 成为瓶颈时,那么就需要考虑将它设置为true,通过压缩数据来减少网络io,由于上游ShuffleMapTask和下游的task现阶段是不会并行处理的,也就是说上游ShuffleMapTask处理完成后,下游的task才会执行,因此如果需要压缩的时间消耗就是shuffleMapTask压缩数据的时间 + 网络传出的时间+ 下游task解压的时间,如果不需要压缩的时间消耗仅仅是网络传输的时间,因此需要评估压缩数据带来的时间消耗和因为数据压缩带来的时间节省,如果网络成为瓶颈,比如集群普遍使用的是千兆网络,那么可能将这个选项设置为true是合理的,同时压缩也是需要消耗大量的cpu资源的,所以打开压缩选项会增加map任务的执行时间,因为如果在cpu负载的影响远远大于磁盘和网络影响的场合下,也可能将spark.shuffle.compress设置为false才是最佳的方案

如何设置spark.shuffle.spill.compress?

如果spark.shuffle.spill.compress 设置为true,表示shuffle处理的中间结果在spill到本地磁盘时都会进行压缩,在将中间结果取回进行merge的时候要进行解压,因此需要综合考虑cpu成本,在引入压缩解压的消耗时间和disk io因为压缩带来的节省时间的比较,在disk io成为瓶颈的场景下,这个被设置为true可能比较合适,如果本地硬盘是ssd ,那么设置为false可能比较合适

总而言之,Shuffle过程中数据是否应该压缩,取决于 CPU,DISK,NETWORK 的实际能力和负载,应该综合考虑

1.9 spark.reducer.maxMbInFlight

  • 默认:48MB

这个参数用于限制一个ReducerTask向其他的Executor请求shuffle数据时所占用的最大内存数,尤其在网卡是千兆和千兆以下时,- 默认:值是48mb,这是这个值需要综合考虑网卡带宽和内存

1.10 spark.default.parallelism

该参数用于设置每个stage的默认task数量 这个参数极为重要,如果不设置可能会直接影响你的spark作业性能.

spark作业的- 默认:task数量为500–1000个较为合适,很多同学常犯一个错误就是不去设置这个参数,那么久会导致spark自己根据底层的hdfs的block数量来设置task的数量,- 默认:是一个hdfs block 对应一个task. spark- 默认:设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃,试想一下,无论你的Executor进程有多少个,内存和cpu有多大,那么90%的Executor进程坑你根本就没有task执行,也就是白白浪费了资源,因此Spark官网建议的设置原则是,该参数为 num-executors * executor-core 的2–3倍,比如 Executor的总CPU core 的数量为300个,那么设置1000个task是可以的,此时可以充分的利用spark集群资源

2. Storage相关配置参数

2.1 spark.local.dir

这个看起来很简单,就是spark用于写中间数据,如RDD Cache,shuffle ,Spill 等数据的位置,那么有什么需要注意的呢?

首先,最基本的是我们可以配置多个路径(用逗号分隔) 到多个磁盘上增加整体IO带宽,

其次,目前的实现中,spark是通过对文件名采用hash算法分布到各个路径的目录中去,如果你的存储设备有快有慢,比如ssd+hdd混合使用,那么你可以通过在ssd上配置更多的陆慕路径来增大它被spark使用的比例,从而更好的利用ssd的IO带宽能力. 当然这只是一种变通的方法,最终的解决方案还是应该向hdfs的实现方向一样,让spark 能够感知具体的存储设备类型,针对性的使用.

需要注意的是在spark1.0以后,SPARK_LOCAL_DIRS(Standlone,mesos ) or LOCAL_DIRS(YARN) 参数会辅材这个配置,比如spark on yarn 的时候,Spark Executor的本地路径依赖于yarn的配置,而不取决于这个参数

2.2 spark.executor.memory

Executor 内存的大小,和性能本身没有什么直接关系,但是几乎所有运行时性能相关的内容或多或少都和内存大小相关,这个参数最终会被设置到Executor的JVM的heap上,对应的就是Xmx 和Xms的值

理论上Executor 内存当然是多多益善,但是实际上受机器配置,以及运行环境,资源共享,JVM GC效率等因素的影响,还是有可能需要为它设置一个合理的大小,那么多大算合理呢? 这个要看实际情况

Executor的内存基本上是Executor内部所有的任务共享的,而每个Executor上可以支持的任务数量取决于Executor所管理的CPU Core 资源的多少,因此你需要了解每个任务的数据规模大小,从而推算出每个Executor大致需要多少内存即可满足基本的需求.

如何知道每个任务所需要的内存的大小呢?这个很难统一的衡量,因为除了数据集本身的开销,还包括算法所需各种临时内存空间的使用,而根据具体的代码算法等不同,临时内存空间的开销也不同,但是数据集本身的大小,对最终所需内存的大小还是有一定参考意义的.

通常来说每个分区的数据集在内存中的大小,可能是磁盘上元数据大小的若干倍(不考虑数据压缩,java独享相对于原始数据也还要算上用于管理数据的数据结构的额外开销),需要准确知道大小的话可以将RDD cache 在内存中,从BlockManager的Log输出可以看到每个cache的分区大小(实际上也是估算出来的,并不完全准确)

如:BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134(size: 495.3 MB)

反过来说,如果你的Executor的数量和内存大小受机器物理配置影响相对固定,那么你就需要合理规划每个分区任务的数据规模,例如采用更多分区,用增加任务数量(进而需要更多的批次来运算所有的任务) 的方式来减小每个任务所需处理的数据大小

2.3 spark.storage.memoryFraction

  • 默认: 0.6

该参数用户设置RDD 持久化在executor内存中的占比,- 默认:0.6 ,也就是说Executor 60%的内存可以用来保存持久化的RDD数据,根据你选择不同的持久化策略,当内存不够时,数据就不会持久化,或者数据会写入磁盘

如果写入Spark作业中,有较多的RDD持久化操作,该参数可以适当提高一些,保证持久化的数据能够容纳在内存中,避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能,如果Spark作业中的shuffle操作较多,而值就化操作比较少,那么这个参数的值可以适当降低一些比较合适,此外,如果发现作业由于频道gc导致运行缓慢,(通过spark web ui 可以观察到作业的gc耗时,)意味着task执行用户代码的内存不够用,那么同样建议建议调低这个参数值

2.4 spark.streaming.blockInterval

这个参数用来设置spark Streaming 中 Stream Receiver生成Block的时间间隔,- 默认:为200ms. 具体的行为表现是Receiver所接收的数据,每个指定的时间间隔(通过这个参数指定),就从buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager 中BlockManager中供后续计算过程中使用,理论上说,为了每个StreamingBatch 间隔里的数据是均匀的,这个时间间隔需要能被Batch的时间间隔长度所整除,如果内存大小够用,Streaming的数据来得及处理,这个blockInterval时间间隔的影响不大,如果数据Cache Level 是memory+ ser,即做了序列化处理,那么BlockInterval的大小会影响序列化后数据块的大小,对于java的GC的行为会有一些影响

此外spark.streaming.blockQueueSize 决定了再StreamBlock被存储到BlockManager之前,队列中最多可以容纳多少个StreamBlock,- 默认:为10个,因为这个队列的Poll 的时间间隔是100ms,所以如果CPU不是特别繁忙的话,基本没有问题

3. sql 相关参数

1.spark.sql.autoBroadcastJoinThreshold

  • 默认:10000

处理join 查询时广播到每个worker的表的最大字节数,当设置为-1 广播功能将失效

2.spark.sql.codegen

  • 默认:false

当为true时,那么会在运行时动态生成指定的表达式,对于那些拥有复杂表达式的查询来说,该项会导致明显的速度提升,然而对于简单的查询该项会减慢查询速度

3.3 spark.sql.shuffle.partitions

  • 默认:200

配置在处理join或者aggregation时 shuffle数据的partition数量


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Java 基础 15 static 关键字介绍 Java 基础 15 static 关键字介绍
1. 介绍在Java中,static是一个关键字,用于修饰类、方法和变量。使用static关键字声明的成员属于类级别,不需要实例化对象就可以访问和使用。在类定义中使用static关键字声明的变量可以作为常量使用。 2. 为什么设计 stat
2018-02-16
下一篇 
Spark 常用配置参数 Spark 常用配置参数
Spark 中大部分配置参数都有默认值,以下是常用配置: 1. Application Properties 属性 默认值 描述 spark.app.name (none) 应用程序的名称,会在日志和webUI显示 spark
2018-02-07
  目录