Spark 任务调度解析


1. Spark任务调度流程图

2 DAGScheduler

2.1 DAGScheduler 介绍

  • DAG:Directed Acyclic Graph ,有向无环图

  • 高级调度器,面向阶段调度。(map阶段、reduce阶段)(stage-0、stage-1)。

2.2 主要职能

  1. 接收 Job ,拆分 Job , 传递到下层调度器
  • DAGScheduler 是接收 Job 的主入口,在SparkContext中会调用 submitJob(rdd…)runJob(rdd…) 这两个方法提交任务。

  • DAGScheduler 将 RDD 以 shuffle 为边界创建 Stage

    • 窄依赖操作通过管道在每个阶段划分到一个taskSet中(如:map、filter都属于窄依赖操作)
    • 宽依赖的操作需要多个阶段,对于每个stage最后只有宽依赖会依赖其他阶段。
    • 宽依赖或者窄依赖的操作实际发生在各种RDD的compute函数中。
  • DAGScheduler 为 Job 计算 Stage 的 DAG ,跟踪哪个 RDD 的 Stage 的输出需要被进行物化,寻找最小调度进行作业。

  • DAGScheduler 生成stage并提交,接着判断这个 Stage 是否有父 Stage 未完成,如果有提交并等待父 Stage,以此类推结果是DAGScheduler里增加了一些Waiting Stage和一个Running Stage。

    • Running Stage提交后,分析 Stage 里 Task 的类型,生成一个task描述(taskSet)。
  • TaskSet 包含了独立的 Task,可以基于集群上已经存在的数据独立运行(如:上一个Stage的map output),但是如果这些数据不可用会导致task运行失败。

  • DAGScheduler 调用 TaskScheduler.submitTask(taskSet,…) 方法,以任务集(taskSet)方式提交Stage给 TaskScheduler,以实现在集群上运行的机制。

  • DAG调度器还会根据当前缓存状态确定每个任务的首选位置。并将这些位置信息传递给TaskScheduler。

  • TaskScheduler 会依据资源量和出发分配条件,为这个 TaskSet 分配资源并触发执行

  • DAGScheduler 提交job后,异步返回 jobWaiter 对象,能够返回job运行状态,能够cancel job,执行成功后会处理并返回结果。

  1. 用于跟踪作业追踪器上的任务结果事件
  • 如果 Task 执行成功,在对应的 Stage 里减去这个Task,接下来做一些计数工作
  1. 如果 Task 是 ResultTask :累加器 + 1,在 Job 中为该 Task 设置为true,JobFinish 总数+1。

    1. 加完之后如果Finish数目与Partition数目相等,说明这个 Stage 完成了,标记 Stage 为完成,从 Running Stage 里减去这个 Stage,做一些stage清理工作。
  1. 如果 Task 是 ShuffleMapTask:累加器 + 1,在Stage里加上一个OutPutLocation,里面是一个 MapStatus 类。
    1. 同时检查该 Stage 是否完成,向 MapOutputTracker 注册该 Stage 里的 Shuffle 和 Location 信息
    2. 然后检查 Stage 的 OutPutLocation 是否存在空,
    3. 如果存在空说明一些 Task 失败了,整个Stage重新提交。
    4. 如果不存在空,继续从waiting stage中提交下一个需要执行的stage。
    5. (MapStatus是ShuffleMap Task执行完成的返回,包含Location和BlockSize信息)。
  • 如果 Task 失败重复提交,在对应 Stage 里增加这个Task。
  1. 如果task是获取失败,马上标记对应的 Stage 完成,从 Running Stages 里减去,不允许retry并且终止整个stage。
  2. 如果不是获取失败则重新提交整个Stage,另外把这个Fetch相关的Location和Map任务信息从Stage里剔除,从mapOutputTracker注销掉,
  3. 最后如果这次fetch的blockManager对象部位空做一次ExecutorLost处理,下次shuffle会换在另一个executor上去执行。
  • 其他 Task 状态会由 TaskScheduler 处理,如Execution、TaskResultLost、commitDenied等。

  • 其他与job相关的还包括:Cancel Job、Cancel Stage、ReSubmit Failed Stage 等。

2.3 其他职能

  • 缓存跟踪 Cache tracking

DAG调度器分析被缓存的RDD, 以避免重复计算, 也会记住map阶段产生了output的shuffle 避免重复map过程。

  • 首选位置 Prefered location

DAG调度器基于底层RDD, 缓存的配置或者shuffle的数据参数得到首位置选择, 并计算task运行地点。

  • CleanUp

Job 完成后数据结构被清除, 防止内存泄漏,为了故障修复,同以stage可能被执行多次,称之为”attemp”

如果task调度器报告错误是因为上一个stage的map output 丢失, DAG调度器就会提交丢失的Stage,这是通过compeletion with fetchFailed 或者 ExecutorLost event 对象检测的,DAG调度器会等待一段时间,看是否其他节点或者task也出现故障, 然后一起提交lost Stage 的task集合。

2.4 DAG 图

val rdd1 = sc.makeRDD(1 to 10)
val rdd2 = rdd1.filter(_ < 6)
val rdd3 = rdd1.filter(_ >= 6)
val rdd4 = rdd2.union(rdd3)
val rdd5 = rdd4.map(e=>(e,1))
val rdd6 = rdd5.reduceByKey(_ + _)
val rdd7 = rdd6.filter(e=>(e _1) %2 == 0).repartition(3).reduceByKey(_ + _).collect

3. TaskScheduler

  • 任务调度器(Trait)

  • 底层任务调度接口,有专门的实现(TaskSchedulerImpl)。

  • 每个调度器任务需要一个单独的SparkContext对象。

  • 该调度器会从DAG调度器中得到提交的TaskSet,并负责将Task发送给cluster运行它们。如果运行出错会重试,返回事件对象给DAG调度器。

  • 维护task和executor对应关系,executor和物理资源的对应关系。在排队的task和正在运行的task。

  • 内部维护一个队列根据FIFO(先入先出)或者Fair(公平调度)策略,调度任务。

3.1 TaskScheduler中的抽象方法

TaskScheduler本身是一个Trait,spark里只TaskSchedulerImpl实现了TaskScheduler ,理论上任务调度器可以定制,下面是TaskScheduler的所有抽象方法


package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

/**  
  * Low-level task scheduler interface, currently implemented exclusively by
  * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
  * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
  * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
  * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
  * them, retrying if there are failures, and mitigating stragglers. They return events to the
  * DAGScheduler.
  */
private[spark] trait TaskScheduler {

  private val appId = "spark-application-" + System.currentTimeMillis
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def start(): Unit

  /**  
   *  系统成功初始化后调用(通常在spark context中)
   *  yarn 通过它来选择首选位置 分配资源等
   *  等待slave注册等
   */
  def postStartHook() { }

  // 断开集群连接
  def stop(): Unit

  // 提交一个tesk 队列运行
  def submitTasks(taskSet: TaskSet): Unit

  // 杀死stage中的所有task,并且使该stage以及所有依赖该stage的stage失败
  // 如果后台调度器不支持杀掉task则抛出异常UnsupportedOperationException
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  /**  
   * Kills a task attempt.
   * Throw UnsupportedOperationException if the backend doesn't support kill a task.
   * @return Whether the task was successfully killed.
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Kill all the running task attempts in a stage.
  // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

  // 将DAG调度器设置为upcalls , 保证在submitTasks 之前调用
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // 获取集群中使用的默认的并行度级别,应用到job
  def defaultParallelism(): Int

  /**  
   *   更新正在执行任务状态,让master知道BlockManager is alive ,
   *   如果驱动程序知道该BlockManager 返回true,  否则返回false, 表示该BlockManager应该重新注册
   *   @param execId
   *   @param accumUpdates
   *   @param blockManagerId
   *   @param executorUpdates
   *   @return
   */
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId,
      executorUpdates: ExecutorMetrics): Boolean

  /**  
   * 获取与job相关联的应用id
   * @return An application ID
   */
  def applicationId(): String = appId

  /**  
   *  进程丢失执行器
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit

  /**  
   *  进程 removed worker
   */
  def workerRemoved(workerId: String, host: String, message: String): Unit
  /**  
   *  获取与job相关的应用的attempt id
   * 
   *  @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]

}

3.2 TaskSchedulerImpl源码

  • 任务调度器的具体实现

  • 通过 SchedulerBackend 执行多种类型群集的计划任务。也可通过将isLocal设置为true 来使用LocalSchedulerBackend 来使用本地设置。

  • 它处理常见逻辑,例如确定跨作业的调度顺序,唤醒以启动推测任务等。客户端应首先调用initialize()和start()方法,然后通过submitTasks()方法提交任务集。

  • TaskSchduler 一直维护这 Taskid 和 executorId

3.2.1 submitTasks

/**  
 *  接收DAGScheduler发送过来的taskSet
 *  @param taskSet
 */
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
   // 为task创建一个taskSetManager,添加到队列里。TaskSetManager 跟踪每个task的执行状况,维护了task的许多具体信息
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    //获取taskSet所属的stageid
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =&gt;
      ts.taskSet != taskSet &amp;&amp; !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    //将taskSetManager加入队列 有两个实现=》先入先出、公平调度
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal &amp;&amp; !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}

3.2.2 cancelTasks

override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  // 取消一个stage的tasks
  killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled")
  // Cancel all attempts for the stage.
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =&gt;
    attempts.foreach { case (_, tsm) =&gt;
      tsm.abort("Stage %s cancelled".format(stageId))
      logInfo("Stage %d was cancelled".format(stageId))
    }
  }
}

override def killAllTaskAttempts(
    stageId: Int,
    interruptThread: Boolean,
    reason: String): Unit = synchronized {
  logInfo(s"Killing all running tasks in stage $stageId: $reason")
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =&gt;
    attempts.foreach { case (_, tsm) =&gt;
      // There are two possible cases here:
      // 这里有两种可能
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task.
      // 2. The task set manager has been created but no tasks have been scheduled. In this case,
      //    simply continue.
      tsm.runningTasksSet.foreach { tid =&gt;
        taskIdToExecutorId.get(tid).foreach { execId =&gt;
          //调用SchedulerBackend 的killTask方法
          backend.killTask(tid, execId, interruptThread, reason)
        }
      }
    }
  }
}

3.2.3 resourceOffer

  // 调用者是SchedulerBackend, 用途是底层资源SchedulerBackend会把空余的workers资源给TaskScheduler,
  // 让其根据策略为排队的任务分配合理的cpu 和内存资源, 然后把任务描述列表回传给SchedulerBackend
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // 标记每个slave是可用的并且记录它的hostname
  // 如果添加了新的executor也会一起跟踪
  var newExecAvail = false
  for (o &lt;- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    //从 worker offers里收集executor和host对应信息,还有active、executors等。
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    //收集机架信息
    for (rack &lt;- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  //在提供资源之前,删除黑名单中的节点。
  //这样做事为了避免单独线程增加同步开销,而且因为更新黑名单仅在进行任务提交时提供相关信息
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  //过滤掉黑名单中的节点
  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =&gt;
    offers.filter { offer =&gt;
      !blacklistTracker.isNodeBlacklisted(offer.host) &amp;&amp;
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // worker offers资源列表进行shuffle, 任务列表里的任务列表依据调度策略进行依次排序
  val shuffledOffers = shuffleOffers(filteredOffers)
  // 构建一个分配给每个worker的task list.
  val tasks = shuffledOffers.map(o =&gt; new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableCpus = shuffledOffers.map(o =&gt; o.cores).toArray
  // 为每个taskSeethe提供可用的cpu核数,看是否满足
  // CPUS_PER_TASK  默认一个task需要一个cpu, 设置参数为spark.task.cpus=1
  val availableSlots = shuffledOffers.map(o =&gt; o.cores / CPUS_PER_TASK).sum
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet &lt;- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // 按照我们的调度顺序获取每个TaskSet,然后按地点级别的递增顺序为每个节点提供它,以便它有机会在所有节点上启动本地任务。
  // 注意:首选的位置顺序:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY
  for (taskSet &lt;- sortedTaskSets) {
    // 跳过barrier taskSet 如果available slots 的数量小于pending tasks.
    if (taskSet.isBarrier &amp;&amp; availableSlots &lt; taskSet.numTasks) {
      // 跳过启动过程.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      //启动任务标识
      var launchedAnyTask = false
      // 记录了所有executor id上分配的barrier tasks
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality &lt;- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) { //任务是不可调度的
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =&gt;

      // 如果taskSet是不可调度的,我们试着寻找一个现在空闲黑名单executor。
      // 如果找不到就会立即终止。
      // 否则我们会终止空闲executor并启动一个abortTimer,如果它没有在超时内安排任务,那么我们无法从taskSet安排任何任务,则会中止taskSet。
      // 注意1:我们基于每个任务集而不是基于每个任务来跟踪可调度性。
      // 注意2:当有多个空闲的列入黑名单的executor启用了动态分配时,仍可以中止taskSet。
      // 当ExecutorAllocationManager没有及时替换被杀死的空闲executor时,会发生这种情况,
      // 因为它依赖于挂起的任务,并且不会在空闲超时时终止执行程序,从而导致中止计时器到期并中止taskset。

        executorIdToRunningTaskIds.find(x =&gt; !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =&gt;
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  //杀掉黑名单executor
                  blacklistTrackerOpt.foreach(blt =&gt; blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT)   1000
                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                  logInfo(s"Waiting for $timeout ms for completely "
                    + s"blacklisted task to be schedulable again before aborting $taskSet.")
                  abortTimer.schedule(
                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                }
              case None =&gt; // 立即终止
                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                  s" executors can be found to kill. Aborting $taskSet." )
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
            }
        }
      } else {

       // 我们希望推迟杀死任何taskSets,只要我们有一个非黑名单的executor,可以用来从任何活动的taskSet调度任务。
       // 这可确保作业可以取得进展。
       // 注意:从理论上讲,taskSet永远不会在非黑名单的执行程序上进行调度,并且由于不断提交新的TaskSet,中止计时器不会启动。 有关更多详细信息。

        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          //将清理所有到期不可调度任务作为最近的一个task
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask &amp;&amp; taskSet.isBarrier) {
        // 检查barrier stage 是否部分启动。
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        require(addressesWithDescs.size == taskSet.numTasks,
          s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
            s"because only ${addressesWithDescs.size} out of a total number of " +
            s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
            "been blacklisted or cannot fulfill task locality requirements.")

        // 实现障碍协调员.
        maybeInitBarrierCoordinator()

        // 将taskInfos 更新到为所有barrier task 属性中。
        val addressesStr = addressesWithDescs
          // 根据partitionId排序的地址
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // 在配置的时间内启动.
  if (tasks.size &gt; 0) {
    hasLaunchedTask = true
  }
  return tasks
}

3.2.4 statusUpdate


/**  
 *   调用者是SchedulerBackend,用途是SchedulerBackend会将task执行的状态回报给taskScheduler做一些决定
 *   task finish 包括四种状态:finished、killed、failed、lost。只有finished是成功执行完成来,其它三种都是失败的
 *  
 *   @param tid
 *   @param state
 *   @param serializedData
 */
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      Option(taskIdToTaskSetManager.get(tid)) match {
        case Some(taskSet) =&gt;
          // 若taskLost:找到该task对应的executor,从active executor里移出,避免这个executor被分配到其他task继续失败下去。
          if (state == TaskState.LOST) {
            // TaskState.LOST 仅由不推荐使用的Mesos细粒度调度模式使用,
            // 其中每个执行程序对应于单个任务,因此将执行程序标记为失败。
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) &lt;=&gt; taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              //从active executor里移出,避免这个executor被分配到其他task继续失败下去
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            //task完成执行后
            if (state == TaskState.FINISHED) {
              //执行成功时调用
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              //执行失败时调用
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
            //TaskResultGetter内部维护里一个线程池,负责一步fetch task执行结果并反序列话,
            // 默认四个线程做这件事,可配参数(spark.resultGetter.threads=4)
          }

        case None =&gt;
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception =&gt; logError("Exception in statusUpdate", e)
    }
  }
  // 更新DAGScheduler而不对其进行锁定,因为这可能会死锁
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}

3.3 TaskResultGetter 获取 task result 的逻辑

对于success task,如果taskResult里的数据是直接结果数据直接把data反序列化出来得到结果。

directResult.value(taskResultSerializer.get())
// 如果不是,会调用

val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
// 从远程获取,如果远程取回数据是空的,那么会调用

scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
// 告诉它这个任务执行完成了,但是数据丢失了,否则渠道数据之后会通知BlockMaster移除这个block信息,调用

scheduler.handleSuccessfulTask(taskSetManager, tid, result)
// 告诉它这个任务是执行成功的,并且把result data 传回去。
// 对于failed task,从data里解析出fail的理由,调用
// 对于failed task , 从data里解析出fail 的理由

scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
// 告诉它失败的理由。

4. SchedulerBackend

  • 后台调度器 ( trait )

  • 在taskScheduler 下层, 用于对接不同的资源管理系统, SchedulerBackend 是一个Trait

  • 粗粒度:进程常住模式,典型代表是standalone模式、mesos粗粒度模式、yarn

  • 细粒度:mesos细粒度模式

  • 这里讨论粗细粒度模式,更好理解 CoarseGrainedSchedulerBackend

  • 维护executor相关信息(包括executor的地址、通讯地址、host、总核数,剩余核数),目前executor被注册使用了多少、剩余多少、总共还有多少核实空的等待。

4.1 SchedulerBackend 源码

package org.apache.spark.scheduler

// 用于调度系统的后端接口,允许 TaskSchedulerImpl 对接不同的系统。
// 我们假设一个类似Mesos的模型,其中应用程序在机器可用时获得资源,并可以在它们上启动任务。
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit

  /**   
   *   SchedulerBackend 将自己目前可用资源交给TaskScheduler, 
   *   TaskScheduler 根据调度策略分配给排队的任务, 返回一批可执行的任务描述(TaskSet),
   *   SchedulerBackend 负责launchTask, 即最终把task 塞到executor模型上, executor里的线程池会执行task的run()方法
   */
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  /**  
   *  向executor发送请求杀掉一个正在执行的task
   *  
   *  @param taskId Id of the task.
   *  @param executorId Id of the executor the task is running on.
   *  @param interruptThread 执行程序是否应该中断任务线程。
   *  @param reason 任务终止的原因。
   */
  def killTask(
      taskId: Long,
      executorId: String,
      interruptThread: Boolean,
      reason: String): Unit =
    throw new UnsupportedOperationException

  def isReady(): Boolean = true

  /**  
   *  获取与job相关的application id
   *  @return An application ID
   */
  def applicationId(): String = appId

  /**  
   *   如果集群管理器支持多次尝试,请获取此运行的attempt ID。 在客户端模式下运行的应用程序将没有attempt ID。 
   *  @return The application attempt id, if available.
   */
  def applicationAttemptId(): Option[String] = None

  /**  
   *   获取驱动程序日志的URL。
   *   这些URL用于显示Driver 的UI Executors选项卡中的链接。
   *  @return Map containing the log names and their respective URLs
   */
  def getDriverLogUrls: Option[Map[String, String]] = None

  /**  
   *  获取driver的属性。 当指定自定义日志URL模式时,这些属性用于替换日志URL。
   *  @return Map containing attributes on driver.
   */
  def getDriverAttributes: Option[Map[String, String]] = None

  /**  
   *   获取当前可以同时启动的最大任务数。
   *   请注意,请不要缓存此方法返回的值,因为该数字可能会因添加/删除执行程序而更改。
   * 
   *  @return The max number of tasks that can be concurrent launched currently.
   */
  def maxNumConcurrentTasks(): Int

}

4.1.1 RegisterExecutor

/**   
 * 用来注册executor,通常worker拉起、重启会触发executor的注册,
 * CoarseGrainedSchedulerBackend会把这些executor维护起来,更新内部的资源,比如综合书增加,最后调用一次makeOffer()
 * 即把目前空闲资源丢给taskScheduler去分配一次
 * 返回taskSet,把任务启动起来,这个makeOffer() 的调用会出现在任何资源变化的事件中,下面会看到
 */
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else if (scheduler.nodeBlacklist.contains(hostname)) {
          /**   
           *   如果cluster manager 在黑名单节点上给我们一个executor
           *   (因为它在我们通知我们的黑名单之前已经开始分配这些资源,或者它忽略了我们的黑名单),
           *   那么我们立即拒绝该执行者。
           */
          logInfo(s"Rejecting $executorId as it has been blacklisted.")
          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
          context.reply(true)
        } else {
          //如果executor的rpc env 没有监听到传入连接,则 hostPort 将设置为null,并且应该使用client 连接executor
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val data = new ExecutorData(executorRef, executorAddress, hostname,
            cores, cores, logUrls, attributes)
          // 这必须同步,因为在请求executor时会读取此块中变异的变量
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          executorRef.send(RegisteredExecutor)
          // 注意:一些测试期望在我们将执行程序放入映射之后得到答复
          context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
        }

4.1.2 StatusUpdate

      //task 的状态回调
      case StatusUpdate(executorId, taskId, state, data) =>
        //首先调用scheduler.statusUpdate(taskId, state, data.value) 上报上去
        scheduler.statusUpdate(taskId, state, data.value)
        //然后判断这个task是否执行结束了
        if (TaskState.isFinished(state)) {
          //如果执行结束了
          executorDataMap.get(executorId) match {
              //匹配ExecutorData
            case Some(executorInfo) =>
              //把executor上的freeCore 加上去, 调用一次makeOffer()
              //这个事件就是别人直接向SchedulerBackend请求资源, 直接调用makeOffer()
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

4.1.3 killTask

//killTask
      case KillTask(taskId, executorId, interruptThread, reason) =>
        //找出executorid对应的executor 然后杀掉task
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(
              KillTask(taskId, executorId, interruptThread, reason))
          case None =>
            // Ignoring the task kill since the executor is not registered.
            logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
        }

4.1.4 stopExecutors

//通知每个executor 处理stopExecutor事件
case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
context.reply(true)

4.1.5 RemoveExecutor


case RemoveExecutor(executorId, reason) =>
  /**   
   *   我们将删除executor的状态,无法恢复它。
   *   但是,Driver和executor之间的连接可能仍然存在,
   *   因此执行程序不会自动退出,因此请尝试告诉执行程序自行停止。
   *   见SPARK-13519。 
   */
  executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
  removeExecutor(executorId, reason)
}

// 从集群中移除一个断开连接的slave
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  logDebug(s"Asked to remove executor $executorId with reason $reason")
  executorDataMap.get(executorId) match {
    //通过executorId匹配executor
    case Some(executorInfo) =>
    //这必须同步,因为在请求executor时会读取此块中变异的变量
    val killed = CoarseGrainedSchedulerBackend.this.synchronized {
      addressToExecutorId -= executorInfo.executorAddress
      executorDataMap -= executorId
      executorsPendingLossReason -= executorId
      executorsPendingToRemove.remove(executorId).getOrElse(false)
    }
    totalCoreCount.addAndGet(-executorInfo.totalCores)
    totalRegisteredExecutors.addAndGet(-1)
    // TaskScheduler.executorLost() 方法, 通知上层我这边有一批资源不能用了,你处理下吧,
    // TaskScheduler会继续把 executorLost事件上报给DAGScheduler,
    // 原因是DAGScheduler关心shuffle任务的outputlocation,
    // DAGScheduler会告诉BlockManager这个executor不能用了,然后移走它,
    // 然后把所有的stage的shuffleOutput信息都遍历一遍, 移走这个executor,
    // 并且更新后的shuffleoutput信息注册到MapOutputTracker 上, 最后清理下本地的CacheLocationsMap
    scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
    listenerBus.post(
      SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
    case None =>
    // SPARK-15262: If an executor is still alive even after the scheduler has removed
    // its metadata, we may receive a heartbeat from that executor and tell its block
    // manager to reregister itself. If that happens, the block manager master will know
    // about the executor, but the scheduler will not. Therefore, we should remove the
    // executor from the block manager when we hit this case.
    scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
    logInfo(s"Asked to remove non-existent executor $executorId")
  }
}

4.1.6 reviveOffers


//直接调用了makeOffers()方法,等到一批和执行的任务描述(taskSet) , 调用launchTasks
case ReviveOffers =>
makeOffers()

4.1.7 makeOffers


// 为所有executor提供虚拟资源
private def makeOffers() {
  // 确保在某个任务启动时没有executor被杀死
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // 过滤出活跃的executor
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
                      Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

4.1.8 launchTask


// 启动一组提供资源的 task
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    //将task转换为二进制
    val serializedTask = TaskDescription.encode(task)
    //判断二进制数据大小,如果二进制数据超过最大范围则报错
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
          s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
          s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
               s"${executorData.executorHost}.")
      //将task 发送给executor执行
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

4.2 后台调度器的子类实现

下面有三个子实现类

  1. CoarseGrainedSchedulerBackend
  2. LocalSchedulerBackend
  3. StandaloneSchedulerBackend

4.2.1 CoarseGrainedSchedulerBackend


/**   
 *   调度程序后端代码,等待粗粒度执行程序连接。
 *   在Spark执行期间保留每个executor,不会在程序完成时关闭executor,在新的任务进来时不会创建新的executor。
 *   executor可以以多种方式运行,比如粗粒度mesos模式下或者standalone
 */
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging

4.2.2 LocalSchedulerBackend

/**  
 *   本地调度器,在本地版的spark时使用,
 *   其中 executor, backend,master 都运行在相同的jvm中, 它位于 TaskSchedulerImpl的下一层, 
 *   在本地运行单个executor (由 LocalSchedulerBackend创建) 上处理启动等任务
 *   对[[LocalSchedulerBackend]]的调用都是通过LocalEndpoint序列化的。 使用RpcEndpoint使[[LocalSchedulerBackend]]上的调用异步,这是必要的
 *     防止[[LocalSchedulerBackend]]和[[TaskSchedulerImpl]]之间的死锁。
 */
private[spark] class LocalEndpoint(
    override val rpcEnv: RpcEnv,
    userClassPath: Seq[URL],
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalSchedulerBackend,
    private val totalCores: Int)
  extends ThreadSafeRpcEndpoint with Logging

4.2.3 StandaloneSchedulerBackend

// A [[SchedulerBackend]] 针对spark standalone 集群模式的实现
private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with StandaloneAppClientListener
  with Logging

3. Spark 提交作业图形化


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
HDFS 负载均衡策略 HDFS 负载均衡策略
1. Hadoop HDFS介绍Hadoop 分布式文件系统(Hadoop Distributed File System),简称 HDFS,被设计成适合运行在通用硬件上的分布式文件系统。它和现有的分布式文件系统有很多的共同点。HDFS 是
2019-08-29
下一篇 
Spark 中的基本概念 Spark 中的基本概念
1. Application应用 Spark上运行的应用, 包含了驱动器进程(Driver)和集群上的执行器进程(Executor) 每个Application 只有一个Driver 但是可以有多个Executor 2. Appli
2019-08-19
  目录