1. Spark Task Scheduling Flow Diagram
2. DAGScheduler
2.1 DAGScheduler Overview
DAG: Directed Acyclic Graph.
The DAGScheduler is Spark's high-level scheduler and works at stage granularity (e.g. a map stage and a reduce stage, labelled stage-0, stage-1, ...).
2.2 Main Responsibilities
- Receive jobs, split them into stages, and hand the work down to the lower-level scheduler
The DAGScheduler is the main entry point for receiving jobs: SparkContext submits work by calling submitJob(rdd, ...) or runJob(rdd, ...). The DAGScheduler then cuts the RDD lineage into stages at shuffle boundaries:
- Narrow-dependency operations (e.g. map and filter) are pipelined, so each stage's narrow operations end up in a single TaskSet
- Wide-dependency (shuffle) operations require multiple stages; in the end a stage depends on other stages only through its wide dependencies
- Both narrow and wide operations are actually carried out inside the compute function of the various RDDs
The DAGScheduler computes a DAG of stages for each job, keeps track of which RDDs and which stage outputs need to be materialized, and finds a minimal schedule to run the job.
The DAGScheduler creates a stage and submits it, then checks whether any parent stage is still unfinished; if so, the parents are submitted first and the current stage waits, and so on recursively. The net result is that the DAGScheduler accumulates a number of waiting stages and one running stage.
- Once the running stage is submitted, the type of its tasks is analysed and a set of task descriptions (a TaskSet) is produced.
A TaskSet contains fully independent tasks that can run on data already present in the cluster (e.g. the map output of the previous stage); if that data becomes unavailable, the tasks fail.
The DAGScheduler calls TaskScheduler.submitTasks(taskSet, ...), handing the stage to the TaskScheduler as a TaskSet so that it can be run on the cluster.
The DAGScheduler also determines the preferred location of each task from the current cache state and passes those locations to the TaskScheduler.
The TaskScheduler allocates resources to the TaskSet based on the available resources and the allocation and triggering conditions, and then triggers execution.
After submitting a job, the DAGScheduler asynchronously returns a JobWaiter object, which can report the job's running state, can cancel the job, and, once execution succeeds, processes and returns the result.
- Track task completion events reported back by the lower-level scheduler
- If a task succeeds, it is removed from its stage and some bookkeeping follows:
- If the task is a ResultTask: the accumulator is incremented, the task is marked as finished in the job, and the job's finished count is increased. If the finished count then equals the number of partitions, the stage is complete: it is marked as finished, removed from the running stages, and some stage cleanup is performed.
- If the task is a ShuffleMapTask: the accumulator is incremented and an output location (a MapStatus) is added to the stage. The stage is then checked for completion, and the stage's shuffle and location information is registered with the MapOutputTracker.
- Next, the stage's output locations are checked for missing entries:
- If any are missing, some tasks failed and the whole stage is resubmitted.
- If none are missing, the next stage to execute is taken from the waiting stages and submitted.
- (A MapStatus is the value a ShuffleMapTask returns when it completes; it contains the location and block-size information.)
- If a task fails and is resubmitted, it is added back to the corresponding stage.
- If the task failed with a fetch failure, the corresponding stage is immediately marked as finished and removed from the running stages; if retries are not allowed, the whole stage is aborted.
- Otherwise the whole stage is resubmitted; in addition, the location and map-task information related to the failed fetch is removed from the stage and unregistered from the MapOutputTracker.
- Finally, if the BlockManager involved in the failed fetch is not null, an ExecutorLost is processed so that the next shuffle runs on a different executor.
Other task states, such as exceptions, TaskResultLost, and CommitDenied, are handled by the TaskScheduler.
Other job-related operations include cancelling a job, cancelling a stage, resubmitting a failed stage, and so on.
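As a hedged illustration of the JobWaiter-based asynchronous path described above: the public SparkContext.submitJob method returns a FutureAction that wraps the JobWaiter created by the DAGScheduler. The RDD, partitions and handlers below are made-up example values.
import org.apache.spark.{SparkConf, SparkContext}

val sc  = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("submitJob-demo"))
val rdd = sc.parallelize(1 to 100, 4)

// submitJob returns immediately; the returned FutureAction wraps the DAGScheduler's JobWaiter
val future = sc.submitJob[Int, Int, Unit](
  rdd,
  iter => iter.sum,                                      // runs on each partition
  Seq(0, 1, 2, 3),                                       // partitions to compute
  (index, sum) => println(s"partition $index -> $sum"),  // per-partition result handler
  ())                                                    // overall result once all partitions finish
// future.cancel() would cancel the job through the JobWaiter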
2.3 Other Responsibilities
- Cache tracking
The DAGScheduler tracks which RDDs are cached to avoid recomputing them, and similarly remembers which shuffle map stages have already produced output so the map side is not re-run.
- Preferred locations
The DAGScheduler derives preferred locations from the underlying RDDs, the cache placement, and where shuffle data resides, and uses them to decide where each task should run.
- Cleanup
All data structures are cleared when the job that uses them finishes, to prevent memory leaks. For failure recovery, the same stage may be executed several times; each execution is called an "attempt".
If the task scheduler reports that a task failed because the map output of a previous stage was lost, the DAGScheduler resubmits the lost stage. This is detected through a completion event with FetchFailed, or through an ExecutorLost event. The DAGScheduler waits a short while to see whether other nodes or tasks fail as well, and then resubmits the TaskSets of all the lost stages together.
2.4 DAG Example
val rdd1 = sc.makeRDD(1 to 10)
val rdd2 = rdd1.filter(_ < 6)
val rdd3 = rdd1.filter(_ >= 6)
val rdd4 = rdd2.union(rdd3)
val rdd5 = rdd4.map(e=>(e,1))
val rdd6 = rdd5.reduceByKey(_ + _)
val rdd7 = rdd6.filter(e => e._1 % 2 == 0).repartition(3).reduceByKey(_ + _).collect
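A rough reading of this example: rdd1 through rdd5 use only narrow dependencies and are pipelined into a single stage, while reduceByKey, repartition and the final reduceByKey each introduce a shuffle boundary, so the job runs roughly four stages. One way to check the stage boundaries is to print the lineage:
// Print rdd6's lineage; each ShuffledRDD in the output marks a stage boundary
println(rdd6.toDebugString)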
3. TaskScheduler
The task scheduler (a trait).
It is the low-level task scheduling interface, with a dedicated implementation (TaskSchedulerImpl).
Each TaskScheduler schedules tasks for a single SparkContext.
The scheduler receives TaskSets submitted by the DAGScheduler and is responsible for sending the tasks to the cluster, running them, retrying on failure, and returning events to the DAGScheduler.
It maintains the mapping between tasks and executors, and between executors and physical resources, and keeps track of which tasks are queued and which are running.
Internally it maintains a queue and schedules tasks according to either a FIFO (first in, first out) or a FAIR scheduling policy.
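A minimal, hedged sketch of how that policy is chosen; both properties are standard Spark settings, while the application name and file path below are placeholders:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduling-mode-demo")                                   // placeholder app name
  .set("spark.scheduler.mode", "FAIR")                                  // FIFO (default) or FAIR
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional fair-pool definition file (placeholder path)
val sc = new SparkContext(conf)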
3.1 Abstract Methods in TaskScheduler
TaskScheduler itself is a trait, and in Spark only TaskSchedulerImpl implements it, so in principle the task scheduler can be customised. Below are all of TaskScheduler's abstract methods.
package org.apache.spark.scheduler
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2
/**
* Low-level task scheduler interface, currently implemented exclusively by
* [[org.apache.spark.scheduler.TaskSchedulerImpl]].
* This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
* for a single SparkContext. These schedulers get sets of tasks submitted to them from the
* DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
* them, retrying if there are failures, and mitigating stragglers. They return events to the
* DAGScheduler.
*/
private[spark] trait TaskScheduler {
private val appId = "spark-application-" + System.currentTimeMillis
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit
/**
* Invoked after the system has successfully initialized (typically in the SparkContext).
* YARN uses this to bootstrap allocation of resources based on preferred locations,
* wait for slave registrations, etc.
*/
def postStartHook() { }
// Disconnect from the cluster
def stop(): Unit
// Submit a sequence of tasks to run
def submitTasks(taskSet: TaskSet): Unit
// Kill all the tasks in a stage and fail the stage and all jobs that depend on it.
// Throw UnsupportedOperationException if the backend doesn't support killing tasks.
def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
/**
* Kills a task attempt.
* Throw UnsupportedOperationException if the backend doesn't support kill a task.
* @return Whether the task was successfully killed.
*/
def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
// Kill all the running task attempts in a stage.
// Throw UnsupportedOperationException if the backend doesn't support kill tasks.
def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
def defaultParallelism(): Int
/**
* Update metrics for in-progress tasks and let the master know that the BlockManager is still alive.
* Return true if the driver knows about the given BlockManager; otherwise return false, indicating that the BlockManager should re-register.
* @param execId
* @param accumUpdates
* @param blockManagerId
* @param executorUpdates
* @return
*/
def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics): Boolean
/**
* Get the application ID associated with the job.
* @return An application ID
*/
def applicationId(): String = appId
/**
* Process a lost executor.
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
/**
* Process a removed worker.
*/
def workerRemoved(workerId: String, host: String, message: String): Unit
/**
* Get the attempt ID of the application run associated with the job.
*
* @return An application's Attempt ID
*/
def applicationAttemptId(): Option[String]
}
3.2 TaskSchedulerImpl Source
The concrete implementation of the task scheduler.
It schedules tasks for several types of clusters through a SchedulerBackend; it can also use a local setup by setting isLocal to true and using a LocalSchedulerBackend.
It handles the common logic, such as determining the scheduling order across jobs and waking up to launch speculative tasks. Clients should first call initialize() and start(), and then submit task sets through submitTasks().
The TaskScheduler keeps maintaining the mapping between task IDs and executor IDs.
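A minimal sketch of that call order, purely to make it concrete: this wiring normally happens inside SparkContext when the master URL is parsed, the classes are private[spark], and sc, masterUrls, and taskSet here are assumed to already exist.
// Illustrative only: SparkContext performs this wiring internally for a standalone master.
val scheduler = new TaskSchedulerImpl(sc)
val backend   = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)   // attach the SchedulerBackend and build the FIFO/FAIR scheduling pool
scheduler.start()               // start the backend (and the speculation thread, if enabled)
// later, the DAGScheduler submits one TaskSet per stage:
scheduler.submitTasks(taskSet)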
3.2.1 submitTasks
/**
* Receives a TaskSet sent over by the DAGScheduler.
* @param taskSet
*/
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create a TaskSetManager for the tasks and add it to the scheduling queue. The TaskSetManager
// tracks the execution status of each task and maintains detailed per-task information.
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//Get the stage ID this TaskSet belongs to
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
//Add the TaskSetManager to the scheduling pool; there are two builders: FIFO and FAIR
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
3.2.2 cancelTasks
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
// Kill the running task attempts of the stage
killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled")
// Cancel all attempts for the stage.
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
attempts.foreach { case (_, tsm) =>
tsm.abort("Stage %s cancelled".format(stageId))
logInfo("Stage %d was cancelled".format(stageId))
}
}
}
override def killAllTaskAttempts(
stageId: Int,
interruptThread: Boolean,
reason: String): Unit = synchronized {
logInfo(s"Killing all running tasks in stage $stageId: $reason")
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
attempts.foreach { case (_, tsm) =>
// There are two possible cases here:
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task.
// 2. The task set manager has been created but no tasks have been scheduled. In this case,
// simply continue.
tsm.runningTasksSet.foreach { tid =>
taskIdToExecutorId.get(tid).foreach { execId =>
//Call SchedulerBackend's killTask method
backend.killTask(tid, execId, interruptThread, reason)
}
}
}
}
}
3.2.3 resourceOffers
// Called by the SchedulerBackend: the backend offers its free worker resources to the TaskScheduler,
// which assigns CPU and memory to the queued tasks according to the scheduling policy and returns the list of task descriptions to the SchedulerBackend
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname.
// Also track whether a new executor has been added.
var newExecAvail = false
for (o <- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
//Collect executor-to-host mappings, active executors, etc. from the worker offers
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
//Collect rack information
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
//Filter out nodes that are in the blacklist
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)
// Randomly shuffle the worker offers; the queued task sets are then sorted according to the scheduling policy
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// The number of slots available to offer to each TaskSet, to check whether it can be satisfied.
// CPUS_PER_TASK: by default one task needs one CPU; configured via spark.task.cpus (default 1).
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, then offer it to each node in increasing order of locality level, so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferred locality order is: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
// Skip the barrier taskSet if the number of available slots is smaller than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
// Skip the launch process.
// TODO SPARK-24819 If the job requires more slots than available (both busy and free
// slots), fail the job on submit.
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
s"number of available slots is $availableSlots.")
} else {
//Whether any task has been launched
var launchedAnyTask = false
// Records the (executor address, task description) pairs of the launched barrier tasks
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) { // the task set could not schedule any task
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
// If the taskSet is unschedulable, we try to find an existing idle blacklisted executor.
// If we cannot find one, we abort immediately.
// Otherwise we kill the idle executor and start an abortTimer: if no task can be scheduled within the timeout, we cannot schedule anything from the taskSet and it is aborted.
// Note 1: we track schedulability per task set rather than per task.
// Note 2: the taskSet can still be aborted when dynamic allocation is on and there are several idle blacklisted executors.
// This happens when the ExecutorAllocationManager does not replace the killed idle executor in time,
// because it relies on pending tasks and does not kill executors on idle timeout, so the abort timer expires and aborts the taskSet.
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
case Some ((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
//Kill the blacklisted idle executor
blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely "
+ s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
case None => // abort immediately
logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
s" executors can be found to kill. Aborting $taskSet." )
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
}
} else {
// We want to defer killing any taskSets as long as we have a non-blacklisted executor that can be used to schedule a task from any active taskSet.
// This ensures that the job can make progress.
// Note: in theory a taskSet may never get scheduled on a non-blacklisted executor, and the abort timer may not kick in because new TaskSets keep being submitted; see the corresponding PR for more details.
if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
//Clear the expiry times of all unschedulable taskSets, since a task was recently scheduled
unschedulableTaskSetToExpiryTime.clear()
}
}
if (launchedAnyTask && taskSet.isBarrier) {
// Check whether the barrier stage is only partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
// requirements are not fulfilled, and we should revert the launched tasks).
require(addressesWithDescs.size == taskSet.numTasks,
s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because only ${addressesWithDescs.size} out of a total number of " +
s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
"been blacklisted or cannot fulfill task locality requirements.")
// Materialize the barrier coordinator.
maybeInitBarrierCoordinator()
// Update the task addresses into the properties of all the barrier tasks.
val addressesStr = addressesWithDescs
// Addresses ordered by partitionId
.sortBy(_._2.partitionId)
.map(_._1)
.mkString(",")
addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
s"stage ${taskSet.stageId}.")
}
}
}
// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// launched within a configured time.
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
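A hedged aside on the loop over taskSet.myLocalityLevels above: how long the scheduler waits at each locality level before falling back to a less local one is controlled by the standard spark.locality.wait settings (defaults shown):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before downgrading the locality level
  .set("spark.locality.wait.process", "3s")  // wait for PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait for NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait for RACK_LOCAL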
3.2.4 statusUpdate
/**
* Called by the SchedulerBackend: the backend reports task execution status back to the TaskScheduler so it can make decisions.
* A finished task is in one of four states: finished, killed, failed, or lost; only finished means successful completion, the other three are failures.
*
* @param tid
* @param state
* @param serializedData
*/
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
Option(taskIdToTaskSetManager.get(tid)) match {
case Some(taskSet) =>
// If the task was lost: find the executor that ran it and remove it from the active executors, so it does not keep receiving other tasks and failing.
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
//Remove it from the active executors so other tasks are not assigned to this executor and keep failing
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
taskSet.removeRunningTask(tid)
//After the task has finished running
if (state == TaskState.FINISHED) {
//Called when execution succeeded
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
//Called when execution failed
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
//TaskResultGetter maintains an internal thread pool that asynchronously fetches task results and deserializes them.
// By default four threads do this; configurable via spark.resultGetter.threads (default 4).
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
3.3 TaskResultGetter: How Task Results Are Fetched
For a successful task, if the data inside the task result is a direct result, the data is simply deserialized to obtain the result:
directResult.value(taskResultSerializer.get())
// If it is not a direct result, the following is called
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
// to fetch the result from a remote BlockManager. If the data fetched remotely is empty, the scheduler is told via
scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
// that the task finished but its result data was lost. Otherwise, once the data has been retrieved, the BlockManagerMaster is notified to remove the block, and
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
// is called to report that the task succeeded and to pass the result data back.
// For a failed task, the failure reason is deserialized from the data and
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
// is called to report the reason for the failure.
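As a hedged note on the settings that influence this path (the values shown are only illustrative): large results are written to the BlockManager and fetched remotely as described above, while the driver caps the total amount of result data it will accept.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "1g")  // cap on total serialized task-result bytes collected to the driver
  .set("spark.resultGetter.threads", "4")   // size of the TaskResultGetter fetch/deserialization thread pool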
4. SchedulerBackend
The backend scheduler (a trait).
It sits below the TaskScheduler and adapts Spark to different resource-management systems; SchedulerBackend is a trait.
Coarse-grained: executor processes are long-lived; typical examples are standalone mode, Mesos coarse-grained mode, and YARN.
Fine-grained: Mesos fine-grained mode.
Here we discuss the coarse-grained mode, which makes CoarseGrainedSchedulerBackend easier to understand.
It maintains executor-related information (the executor's address, communication endpoint, host, total cores and free cores): how many executors are currently registered and in use, how many remain, and how many cores in total are still idle and waiting.
4.1 SchedulerBackend Source
package org.apache.spark.scheduler
// A backend interface for scheduling systems that allows TaskSchedulerImpl to plug into different ones.
// We assume a Mesos-like model where the application gets resource offers as machines become available and can launch tasks on them.
private[spark] trait SchedulerBackend {
private val appId = "spark-application-" + System.currentTimeMillis
def start(): Unit
def stop(): Unit
/**
* The SchedulerBackend hands its currently available resources to the TaskScheduler,
* which assigns them to the queued tasks according to the scheduling policy and returns a batch of runnable task descriptions (a TaskSet).
* The SchedulerBackend is then responsible for launchTask, i.e. pushing each task onto an executor, whose thread pool runs the task's run() method.
*/
def reviveOffers(): Unit
def defaultParallelism(): Int
/**
* Requests that an executor kills a running task.
*
* @param taskId Id of the task.
* @param executorId Id of the executor the task is running on.
* @param interruptThread Whether the executor should interrupt the task thread.
* @param reason The reason for the task kill.
*/
def killTask(
taskId: Long,
executorId: String,
interruptThread: Boolean,
reason: String): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true
/**
* Get the application ID associated with the job.
* @return An application ID
*/
def applicationId(): String = appId
/**
* Get the attempt ID for this run, if the cluster manager supports multiple attempts. Applications run in client mode will not have attempt IDs.
* @return The application attempt id, if available.
*/
def applicationAttemptId(): Option[String] = None
/**
* Get the URLs for the driver logs.
* These URLs are used to display links to the driver's logs in the UI's Executors tab.
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = None
/**
* Get the attributes on the driver. These attributes are used to replace log URLs when a custom log URL pattern is specified.
* @return Map containing attributes on driver.
*/
def getDriverAttributes: Option[Map[String, String]] = None
/**
* Get the maximum number of tasks that can be launched concurrently right now.
* Note: do not cache the value returned by this method, because the number can change due to executors being added or removed.
*
* @return The max number of tasks that can be concurrent launched currently.
*/
def maxNumConcurrentTasks(): Int
}
4.1.1 RegisterExecutor
/**
* Registers an executor; registration is typically triggered when a worker launches or restarts an executor.
* CoarseGrainedSchedulerBackend keeps track of these executors and updates its internal resources (e.g. the total core count grows),
* and finally calls makeOffers() once, i.e. it hands the currently free resources to the TaskScheduler for one round of allocation,
* gets a TaskSet back and launches the tasks. This call to makeOffers() shows up in every event where resources change, as we will see below.
*/
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist.contains(hostname)) {
/**
* If the cluster manager gives us an executor on a blacklisted node
* (because it already started allocating those resources before we informed it of our blacklist, or because it ignored our blacklist),
* then we reject that executor immediately.
*/
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
//If the executor's RPC env is not listening for incoming connections, hostPort will be null, and the client connection should be used to contact the executor
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorAddress, hostname,
cores, cores, logUrls, attributes)
// This must be synchronized because variables mutated in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
4.1.2 StatusUpdate
//Status callback for a task
case StatusUpdate(executorId, taskId, state, data) =>
//First report the status upwards via scheduler.statusUpdate(taskId, state, data.value)
scheduler.statusUpdate(taskId, state, data.value)
//Then check whether this task has finished
if (TaskState.isFinished(state)) {
//If it has finished
executorDataMap.get(executorId) match {
//Look up the ExecutorData
case Some(executorInfo) =>
//Add CPUS_PER_TASK back to the executor's freeCores and call makeOffers() once for that executor.
//Freed resources immediately trigger another round of offers on this executor.
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
4.1.3 killTask
//killTask
case KillTask(taskId, executorId, interruptThread, reason) =>
//Find the executor for the given executorId and forward the kill request to it
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(
KillTask(taskId, executorId, interruptThread, reason))
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
4.1.4 stopExecutors
//Ask each executor to handle the StopExecutor event
case StopExecutors =>
logInfo("Asking each executor to shut down")
for ((_, executorData) <- executorDataMap) {
executorData.executorEndpoint.send(StopExecutor)
}
context.reply(true)
4.1.5 RemoveExecutor
case RemoveExecutor(executorId, reason) =>
/**
* We will remove the executor's state and cannot restore it.
* However, the connection between the driver and the executor may still be alive,
* so the executor will not exit automatically; we therefore try to tell the executor to stop itself.
* See SPARK-13519.
*/
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
}
// Remove a disconnected slave from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
//Look up the executor by executorId
case Some(executorInfo) =>
//This must be synchronized because variables mutated in this block are read when requesting executors
val killed = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
executorsPendingToRemove.remove(executorId).getOrElse(false)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
// TaskScheduler.executorLost() notifies the upper layer that a batch of resources is no longer usable.
// The TaskScheduler then forwards the executorLost event to the DAGScheduler,
// because the DAGScheduler cares about the output locations of shuffle tasks:
// it tells the BlockManager that this executor is gone and removes it,
// then walks through the shuffle output information of every stage, removing this executor,
// registers the updated shuffle output information with the MapOutputTracker, and finally cleans up the local cache-locations map.
scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None =>
// SPARK-15262: If an executor is still alive even after the scheduler has removed
// its metadata, we may receive a heartbeat from that executor and tell its block
// manager to reregister itself. If that happens, the block manager master will know
// about the executor, but the scheduler will not. Therefore, we should remove the
// executor from the block manager when we hit this case.
scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
logInfo(s"Asked to remove non-existent executor $executorId")
}
}
4.1.6 reviveOffers
//Simply calls makeOffers(), obtains a batch of runnable task descriptions (a TaskSet), and then calls launchTasks
case ReviveOffers =>
makeOffers()
4.1.7 makeOffers
// Make (fake) resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while a task is being launched on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors that are not alive
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}
4.1.8 launchTasks
// Launch the tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
//Serialize the task description
val serializedTask = TaskDescription.encode(task)
//Check the serialized size; if it exceeds the maximum RPC message size, abort the task set with an error
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
//Send the task to the executor for execution
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
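A hedged note on the size check above: each serialized TaskDescription must fit into a single RPC message, governed by spark.rpc.message.maxSize (in MiB, default 128). As the error message suggests, large closures or values are better handled with broadcast variables than by raising this limit.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.rpc.message.maxSize", "128")  // maximum RPC message size in MiB; raise only as a last resort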
4.2 SchedulerBackend Implementations
There are three concrete implementations:
- CoarseGrainedSchedulerBackend
- LocalSchedulerBackend
- StandaloneSchedulerBackend
4.2.1 CoarseGrainedSchedulerBackend
/**
* A scheduler backend that waits for coarse-grained executors to connect.
* It holds onto each executor for the duration of the Spark job, rather than relinquishing executors when tasks finish and requesting new executors for new tasks.
* Executors may be launched in a variety of ways, such as Mesos coarse-grained mode or standalone mode.
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging
4.2.2 LocalSchedulerBackend
/**
* The local scheduler backend, used when running a local version of Spark,
* where the executor, backend, and master all run in the same JVM. It sits below TaskSchedulerImpl
* and handles launching tasks on the single executor (created by the LocalSchedulerBackend) running locally.
* Calls to [[LocalSchedulerBackend]] are all serialized through LocalEndpoint. Using an RpcEndpoint makes the calls on [[LocalSchedulerBackend]] asynchronous, which is necessary
* to prevent deadlock between [[LocalSchedulerBackend]] and [[TaskSchedulerImpl]].
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalSchedulerBackend,
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging
4.2.3 StandaloneSchedulerBackend
// A [[SchedulerBackend]] implementation for Spark's standalone cluster mode
private[spark] class StandaloneSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with StandaloneAppClientListener
with Logging