Spark 任务调度解析


1. Spark任务调度流程图

2 DAGScheduler

2.1 DAGScheduler 介绍

  • DAG:Directed Acyclic Graph ,有向无环图

  • 高级调度器,面向阶段调度。(map阶段、reduce阶段)(stage-0、stage-1)。

2.2 主要职能

  1. 接收 Job ,拆分 Job , 传递到下层调度器
  • DAGScheduler 是接收 Job 的主入口,在SparkContext中会调用 submitJob(rdd…)runJob(rdd…) 这两个方法提交任务。

  • DAGScheduler 将 RDD 以 shuffle 为边界创建 Stage

    • 窄依赖操作通过管道在每个阶段划分到一个taskSet中(如:map、filter都属于窄依赖操作)
    • 宽依赖的操作需要多个阶段,对于每个stage最后只有宽依赖会依赖其他阶段。
    • 宽依赖或者窄依赖的操作实际发生在各种RDD的compute函数中。
  • DAGScheduler 为 Job 计算 Stage 的 DAG ,跟踪哪个 RDD 的 Stage 的输出需要被进行物化,寻找最小调度进行作业。

  • DAGScheduler 生成stage并提交,接着判断这个 Stage 是否有父 Stage 未完成,如果有提交并等待父 Stage,以此类推结果是DAGScheduler里增加了一些Waiting Stage和一个Running Stage。

    • Running Stage提交后,分析 Stage 里 Task 的类型,生成一个task描述(taskSet)。
  • TaskSet 包含了独立的 Task,可以基于集群上已经存在的数据独立运行(如:上一个Stage的map output),但是如果这些数据不可用会导致task运行失败。

  • DAGScheduler 调用 TaskScheduler.submitTask(taskSet,…) 方法,以任务集(taskSet)方式提交Stage给 TaskScheduler,以实现在集群上运行的机制。

  • DAG调度器还会根据当前缓存状态确定每个任务的首选位置。并将这些位置信息传递给TaskScheduler。

  • TaskScheduler 会依据资源量和出发分配条件,为这个 TaskSet 分配资源并触发执行

  • DAGScheduler 提交job后,异步返回 jobWaiter 对象,能够返回job运行状态,能够cancel job,执行成功后会处理并返回结果。

  1. 用于跟踪作业追踪器上的任务结果事件
  • 如果 Task 执行成功,在对应的 Stage 里减去这个Task,接下来做一些计数工作
  1. 如果 Task 是 ResultTask :累加器 + 1,在 Job 中为该 Task 设置为true,JobFinish 总数+1。

    1. 加完之后如果Finish数目与Partition数目相等,说明这个 Stage 完成了,标记 Stage 为完成,从 Running Stage 里减去这个 Stage,做一些stage清理工作。
  2. 如果 Task 是 ShuffleMapTask:累加器 + 1,在Stage里加上一个OutPutLocation,里面是一个 MapStatus 类。

    1. 同时检查该 Stage 是否完成,向 MapOutputTracker 注册该 Stage 里的 Shuffle 和 Location 信息
    2. 然后检查 Stage 的 OutPutLocation 是否存在空,
    3. 如果存在空说明一些 Task 失败了,整个Stage重新提交。
    4. 如果不存在空,继续从waiting stage中提交下一个需要执行的stage。
    5. (MapStatus是ShuffleMap Task执行完成的返回,包含Location和BlockSize信息)。
  • 如果 Task 失败重复提交,在对应 Stage 里增加这个Task。
  1. 如果task是获取失败,马上标记对应的 Stage 完成,从 Running Stages 里减去,不允许retry并且终止整个stage。
  2. 如果不是获取失败则重新提交整个Stage,另外把这个Fetch相关的Location和Map任务信息从Stage里剔除,从mapOutputTracker注销掉,
  3. 最后如果这次fetch的blockManager对象部位空做一次ExecutorLost处理,下次shuffle会换在另一个executor上去执行。
  • 其他 Task 状态会由 TaskScheduler 处理,如Execution、TaskResultLost、commitDenied等。

  • 其他与job相关的还包括:Cancel Job、Cancel Stage、ReSubmit Failed Stage 等。

2.3 其他职能

  • 缓存跟踪 Cache tracking

DAG调度器分析被缓存的RDD, 以避免重复计算, 也会记住map阶段产生了output的shuffle 避免重复map过程。

  • 首选位置 Prefered location

DAG调度器基于底层RDD, 缓存的配置或者shuffle的数据参数得到首位置选择, 并计算task运行地点。

  • CleanUp

Job 完成后数据结构被清除, 防止内存泄漏,为了故障修复,同以stage可能被执行多次,称之为”attemp”

如果task调度器报告错误是因为上一个stage的map output 丢失, DAG调度器就会提交丢失的Stage,这是通过compeletion with fetchFailed 或者 ExecutorLost event 对象检测的,DAG调度器会等待一段时间,看是否其他节点或者task也出现故障, 然后一起提交lost Stage 的task集合。

2.4 DAG 图

1
2
3
4
5
6
7
val rdd1 = sc.makeRDD(1 to 10)
val rdd2 = rdd1.filter(_ < 6)
val rdd3 = rdd1.filter(_ >= 6)
val rdd4 = rdd2.union(rdd3)
val rdd5 = rdd4.map(e=>(e,1))
val rdd6 = rdd5.reduceByKey(_ + _)
val rdd7 = rdd6.filter(e=>(e _1) %2 == 0).repartition(3).reduceByKey(_ + _).collect

3. TaskScheduler

  • 任务调度器(Trait)

  • 底层任务调度接口,有专门的实现(TaskSchedulerImpl)。

  • 每个调度器任务需要一个单独的SparkContext对象。

  • 该调度器会从DAG调度器中得到提交的TaskSet,并负责将Task发送给cluster运行它们。如果运行出错会重试,返回事件对象给DAG调度器。

  • 维护task和executor对应关系,executor和物理资源的对应关系。在排队的task和正在运行的task。

  • 内部维护一个队列根据FIFO(先入先出)或者Fair(公平调度)策略,调度任务。

3.1 TaskScheduler中的抽象方法

TaskScheduler本身是一个Trait,spark里只TaskSchedulerImpl实现了TaskScheduler ,理论上任务调度器可以定制,下面是TaskScheduler的所有抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

/**
* Low-level task scheduler interface, currently implemented exclusively by
* [[org.apache.spark.scheduler.TaskSchedulerImpl]].
* This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
* for a single SparkContext. These schedulers get sets of tasks submitted to them from the
* DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
* them, retrying if there are failures, and mitigating stragglers. They return events to the
* DAGScheduler.
*/
private[spark] trait TaskScheduler {

private val appId = "spark-application-" + System.currentTimeMillis
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit

/**
* 系统成功初始化后调用(通常在spark context中)
* yarn 通过它来选择首选位置 分配资源等
* 等待slave注册等
*/
def postStartHook() { }

// 断开集群连接
def stop(): Unit

// 提交一个tesk 队列运行
def submitTasks(taskSet: TaskSet): Unit

// 杀死stage中的所有task,并且使该stage以及所有依赖该stage的stage失败
// 如果后台调度器不支持杀掉task则抛出异常UnsupportedOperationException
def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

/**
* Kills a task attempt.
* Throw UnsupportedOperationException if the backend doesn't support kill a task.
* @return Whether the task was successfully killed.
*/
def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

// Kill all the running task attempts in a stage.
// Throw UnsupportedOperationException if the backend doesn't support kill tasks.
def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

// 将DAG调度器设置为upcalls , 保证在submitTasks 之前调用
def setDAGScheduler(dagScheduler: DAGScheduler): Unit

// 获取集群中使用的默认的并行度级别,应用到job
def defaultParallelism(): Int

/**
* 更新正在执行任务状态,让master知道BlockManager is alive ,
* 如果驱动程序知道该BlockManager 返回true, 否则返回false, 表示该BlockManager应该重新注册
* @param execId
* @param accumUpdates
* @param blockManagerId
* @param executorUpdates
* @return
*/
def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics): Boolean

/**
* 获取与job相关联的应用id
* @return An application ID
*/
def applicationId(): String = appId

/**
* 进程丢失执行器
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit

/**
* 进程 removed worker
*/
def workerRemoved(workerId: String, host: String, message: String): Unit
/**
* 获取与job相关的应用的attempt id
*
* @return An application's Attempt ID
*/
def applicationAttemptId(): Option[String]

}

3.2 TaskSchedulerImpl源码

  • 任务调度器的具体实现

  • 通过 SchedulerBackend 执行多种类型群集的计划任务。也可通过将isLocal设置为true 来使用LocalSchedulerBackend 来使用本地设置。

  • 它处理常见逻辑,例如确定跨作业的调度顺序,唤醒以启动推测任务等。客户端应首先调用initialize()和start()方法,然后通过submitTasks()方法提交任务集。

  • TaskSchduler 一直维护这 Taskid 和 executorId

3.2.1 submitTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**  
* 接收DAGScheduler发送过来的taskSet
* @param taskSet
*/
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// 为task创建一个taskSetManager,添加到队列里。TaskSetManager 跟踪每个task的执行状况,维护了task的许多具体信息
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//获取taskSet所属的stageid
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =&gt;
ts.taskSet != taskSet &amp;&amp; !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
//将taskSetManager加入队列 有两个实现=》先入先出、公平调度
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal &amp;&amp; !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}

3.2.2 cancelTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
// 取消一个stage的tasks
killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled")
// Cancel all attempts for the stage.
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =&gt;
attempts.foreach { case (_, tsm) =&gt;
tsm.abort("Stage %s cancelled".format(stageId))
logInfo("Stage %d was cancelled".format(stageId))
}
}
}

override def killAllTaskAttempts(
stageId: Int,
interruptThread: Boolean,
reason: String): Unit = synchronized {
logInfo(s"Killing all running tasks in stage $stageId: $reason")
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =&gt;
attempts.foreach { case (_, tsm) =&gt;
// There are two possible cases here:
// 这里有两种可能
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task.
// 2. The task set manager has been created but no tasks have been scheduled. In this case,
// simply continue.
tsm.runningTasksSet.foreach { tid =&gt;
taskIdToExecutorId.get(tid).foreach { execId =&gt;
//调用SchedulerBackend 的killTask方法
backend.killTask(tid, execId, interruptThread, reason)
}
}
}
}
}

3.2.3 resourceOffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  // 调用者是SchedulerBackend, 用途是底层资源SchedulerBackend会把空余的workers资源给TaskScheduler,
// 让其根据策略为排队的任务分配合理的cpu 和内存资源, 然后把任务描述列表回传给SchedulerBackend
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// 标记每个slave是可用的并且记录它的hostname
// 如果添加了新的executor也会一起跟踪
var newExecAvail = false
for (o &lt;- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
//从 worker offers里收集executor和host对应信息,还有active、executors等。
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
//收集机架信息
for (rack &lt;- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
//在提供资源之前,删除黑名单中的节点。
//这样做事为了避免单独线程增加同步开销,而且因为更新黑名单仅在进行任务提交时提供相关信息
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

//过滤掉黑名单中的节点
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =&gt;
offers.filter { offer =&gt;
!blacklistTracker.isNodeBlacklisted(offer.host) &amp;&amp;
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)

// worker offers资源列表进行shuffle, 任务列表里的任务列表依据调度策略进行依次排序
val shuffledOffers = shuffleOffers(filteredOffers)
// 构建一个分配给每个worker的task list.
val tasks = shuffledOffers.map(o =&gt; new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o =&gt; o.cores).toArray
// 为每个taskSeethe提供可用的cpu核数,看是否满足
// CPUS_PER_TASK 默认一个task需要一个cpu, 设置参数为spark.task.cpus=1
val availableSlots = shuffledOffers.map(o =&gt; o.cores / CPUS_PER_TASK).sum
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet &lt;- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}

// 按照我们的调度顺序获取每个TaskSet,然后按地点级别的递增顺序为每个节点提供它,以便它有机会在所有节点上启动本地任务。
// 注意:首选的位置顺序:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY
for (taskSet &lt;- sortedTaskSets) {
// 跳过barrier taskSet 如果available slots 的数量小于pending tasks.
if (taskSet.isBarrier &amp;&amp; availableSlots &lt; taskSet.numTasks) {
// 跳过启动过程.
// TODO SPARK-24819 If the job requires more slots than available (both busy and free
// slots), fail the job on submit.
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
s"number of available slots is $availableSlots.")
} else {
//启动任务标识
var launchedAnyTask = false
// 记录了所有executor id上分配的barrier tasks
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
for (currentMaxLocality &lt;- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}

if (!launchedAnyTask) { //任务是不可调度的
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =&gt;

// 如果taskSet是不可调度的,我们试着寻找一个现在空闲黑名单executor。
// 如果找不到就会立即终止。
// 否则我们会终止空闲executor并启动一个abortTimer,如果它没有在超时内安排任务,那么我们无法从taskSet安排任何任务,则会中止taskSet。
// 注意1:我们基于每个任务集而不是基于每个任务来跟踪可调度性。
// 注意2:当有多个空闲的列入黑名单的executor启用了动态分配时,仍可以中止taskSet。
// 当ExecutorAllocationManager没有及时替换被杀死的空闲executor时,会发生这种情况,
// 因为它依赖于挂起的任务,并且不会在空闲超时时终止执行程序,从而导致中止计时器到期并中止taskset。

executorIdToRunningTaskIds.find(x =&gt; !isExecutorBusy(x._1)) match {
case Some ((executorId, _)) =&gt;
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
//杀掉黑名单executor
blacklistTrackerOpt.foreach(blt =&gt; blt.killBlacklistedIdleExecutor(executorId))

val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely "
+ s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
case None =&gt; // 立即终止
logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
s" executors can be found to kill. Aborting $taskSet." )
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
}
} else {

// 我们希望推迟杀死任何taskSets,只要我们有一个非黑名单的executor,可以用来从任何活动的taskSet调度任务。
// 这可确保作业可以取得进展。
// 注意:从理论上讲,taskSet永远不会在非黑名单的执行程序上进行调度,并且由于不断提交新的TaskSet,中止计时器不会启动。 有关更多详细信息。

if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
//将清理所有到期不可调度任务作为最近的一个task
unschedulableTaskSetToExpiryTime.clear()
}
}

if (launchedAnyTask &amp;&amp; taskSet.isBarrier) {
// 检查barrier stage 是否部分启动。
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
// requirements are not fulfilled, and we should revert the launched tasks).
require(addressesWithDescs.size == taskSet.numTasks,
s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because only ${addressesWithDescs.size} out of a total number of " +
s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
"been blacklisted or cannot fulfill task locality requirements.")

// 实现障碍协调员.
maybeInitBarrierCoordinator()

// 将taskInfos 更新到为所有barrier task 属性中。
val addressesStr = addressesWithDescs
// 根据partitionId排序的地址
.sortBy(_._2.partitionId)
.map(_._1)
.mkString(",")
addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
s"stage ${taskSet.stageId}.")
}
}
}

// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// 在配置的时间内启动.
if (tasks.size &gt; 0) {
hasLaunchedTask = true
}
return tasks
}

3.2.4 statusUpdate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
* 调用者是SchedulerBackend,用途是SchedulerBackend会将task执行的状态回报给taskScheduler做一些决定
* task finish 包括四种状态:finished、killed、failed、lost。只有finished是成功执行完成来,其它三种都是失败的
*
* @param tid
* @param state
* @param serializedData
*/
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
Option(taskIdToTaskSetManager.get(tid)) match {
case Some(taskSet) =&gt;
// 若taskLost:找到该task对应的executor,从active executor里移出,避免这个executor被分配到其他task继续失败下去。
if (state == TaskState.LOST) {
// TaskState.LOST 仅由不推荐使用的Mesos细粒度调度模式使用,
// 其中每个执行程序对应于单个任务,因此将执行程序标记为失败。
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) &lt;=&gt; taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
//从active executor里移出,避免这个executor被分配到其他task继续失败下去
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
taskSet.removeRunningTask(tid)
//task完成执行后
if (state == TaskState.FINISHED) {
//执行成功时调用
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
//执行失败时调用
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
//TaskResultGetter内部维护里一个线程池,负责一步fetch task执行结果并反序列话,
// 默认四个线程做这件事,可配参数(spark.resultGetter.threads=4)
}

case None =&gt;
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception =&gt; logError("Exception in statusUpdate", e)
}
}
// 更新DAGScheduler而不对其进行锁定,因为这可能会死锁
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}

3.3 TaskResultGetter 获取 task result 的逻辑

对于success task,如果taskResult里的数据是直接结果数据直接把data反序列化出来得到结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
directResult.value(taskResultSerializer.get())
// 如果不是,会调用

val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
// 从远程获取,如果远程取回数据是空的,那么会调用

scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
// 告诉它这个任务执行完成了,但是数据丢失了,否则渠道数据之后会通知BlockMaster移除这个block信息,调用

scheduler.handleSuccessfulTask(taskSetManager, tid, result)
// 告诉它这个任务是执行成功的,并且把result data 传回去。
// 对于failed task,从data里解析出fail的理由,调用
// 对于failed task , 从data里解析出fail 的理由

scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
// 告诉它失败的理由。

4. SchedulerBackend

  • 后台调度器 ( trait )

  • 在taskScheduler 下层, 用于对接不同的资源管理系统, SchedulerBackend 是一个Trait

  • 粗粒度:进程常住模式,典型代表是standalone模式、mesos粗粒度模式、yarn

  • 细粒度:mesos细粒度模式

  • 这里讨论粗细粒度模式,更好理解 CoarseGrainedSchedulerBackend

  • 维护executor相关信息(包括executor的地址、通讯地址、host、总核数,剩余核数),目前executor被注册使用了多少、剩余多少、总共还有多少核实空的等待。

4.1 SchedulerBackend 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package org.apache.spark.scheduler

// 用于调度系统的后端接口,允许 TaskSchedulerImpl 对接不同的系统。
// 我们假设一个类似Mesos的模型,其中应用程序在机器可用时获得资源,并可以在它们上启动任务。
private[spark] trait SchedulerBackend {
private val appId = "spark-application-" + System.currentTimeMillis

def start(): Unit
def stop(): Unit

/**
* SchedulerBackend 将自己目前可用资源交给TaskScheduler,
* TaskScheduler 根据调度策略分配给排队的任务, 返回一批可执行的任务描述(TaskSet),
* SchedulerBackend 负责launchTask, 即最终把task 塞到executor模型上, executor里的线程池会执行task的run()方法
*/
def reviveOffers(): Unit
def defaultParallelism(): Int

/**
* 向executor发送请求杀掉一个正在执行的task
*
* @param taskId Id of the task.
* @param executorId Id of the executor the task is running on.
* @param interruptThread 执行程序是否应该中断任务线程。
* @param reason 任务终止的原因。
*/
def killTask(
taskId: Long,
executorId: String,
interruptThread: Boolean,
reason: String): Unit =
throw new UnsupportedOperationException

def isReady(): Boolean = true

/**
* 获取与job相关的application id
* @return An application ID
*/
def applicationId(): String = appId

/**
* 如果集群管理器支持多次尝试,请获取此运行的attempt ID。 在客户端模式下运行的应用程序将没有attempt ID。
* @return The application attempt id, if available.
*/
def applicationAttemptId(): Option[String] = None

/**
* 获取驱动程序日志的URL。
* 这些URL用于显示Driver 的UI Executors选项卡中的链接。
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = None

/**
* 获取driver的属性。 当指定自定义日志URL模式时,这些属性用于替换日志URL。
* @return Map containing attributes on driver.
*/
def getDriverAttributes: Option[Map[String, String]] = None

/**
* 获取当前可以同时启动的最大任务数。
* 请注意,请不要缓存此方法返回的值,因为该数字可能会因添加/删除执行程序而更改。
*
* @return The max number of tasks that can be concurrent launched currently.
*/
def maxNumConcurrentTasks(): Int

}

4.1.1 RegisterExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**   
* 用来注册executor,通常worker拉起、重启会触发executor的注册,
* CoarseGrainedSchedulerBackend会把这些executor维护起来,更新内部的资源,比如综合书增加,最后调用一次makeOffer()
* 即把目前空闲资源丢给taskScheduler去分配一次
* 返回taskSet,把任务启动起来,这个makeOffer() 的调用会出现在任何资源变化的事件中,下面会看到
*/
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist.contains(hostname)) {
/**
* 如果cluster manager 在黑名单节点上给我们一个executor
* (因为它在我们通知我们的黑名单之前已经开始分配这些资源,或者它忽略了我们的黑名单),
* 那么我们立即拒绝该执行者。
*/
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
//如果executor的rpc env 没有监听到传入连接,则 hostPort 将设置为null,并且应该使用client 连接executor
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorAddress, hostname,
cores, cores, logUrls, attributes)
// 这必须同步,因为在请求executor时会读取此块中变异的变量
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
executorRef.send(RegisteredExecutor)
// 注意:一些测试期望在我们将执行程序放入映射之后得到答复
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}

4.1.2 StatusUpdate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//task 的状态回调
case StatusUpdate(executorId, taskId, state, data) =>
//首先调用scheduler.statusUpdate(taskId, state, data.value) 上报上去
scheduler.statusUpdate(taskId, state, data.value)
//然后判断这个task是否执行结束了
if (TaskState.isFinished(state)) {
//如果执行结束了
executorDataMap.get(executorId) match {
//匹配ExecutorData
case Some(executorInfo) =>
//把executor上的freeCore 加上去, 调用一次makeOffer()
//这个事件就是别人直接向SchedulerBackend请求资源, 直接调用makeOffer()
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}

4.1.3 killTask

1
2
3
4
5
6
7
8
9
10
11
//killTask
case KillTask(taskId, executorId, interruptThread, reason) =>
//找出executorid对应的executor 然后杀掉task
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(
KillTask(taskId, executorId, interruptThread, reason))
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

4.1.4 stopExecutors

1
2
3
4
5
6
7
//通知每个executor 处理stopExecutor事件
case StopExecutors =>
logInfo("Asking each executor to shut down")
for ((_, executorData) <- executorDataMap) {
executorData.executorEndpoint.send(StopExecutor)
}
context.reply(true)

4.1.5 RemoveExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

case RemoveExecutor(executorId, reason) =>
/**
* 我们将删除executor的状态,无法恢复它。
* 但是,Driver和executor之间的连接可能仍然存在,
* 因此执行程序不会自动退出,因此请尝试告诉执行程序自行停止。
* 见SPARK-13519。
*/
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
}

// 从集群中移除一个断开连接的slave
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
//通过executorId匹配executor
case Some(executorInfo) =>
//这必须同步,因为在请求executor时会读取此块中变异的变量
val killed = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
executorsPendingToRemove.remove(executorId).getOrElse(false)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
// TaskScheduler.executorLost() 方法, 通知上层我这边有一批资源不能用了,你处理下吧,
// TaskScheduler会继续把 executorLost事件上报给DAGScheduler,
// 原因是DAGScheduler关心shuffle任务的outputlocation,
// DAGScheduler会告诉BlockManager这个executor不能用了,然后移走它,
// 然后把所有的stage的shuffleOutput信息都遍历一遍, 移走这个executor,
// 并且更新后的shuffleoutput信息注册到MapOutputTracker 上, 最后清理下本地的CacheLocationsMap
scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None =>
// SPARK-15262: If an executor is still alive even after the scheduler has removed
// its metadata, we may receive a heartbeat from that executor and tell its block
// manager to reregister itself. If that happens, the block manager master will know
// about the executor, but the scheduler will not. Therefore, we should remove the
// executor from the block manager when we hit this case.
scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
logInfo(s"Asked to remove non-existent executor $executorId")
}
}

4.1.6 reviveOffers

1
2
3
4

//直接调用了makeOffers()方法,等到一批和执行的任务描述(taskSet) , 调用launchTasks
case ReviveOffers =>
makeOffers()

4.1.7 makeOffers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// 为所有executor提供虚拟资源
private def makeOffers() {
// 确保在某个任务启动时没有executor被杀死
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// 过滤出活跃的executor
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}

4.1.8 launchTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

// 启动一组提供资源的 task
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
//将task转换为二进制
val serializedTask = TaskDescription.encode(task)
//判断二进制数据大小,如果二进制数据超过最大范围则报错
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
//将task 发送给executor执行
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}

4.2 后台调度器的子类实现

下面有三个子实现类

  1. CoarseGrainedSchedulerBackend
  2. LocalSchedulerBackend
  3. StandaloneSchedulerBackend

4.2.1 CoarseGrainedSchedulerBackend

1
2
3
4
5
6
7
8
9

/**
* 调度程序后端代码,等待粗粒度执行程序连接。
* 在Spark执行期间保留每个executor,不会在程序完成时关闭executor,在新的任务进来时不会创建新的executor。
* executor可以以多种方式运行,比如粗粒度mesos模式下或者standalone
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging

4.2.2 LocalSchedulerBackend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**  
* 本地调度器,在本地版的spark时使用,
* 其中 executor, backend,master 都运行在相同的jvm中, 它位于 TaskSchedulerImpl的下一层,
* 在本地运行单个executor (由 LocalSchedulerBackend创建) 上处理启动等任务
* 对[[LocalSchedulerBackend]]的调用都是通过LocalEndpoint序列化的。 使用RpcEndpoint使[[LocalSchedulerBackend]]上的调用异步,这是必要的
* 防止[[LocalSchedulerBackend]]和[[TaskSchedulerImpl]]之间的死锁。
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalSchedulerBackend,
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging

4.2.3 StandaloneSchedulerBackend

1
2
3
4
5
6
7
8
9
// A [[SchedulerBackend]] 针对spark standalone 集群模式的实现
private[spark] class StandaloneSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with StandaloneAppClientListener
with Logging

3. Spark 提交作业图形化


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录