Spark SQL 4.架构介绍


1. SparkSQL架构介绍

  • SparkSQL 是spark技术栈当中又一非常实用的模块,
  • SparkSQL 通过引入SQL的支持,大大降低了学习成本,让我们开发人员直接使用SQL的方式就能够实现大数据的开发,
  • SparkSQL 同时支持DSL以及SQL的语法风格
  • 目前在spark的整个架构设计当中,sparkSQL,SparkML,sparkGrahpx以及Structed Streaming等模块都是基于 Catalyst Optimization & Tungsten Execution模块之上运行
  • 如下图所示就显示了spark的整体架构模块设计

2. SparkSQL的架构设计实现

SparkSQL 执行先会经过 SQL Parser 解析 SQL,然后经过 Catalyst 优化器处理,最后到 Spark 执行。而 Catalyst 的过程又分为很多个过程,其中包括:

  • Analysis:主要利用 Catalog 信息将 Unresolved Logical Plan 解析成 Analyzed logical plan;
  • Logical Optimizations:利用一些规则将 Analyzed logical plan 解析成 Optimized Logical Plan;
  • Physical Planning:前面的 logical plan 不能被 Spark 执行,而这个过程是把 logical plan 转换成多个 physical plans,然后利用代价模型(cost model)选择最佳的 physical plan
  • Code Generation:这个过程会把 SQL 查询生成 Java 字 节码。

SparkSQL 内部架构构图

3. Catalyst执行过程

我们编写的sql语句,经过多次转换,最终进行编译成为字节码文件进行执行,这一整个过程经过了好多个步骤,其中包括以下几个重要步骤:

  • sql解析阶段 –> parse
  • 生成逻辑计划 –> Analyzer
  • sql语句调优阶段 –> Optimizer
  • 生成物理查询计划 –> planner

3.1 代码示例

  • 编写测试代码
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/** 利用sparksql加载mysql表中的数据 */
object DataFromMysqlPlan {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")

    //sparkConf.set("spark.sql.codegen.wholeStage","true")
    //2、创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    //3、读取mysql表的数据
    //3.1 指定mysql连接地址
    val url="jdbc:mysql://localhost:3306/mydb?characterEncoding=UTF-8"

    //3.2 指定要加载的表名
    val student="students"
    val score="scores"

    // 3.3 配置连接数据库的相关属性
    val properties = new Properties()

    // 用户名
    properties.setProperty("user","root")

    // 密码
    properties.setProperty("password","123456")

    val studentFrame: DataFrame = spark.read.jdbc(url,student,properties)
    val scoreFrame: DataFrame = spark.read.jdbc(url,score,properties)

    // 把dataFrame注册成表
    studentFrame.createTempView("students")
    scoreFrame.createOrReplaceTempView("scores")

    // 定义查询语句
    val sql=
      s"""
        |SELECT
        |  temp1.class,
        |  SUM(temp1.degree),
        |  AVG(temp1.degree)
        |FROM (
        |  SELECT
        |    students.sno AS ssno,
        |    students.sname,
        |    students.ssex,
        |    students.sbirthday,
        |    students.class,
        |    scores.sno,
        |    scores.degree,
        |    scores.cno
        |  FROM students
        |  LEFT JOIN scores ON students.sno =  scores.sno
        |  WHERE degree > 60
        |  AND sbirthday > '1973-01-01 00:00:00'
        |) temp1
        |GROUP BY temp1.class
        |""".stripMargin

    val resultFrame: DataFrame = spark.sql(sql)
    resultFrame.explain(true)
    resultFrame.show()
    spark.stop()
  }
}
  • 通过explain方法来查sql的执行计划,得到以下信息
------------------------------------------------------------------------------------------------------------
----------------------------- Parsed Logical Plan ----------------------------------------------------------
------------------------------------------------------------------------------------------------------------

'Aggregate ['temp1.class], ['temp1.class, unresolvedalias('SUM('temp1.degree), None), unresolvedalias('AVG('temp1.degree), None)]
+- 'SubqueryAlias temp1
   +- 'Project ['students.sno AS ssno#16, 'students.sname, 'students.ssex, 'students.sbirthday, 'students.class, 'scores.sno, 'scores.degree, 'scores.cno]
      +- 'Filter (('degree > 60) && ('sbirthday > 1973-01-01 00:00:00))
         +- 'Join LeftOuter, ('students.sno = 'scores.sno)
            :- 'UnresolvedRelation `students`
            +- 'UnresolvedRelation `scores`

------------------------------------------------------------------------------------------------------------
-----------------------------Analyzed Logical Plan ---------------------------------------------------------
------------------------------------------------------------------------------------------------------------

class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)
Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]
+- SubqueryAlias temp1
   +- Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11]
      +- Filter ((cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00))
         +- Join LeftOuter, (sno#0 = sno#10)
            :- SubqueryAlias students
            :  +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
            +- SubqueryAlias scores
               +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]

------------------------------------------------------------------------------------------------------------
-----------------------------Optimized Logical Plan---------------------------------------------------------
------------------------------------------------------------------------------------------------------------

Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, cast((avg(UnscaledValue(degree#12)) / 10.0) as decimal(14,5)) AS avg(degree)#28]
+- Project [class#4, degree#12]
   +- Join Inner, (sno#0 = sno#10)
      :- Project [sno#0, class#4]
      :  +- Filter ((isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)) && isnotnull(sno#0))
      :     +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
      +- Project [sno#10, degree#12]
         +- Filter ((isnotnull(degree#12) && (degree#12 > 60.0)) && isnotnull(sno#10))
            +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]

------------------------------------------------------------------------------------------------------------
---------------------------- Physical Plan -----------------------------------------------------------------
------------------------------------------------------------------------------------------------------------

*(6) HashAggregate(keys=[class#4], functions=[sum(degree#12), avg(UnscaledValue(degree#12))], output=[class#4, sum(degree)#27, avg(degree)#28])
+- Exchange hashpartitioning(class#4, 200)
   +- *(5) HashAggregate(keys=[class#4], functions=[partial_sum(degree#12), partial_avg(UnscaledValue(degree#12))], output=[class#4, sum#32, sum#33, count#34L])
      +- *(5) Project [class#4, degree#12]
         +- *(5) SortMergeJoin [sno#0], [sno#10], Inner
            :- *(2) Sort [sno#0 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(sno#0, 200)
            :     +- *(1) Project [sno#0, class#4]
            :        +- *(1) Filter (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)
            :           +- *(1) Scan JDBCRelation(students) [numPartitions=1] [sno#0,class#4,sbirthday#3] PushedFilters: [*IsNotNull(sbirthday), *IsNotNull(sno)], ReadSchema: struct<sno:string,class:string,sbirthday:timestamp>
            +- *(4) Sort [sno#10 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(sno#10, 200)
                  +- *(3) Scan JDBCRelation(scores) [numPartitions=1] [sno#10,degree#12] PushedFilters: [*IsNotNull(degree), *GreaterThan(degree,60.0), *IsNotNull(sno)], ReadSchema: struct<sno:string,degree:decimal(10,1)>

3.2 sql解析阶段

  • Parser 阶段

  • 在 Spark2.x 的版本当中,为了解析sparkSQL的sql语句,引入了 Antlr

  • 目前最新版本的 Spark 使用的是 ANTLR4,通过这个对 SQL 进行词法分析并构建语法树。

  • SQL 解析首先通过SqlBaseLexer来解析关键词以及各种标识符,然后使用SqlBaseParser来构建语法树。

  • 通过 Lexer 以及 parse 解析之后,生成语法树,生成语法树之后

  • 使用AstBuilder将语法树转换成为LogicalPlan 也被称为 Unresolved LogicalPlan。

Antlr 是一款强大的语法生成器工具,可用于读取、处理、执行和翻译结构化的文本或二进制文件,是当前 Java 语言中使用最为广泛的语法生成器工具,我们常见的大数据 SQL 解析都用到了这个工具,包括 Hive、Cassandra、Phoenix、Pig 以及 presto 等。

我们可以通过github去查看得到sparkSQL支持的SQL语法,所有sparkSQL支持的语法都定义在了这个文件当中,具体路径如下:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

如果我们需要重构sparkSQL的语法,那么我们只需要重新定义好相关语法,然后使用 Antlr4 对SqlBase.g4进行语法解析,生成相关的java类,其中就包含重要的词法解析器 SqlBaseLexer.java 和语法解析器 SqlBaseParser.java。

构建语法树简单示例

  • 解析之后的逻辑计划如下:

两个表被join之后生成了UnresolvedRelation,选择的列以及聚合的字段都有了,sql解析的第一个阶段就已经完成,接着准备进入到第二个阶段

------------------------------------------------------------------------------------------------------------
----------------------------- Parsed Logical Plan ----------------------------------------------------------
------------------------------------------------------------------------------------------------------------

'Aggregate ['temp1.class], ['temp1.class, 
     unresolvedalias('SUM('temp1.degree), None), 
     unresolvedalias('AVG('temp1.degree), None)]
+- 'SubqueryAlias temp1
   +- 'Project [
        'students.sno AS ssno#16, 
        'students.sname, 
        'students.ssex, 
        'students.sbirthday, 
        'students.class, 
        'scores.sno, 
        'scores.degree, 
        'scores.cno
      ]

      +- 'Filter (('degree > 60) && ('sbirthday > 1973-01-01 00:00:00))
         +- 'Join LeftOuter, ('students.sno = 'scores.sno)
            :- 'UnresolvedRelation `students`
            +- 'UnresolvedRelation `scores`
unresolved Logical Plan

3.3 绑定逻辑计划

  • Analyzer 阶段

  • 在sql解析parse阶段,生成了很多的 unresolvedalias , UnresolvedRelation 等很多未解析出来的有些关键字,这些都是属于 Unresolved LogicalPlan解析的部分。

  • Unresolved LogicalPlan仅仅是一种数据结构,不包含任何数据信息,例如不知道数据源,数据类型,不同的列来自哪张表等等

  • Analyzer 阶段会使用事先定义好的 Rule 以及 SessionCatalog 等信息对 Unresolved LogicalPlan 进行 transform。

    • SessionCatalog 主要用于各种 函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。
    • Rule 是定义在 Analyzer 里面的,具体的类的路径如下:
org.apache.spark.sql.catalyst.analysis.Analyzer

具体的rule规则定义如下:
 lazy val batches: Seq[Batch] = Seq(

   Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),

   Batch("Simple Sanity Check", Once,
      LookupFunctions),

   Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),

   Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),

   Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),

   Batch("View", Once,
      AliasViewChild(conf)),

   Batch("Nondeterministic", Once,
      PullOutNondeterministic),

   Batch("UDF", Once,
      HandleNullInputsForUDF),

   Batch("FixNullability", Once,
      FixNullability),

   Batch("Subquery", Once,
      UpdateOuterReferences),

   Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
  1. 从上面代码可以看出,多个性质类似的 Rule 组成一个 Batch,比如上面名为 Hints 的 Batch就是由很多个 Hints Rule 组成;

  2. 而多个 Batch 构成一个 batches。这些 batches 会由 RuleExecutor 执行,先按一个一个 Batch 顺序执行,然后对 Batch 里面的每个 Rule 顺序执行。

  3. 每个 Batch 会执行一次(Once)或多次(FixedPoint,由spark.sql.optimizer.maxIterations 参数决定),执行过程如下:

spark_batch_ruleexecutor-iteblog

所以上面的 SQL 经过这个阶段生成的 Analyzed Logical Plan 如下:

------------------------------------------------------------------------------------------------------------
-----------------------------Analyzed Logical Plan ---------------------------------------------------------
------------------------------------------------------------------------------------------------------------

class: string, 
sum(degree): decimal(20,1), 
avg(degree): decimal(14,5)

Aggregate 
    [class#4], 
    [
      class#4, sum(degree#12) AS sum(degree)#27, 
      avg(degree#12) AS avg(degree)#28
    ]

+- SubqueryAlias temp1
   +- Project [
        sno#0 AS ssno#16, 
        sname#1, 
        ssex#2, 
        sbirthday#3, 
        class#4, 
        sno#10, 
        degree#12, 
        cno#11
      ]

      +- Filter (
           (cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) 
              && 
           (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)
             )

          +- Join LeftOuter, (sno#0 = sno#10)
            :- SubqueryAlias students
            :  +- Relation
                  [sno#0,sname#1,ssex#2,sbirthday#3,class#4] 
                  JDBCRelation(students) [numPartitions=1]
            +- SubqueryAlias scores
               +- Relation
                   [sno#10,cno#11,degree#12] 
                   JDBCRelation(scores) [numPartitions=1]

从上面的解析过程来看可以看到 Analyzed Logical Plan 主要就是干了一下几件事:

1、确定最终返回字段名称以及返回类型:

  • class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)

2、确定聚合函数

  • Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]

3、确定表当中获取的查询字段

  • Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, cno#11, degree#12]

4、确定过滤条件

Filter (

​ (cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1)))

​ &&

​ (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)

)

5、确定join方式

Join LeftOuter, (sno#0 = sno#10)

6、确定表当中的数据来源以及分区个数

  • JDBCRelation(students) [numPartitions=1]

  • JDBCRelation(scores) [numPartitions=1]

至此Analyzed Logical Plan已经完成。

Analyzer Logical Plan 过程图示

3.4 逻辑优化阶段

  • Optimizer 阶段

  • 逻辑计划阶段对 Unresolved LogicalPlan 进行相关 transform 操作得到了 Analyzed Logical Plan

  • Analyzed Logical Plan 是可以直接转换成 Physical Plan 然后在 [Spark] 中执行。

  • 但得到的 Physical Plan 很可能不是最优的,需要进一步对Analyzed Logical Plan 进行处理,得到更优的逻辑算子树

  • 针对 SQL 逻辑算子树的优化器 Optimizer 应运而生

逻辑优化阶段的优化器主要是基于规则的(Rule-based Optimizer,简称 RBO),而绝大部分的规则都是启发式规则,也就是基于直观或经验而得出的规则,比如

  • 列裁剪:过滤掉查询不需要使用到的列
  • 谓词下推 :将过滤尽可能地下沉到数据源端
  • 常量累加:如 1 + 2 这种事先计算好
  • 常量替换:如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以转换成 SELECT * FROM table WHERE i = 5 AND j = 8

与前文介绍绑定逻辑计划阶段类似,这个阶段所有的规则也是实现 Rule 抽象类,多个规则组成一个 Batch,多个 Batch 组成一个 batches,同样也是在 RuleExecutor 中进行执行

这里按照 Rule 执行顺序一一进行说明。

------------------------------------------------------------------------------------------------------------
-----------------------------Optimized Logical Plan---------------------------------------------------------
------------------------------------------------------------------------------------------------------------

Aggregate 
    [class#4], 
    [
      class#4, 
      sum(degree#12) AS sum(degree)#27, 
      cast((avg(UnscaledValue(degree#12)) / 10.0) as decimal(14,5)) AS avg(degree)#28]

+- Project [class#4, degree#12]
   +- Join Inner, (sno#0 = sno#10)
      :- Project [sno#0, class#4]
      :  +- Filter (
            (isnotnull(sbirthday#3) 
                       && 
            (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)) && isnotnull(sno#0)
            )
      :     +- Relation[sno#0,sname#1,ssex#2,sbirthday#3,class#4] JDBCRelation(students) [numPartitions=1]
      +- Project [sno#10, degree#12]
         +- Filter ((isnotnull(degree#12) && (degree#12 > 60.0)) && isnotnull(sno#10))
            +- Relation[sno#10,cno#11,degree#12] JDBCRelation(scores) [numPartitions=1]

3.4.1 谓词下推

  • 谓词下推在 SparkQL 是由 PushDownPredicate 实现的,这个过程主要将过滤条件尽可能地下推到底层,最好是数据源。

  • 谓词下推是将 Filter 算子直接下推到 Join 之前,经过这样的操作,可以大大减少 Join 算子处理的数据量,从而加快计算速度。

使用谓词下推优化得到的逻辑计划如下:

在扫描 student表和scores表的时候使用条件过滤条件过滤出满足条件的数据

谓词下推图示

3.4.1 列裁剪

列裁剪在 Spark SQL 是由 ColumnPruning 实现的。因为我们查询的表可能有很多个字段,但是每次查询我们很大可能不需要扫描出所有的字段,这个时候利用列裁剪可以把那些查询不需要的字段过滤掉,使得扫描的数据量减少。所以针对我们上面介绍的 SQL,使用列裁剪优化得到的逻辑计划如下:

列剪枝图示

从上图可以看出,经过列裁剪后,students 表只需要查询 sno和 class 两个字段;scores 表只需要查询 sno,degree 字段。这样减少了数据的传输,而且如果底层的文件格式为列存(比如 Parquet),可以大大提高数据的扫描速度的。

3.4.3 常量替换

  • 常量替换在 Spark SQL 是由 ConstantPropagation 实现的。

  • 常量替换是将变量替换成常量,如果扫描的行数非常多可以减少很多的计算时间的开销的

    -- 比如 
    SELECT * FROM table WHERE i = 5 AND j = i + 3  
    -- 转换成  
    SELECT * FROM table WHERE i = 5 AND j = 8
    
  • 经过常量替换优化,得到的逻辑计划如下:
常量替换

如上图查询中有 t1.cid = 1 AND t1.did = t1.cid + 1 查询语句,从里面可以看出 t1.cid 其实已经是确定的值了,所以完全可以使用它计算出 t1.did

3.4.4 常量累加

  • 常量累加在 Spark SQL 是由 ConstantFolding 实现。
  • 常量累加和常量替换类似,也是在这个阶段把一些常量表达式事先计算好。
  • 常量累加看起来改动的不大,但是在数据量非常大的时候可以减少大量的计算,减少 CPU 等资源的使用。
  • 经过这个优化,得到的逻辑计划如下:
常量累加

另外更多的其他优化,参见spark源码:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L59

3.5 可执行的物理计划阶段

  • Physical Plan 阶段

  • 经过前面多个多个阶段,得到经过优化之后的sql语句仍然不能执行

  • 为了能够执行这个sql,最终必须得要翻译成为可以被执行的物理计划,

  • 到可执行的物理计划阶段 spark就知道该如何执行这个sql了,和前面逻辑计划绑定和优化不一样,

  • 可执行的物理计划阶段 使用的是策略 strategy,经过前面介绍的逻辑计划绑定和 Transformations 动作之后,树的类型并没有改变。

    • Expression 经过 Transformations 之后得到的还是 Transformations
    • Logical Plan 经过 Transformations 之后得到的还是 Logical Plan
  • 在可执行的物理计划阶段,Logical Plan 经过 Transformations 之后树的类型转换成 Physical Plan

  • 一个逻辑计划(Logical Plan)经过一系列的策略处理之后,得到多个物理计划(Physical Plans),物理计划在 Spark 是由 SparkPlan 实现的。

  • 多个物理计划再经过代价模型(Cost Model)得到选择后的物理计划(Selected Physical Plan),整个过程如下所示:

  • Cost Model 对应的就是基于代价的优化(Cost-based Optimizations,CBO),核心思想是计算每个物理计划的代价,

  • SPARK-16026 引入的 CBO 优化主要是在前面介绍的优化逻辑计划阶段 - Optimizer 阶段进行的

  • 对应的 Rule 为 CostBasedJoinReorder,并且默认是关闭的,需要通过 spark.sql.cbo.enabledspark.sql.cbo.joinReorder.enabled 参数开启。

  • 最后得到的物理计划如下:

------------------------------------------------------------------------------------------------------------
---------------------------- Physical Plan -----------------------------------------------------------------
------------------------------------------------------------------------------------------------------------

*(6) HashAggregate(keys=[class#4], functions=[sum(degree#12), avg(UnscaledValue(degree#12))], output=[class#4, sum(degree)#27, avg(degree)#28])
+- Exchange hashpartitioning(class#4, 200)
   +- *(5) HashAggregate(keys=[class#4], functions=[partial_sum(degree#12), partial_avg(UnscaledValue(degree#12))], output=[class#4, sum#32, sum#33, count#34L])
      +- *(5) Project [class#4, degree#12]
         +- *(5) SortMergeJoin [sno#0], [sno#10], Inner
            :- *(2) Sort [sno#0 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(sno#0, 200)
            :     +- *(1) Project [sno#0, class#4]
            :        +- *(1) Filter (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)
            :           +- *(1) Scan JDBCRelation(students) [numPartitions=1] [sno#0,class#4,sbirthday#3] PushedFilters: [*IsNotNull(sbirthday), *IsNotNull(sno)], ReadSchema: struct<sno:string,class:string,sbirthday:timestamp>
            +- *(4) Sort [sno#10 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(sno#10, 200)
                  +- *(3) Scan JDBCRelation(scores) [numPartitions=1] [sno#10,degree#12] PushedFilters: [*IsNotNull(degree), *GreaterThan(degree,60.0), *IsNotNull(sno)], ReadSchema: struct<sno:string,degree:decimal(10,1)>

从上面的结果可以看出,物理计划阶段已经知道数据源是从 JDBC里面读取了,也知道文件的路径,数据类型等。而且在读取文件的时候,直接将过滤条件(PushedFilters)加进去了,同时,这个 Join 变成了 SortMergeJoin,到这里 Physical Plan 就完全生成了。

3.6 代码生成阶段

  • 物理执行计划标明了整个的代码执行过程当中我们代码层面的执行过程,以及最终要得到的数据字段以及字段类型,也包含了我们对应的数据源的位置,

  • 得到物理执行计划想要被执行,最终还是得要生成完整的代码,底层还是基于sparkRDD去进行处理的,spark最后也还会有一些Rule对生成的物理执行计划进行处理,这个处理过程就是prepareForExecution

  • 这些rule规则定义在org.apache.spark.sql.execution.QueryExecution 这个类当中的方法里面。

3.6.1 生成代码与sql解析引擎的区别

  • 在sparkSQL当中,通过生成代码,来实现sql语句的最终生成,说白了最后底层执行的还是代码,那么为什么要这么麻烦,使用代码的方式来执行我们的sql语句,难道没有sql的解析引擎直接执行sql语句嘛?

  • 当然是有的,在spark2.0版本之前使用的都是基于Volcano Iterator Model(参见 《Volcano-An Extensible and Parallel Query Evaluation System》) 来实现sql的解析的,这个是由 Goetz Graefe 在 1993 年提出的,当今绝大多数数据库系统处理 SQL 在底层都是基于这个模型的。

  • 这个模型的执行可以概括为:

    1. 数据库引擎会将 SQL 翻译成一系列的关系代数算子或表达式,
    2. 依赖这些关系代数算子逐条处理输入数据并产生结果。每个算子在底层都实现同样的接口,比如都实现了 next() 方法,
    3. 最顶层的算子 next() 调用子算子的 next(),子算子的 next() 在调用孙算子的 next(),直到最底层的 next(),
    4. 具体过程如下图表示:
原始解析sql方式
  • Volcano Iterator Model 的优点是抽象起来很简单,很容易实现,而且可以通过任意组合算子来表达复杂的查询。但是缺点也很明显,存在大量的 虚函数调用 ,会引起 CPU 的中断,最终影响了执行效率。

  • 所以总结起来就是将sql解析成为代码,比sql引擎直接解析sql语句效率要快,所以spark2.0最终选择使用代码生成的方式来执行sql语句

  • 基于上面的发现,从 Apache Spark 2.0 开始,社区开始引入了 Whole-stage Code Generation,参见 SPARK-12795,主要就是想通过这个来模拟手写代码,从而提升 Spark SQL 的执行效率。Whole-stage Code Generation 来自于2011年 Thomas Neumann 发表的 Efficiently Compiling Efficient Query Plans for Modern Hardware论文,这个也是 Tungsten 计划的一部分。

Tungsten 代码生成分为三部分:

  1. 表达式代码生成(expression codegen)

  2. 全阶段代码生成(Whole-stage Code Generation)

  3. 加速序列化和反序列化(speed up serialization/deserialization)

3.6.2 表达式代码生成(expression codegen)

这个其实在 Spark 1.x 就有了。表达式代码生成的基类是 org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator,其下有七个子类:

前文的 SQL 生成的逻辑计划中的 (isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00) 就是最基本的表达式。它也是一种 Predicate,所以会调用 org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate 来生成表达式的代码。

3.6.3 全阶段代码生成

  • 全阶段代码生成(Whole-stage Code Generation),用来将多个处理逻辑整合到单个代码模块中,其中也会用到上面的表达式代码生成。
  • 和前面介绍的表达式代码生成不一样,这个是对整个 SQL 过程进行代码生成,前面的表达式代码生成仅对于表达式的。
  • 全阶段代码生成都是继承自 org.apache.spark.sql.execution.BufferedRowIterator 的,生成的代码需要实现 processNext() 方法,这个方法会在 org.apache.spark.sql.execution.WholeStageCodegenExec 里面的 doExecute 方法里面被调用。而这个方法里面的 rdd 会将数据传进生成的代码里面 ,比如我们上文 SQL 这个例子的数据源是 JDBC文件,
  • 底层使用 org.apache.spark.sql.execution.RowDataSourceScanExec这个类读取文件,然后生成 inputRDD,这个 rdd 在 WholeStageCodegenExec 类中的 doExecute 方法里面调用生成的代码,然后执行我们各种判断得到最后的结果。
  • 通过引入全阶段代码生成,大大减少了虚函数的调用,减少了 CPU 的调用,使得 SQL 的执行速度有很大提升。
全阶段代码生成
  • WholeStageCodegenExec 类中的 doExecute 方法部分代码如下:
/** 
  * WholeStageCodegen compiles a subtree of plans that support codegen together into single Java 
  * function. 
  * 
  * Here is the call graph of to generate Java source (plan A supports codegen, but plan B does not): 
  * 
  * WholeStageCodegen       Plan A               FakeInput        Plan B 
  * ========================================================================= 
  * 
  * -> execute() 
  *     | 
  *  doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute() 
  *     | 
  *     +----------------->   produce() 
  *                             | 
  *                          doProduce()  -------> produce() 
  *                                                   | 
  *                                                doProduce() 
  *                                                   | 
  *                         doConsume() <--------- consume() 
  *                             | 
  *  doConsume()  <--------  consume() 
  * 
  * SparkPlan A should override `doProduce()` and `doConsume()`. 
  * 
  * `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input, 
  * used to generated code for [[BoundReference]]. 
  */
override def doExecute(): RDD[InternalRow] = {    
  val (ctx, cleanedSource) = doCodeGen()
  //
  try to compile and fallback if it failed    
  val (_, maxCodeSize) = 
  try {      
    CodeGenerator.compile(cleanedSource)    
  } catch {      
    case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>        
  // We should already saw the error message        
    logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")        
    return child.execute()    
  }    
  // Check if compiled code has a too large function    
  if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {      
    logInfo(
      s"Found too long generated codes and JIT optimization might not work: " +        
      s"the bytecode size ($maxCodeSize) is above the limit " +
      s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +        
      s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +        
      s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString"
    )      
    child match {        
      // The fallback solution of batch file source scan still uses WholeStageCodegenExec        
      case f: FileSourceScanExec if f.supportsBatch => 
      // do nothing        
      case _ => return child.execute()      
    }    
  }    
  val references = ctx.references.toArray    
  val durationMs = longMetric("pipelineTime")    
  val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()    
  assert(rdds.size <= 2, "Up to two input RDDs can be supported")    
  if (rdds.length == 1) {      
    rdds.head.mapPartitionsWithIndex { 
      (index, iter) =>        
      val (clazz, _) = CodeGenerator.compile(cleanedSource)        
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]        
      buffer.init(index, Array(iter))        
      new Iterator[InternalRow] {          
        override def hasNext: Boolean = {            
          val v = buffer.hasNext            
          if (!v) durationMs += buffer.durationMs()            
          v          
        }          
        override def next: InternalRow = buffer.next()        
      }      
    }    
  } else {      
    // Right now, we support up to two input RDDs.      
    rdds.head.zipPartitions(rdds(1)) {
      (leftIter, rightIter) =>        
      Iterator((leftIter, rightIter))        
      // a small hack to obtain the correct partition index
    }.mapPartitionsWithIndex { (index, zippedIter) =>
      val (leftIter, rightIter) = zippedIter.next()        
      val (clazz, _) = CodeGenerator.compile(cleanedSource)        
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]        
      buffer.init(index, Array(leftIter, rightIter))        
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          val v = buffer.hasNext
          if (!v) durationMs += buffer.durationMs()
          v
        }
        override def next: InternalRow = buffer.next()
      }
    }
  }
}

在WholeStageCodegenExec 这个类的注释当中也说明了,最终生成的代码过程如下

/** 
 * WholeStageCodegen compiles a subtree of plans that support codegen together into single Java 
 * function. 
 * 
 * Here is the call graph of to generate Java source (plan A supports codegen, but plan B does not): 
 * 
 *   WholeStageCodegen       Plan A               FakeInput        Plan B 
 * ========================================================================= 
 * 
 * -> execute() 
 *     | 
 *  doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute() 
 *     | *     +----------------->   produce() 
 *                             | 
 *                          doProduce()  -------> produce() 
 *                                                   | 
 *                                                doProduce() 
 *                                                   | 
 *                         doConsume() <--------- consume() 
 *                             | 
 *  doConsume()  <--------  consume() 
 * 
 * SparkPlan A should override `doProduce()` and `doConsume()`. 
 * 
 * `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input, 
 * used to generated code for [[BoundReference]]. 
 */

全阶段代码生成的执行过程

3.6.4 代码编译

  • 生成代码之后需要解决的另一个问题是如何将生成的代码进行编译然后加载到同一个 JVM 中去。

  • 在早期 Spark 版本是使用 Scala 的 Reflection 和 Quasiquotes 机制来实现代码生成的。

  • Quasiquotes 是一个简洁的符号,可以让我们轻松操作 Scala 语法树,具体参见 这里。虽然 Quasiquotes 可以很好的为我们解决代码生成等相关的问题,但是带来的新问题是编译代码时间比较长(大约 50ms - 500ms)所以社区不得不默认关闭表达式代码生成。

  • 为了解决这个问题,Spark 引入了 Janino 项目,参见 SPARK-7956

  • Janino 是一个超级小但又超级快的 Java™ 编译器. 它不仅能像 javac 工具那样将一组源文件编译成字节码文件,还可以对一些 Java 表达式,代码块,类中的文本(class body)或者内存中源文件进行编译,并把编译后的字节码直接加载到同一个 JVM 中运行。

  • Janino 不是一个开发工具, 而是作为运行时的嵌入式编译器,比如作为表达式求值的翻译器或类似于 JSP 的服务端页面引擎,关于 Janino 的更多知识请参见这里

  • 通过引入了 Janino 来编译生成的代码,结果显示 SQL 表达式的编译时间减少到 5ms。

  • 在 Spark 中使用了 ClassBodyEvaluator 来编译生成之后的代码,参见 org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator。

  • 需要主要的是,代码生成是在 Driver 端进行的,而代码编译是在 Executor 端进行的。

3.6.5 SQL执行

终于到了 SQL 真正执行的地方了。这个时候 Spark 会执行上阶段生成的代码,然后得到最终的结果

DAG 执行图

4. sparkSQL执行过程总结

SparkSQL 执行过程

从上面可以看得出来,sparkSQL的执行主要经过了这么几个大的步骤

  1. 输入sql,dataFrame或者dataSet

  2. 经过Catalyst过程,生成最终我们得到的最优的物理执行计划

    2.1 parser阶段

​ 主要是通过Antlr4解析SqlBase.g4 ,所有spark’支持的语法方式都是定义在sqlBase.g4里面了,如果需要扩展sparkSQL的语法,我们只需要扩展sqlBase.g4即可,通过antlr4解析sqlBase.g4文件,生成了我们的语法解析器SqlBaseLexer.java和词法解析器SqlBaseParser.java

​ parse阶段 –> antlr4 –> 解析 –> SqlBase.g4 –> 得到 –> 语法解析器SqlBaseLexer.java + 词法解析器SqlBaseParser.java

​ 2.2 analyzer阶段

​ 使用基于Rule的规则解析以及Session Catalog来实现函数资源信息和元数据管理信息

​ Analyzer 阶段 –> 使用 –> Rule + Session Catalog –> 多个rule –> 组成一个batch

​ session CataLog –> 保存函数资源信息以及元数据信息等

​ 2.3 optimizer阶段

​ optimizer调优阶段 –> 基于规则的RBO优化rule-based optimizer –> 谓词下推 + 列剪枝 + 常量替换 + 常量累加

​ 2.4 planner阶段

​ 通过analyzer生成多个物理计划 –> 经过Cost Model进行最优选择 –> 基于代价的CBO优化 –> 最终选定得到的最优物理执行计划

​ 2.5 选定最终的物理计划,准备执行

​ 最终选定的最优物理执行计划 –> 准备生成代码去开始执行

  1. 将最终得到的物理执行计划进行代码生成,提交代码去执行我们的最终任务

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark SQL 5.调优 Spark SQL 5.调优
1. 数据缓存性能调优主要是将数据放入内存中操作,spark缓存注册表的方法 缓存spark表 spark.catalog.cacheTable("tableName") 释放缓存表 spark.catalog.uncacheTab
2018-06-15
下一篇 
Spark SQL 3.Hive On Spark Spark SQL 3.Hive On Spark
1. Spark on hive 与 Hive on Spark 的区别 Spark on hive Spark通过Spark-SQL使用hive 语句操作hive,底层运行的还是 spark rdd 就是通过sparksql,加载hi
2018-05-29
  目录