override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  // Kill all running tasks of this stage.
  killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled")
  // Cancel all attempts for the stage.
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      tsm.abort("Stage %s cancelled".format(stageId))
      logInfo("Stage %d was cancelled".format(stageId))
    }
  }
}
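The method above leans on the two-level map taskSetsByStageIdAndAttempt (stageId -> stageAttemptId -> TaskSetManager) and on Option.foreach to silently skip stages that are not tracked. The following minimal, self-contained sketch (StubTaskSetManager and CancelTasksSketch are made-up names, not Spark classes) shows that lookup-and-abort pattern in isolation:

import scala.collection.mutable

// Hypothetical stand-in for a TaskSetManager; only what the sketch needs.
case class StubTaskSetManager(stageId: Int, attempt: Int) {
  def abort(message: String): Unit = println(s"abort on stage $stageId.$attempt: $message")
}

object CancelTasksSketch {
  // stageId -> (stageAttemptId -> manager), mirroring taskSetsByStageIdAndAttempt.
  private val taskSetsByStageIdAndAttempt =
    mutable.HashMap[Int, mutable.HashMap[Int, StubTaskSetManager]]()

  def cancelStage(stageId: Int): Unit = {
    // Option.foreach is a no-op for an unknown stage id, which is how the real
    // cancelTasks tolerates stages that have no registered task sets.
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) => tsm.abort(s"Stage $stageId cancelled") }
    }
  }

  def main(args: Array[String]): Unit = {
    taskSetsByStageIdAndAttempt(3) =
      mutable.HashMap(0 -> StubTaskSetManager(3, 0), 1 -> StubTaskSetManager(3, 1))
    cancelStage(3)  // aborts both attempts of stage 3
    cancelStage(99) // unknown stage: nothing happens
  }
}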
override def killAllTaskAttempts(
    stageId: Int,
    interruptThread: Boolean,
    reason: String): Unit = synchronized {
  logInfo(s"Killing all running tasks in stage $stageId: $reason")
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task.
      // 2. The task set manager has been created but no tasks have been scheduled.
      //    In this case, simply continue.
      tsm.runningTasksSet.foreach { tid =>
        taskIdToExecutorId.get(tid).foreach { execId =>
          // Delegate the actual kill to SchedulerBackend.killTask.
          backend.killTask(tid, execId, interruptThread, reason)
        }
      }
    }
  }
}
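A minimal sketch of the kill path, with KillableBackend as a made-up stand-in for the real SchedulerBackend: every running task is resolved to its executor through a taskId-to-executorId map and the kill is delegated; a missing mapping (case 2 above, nothing scheduled yet) is simply skipped.

import scala.collection.mutable

// Hypothetical backend trait standing in for SchedulerBackend.killTask.
trait KillableBackend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
}

object KillAllTaskAttemptsSketch {
  // taskId -> executorId, mirroring taskIdToExecutorId.
  private val taskIdToExecutorId = mutable.HashMap[Long, String](7L -> "exec-1", 8L -> "exec-2")

  def killRunningTasks(runningTasksSet: Set[Long],
                       backend: KillableBackend,
                       reason: String): Unit =
    runningTasksSet.foreach { tid =>
      // Case 1: the task was scheduled, so its executor is known and gets a kill signal.
      // Case 2: the lookup misses, so we simply continue.
      taskIdToExecutorId.get(tid).foreach { execId =>
        backend.killTask(tid, execId, interruptThread = true, reason)
      }
    }

  def main(args: Array[String]): Unit = {
    val backend = new KillableBackend {
      def killTask(tid: Long, execId: String, interrupt: Boolean, reason: String): Unit =
        println(s"killTask($tid) on $execId: $reason")
    }
    killRunningTasks(Set(7L, 8L, 42L), backend, "Stage cancelled") // 42 is unknown and skipped
  }
}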
// Called by the SchedulerBackend: the underlying backend hands its free worker resources to the
// TaskScheduler, which assigns CPU and memory to the queued tasks according to the scheduling
// policy and returns the resulting list of TaskDescriptions to the SchedulerBackend.
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname.
  // Also track whether a new executor was added.
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    // Collect the executor-to-host mapping and the set of active executors from the worker offers.
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    // Collect rack information.
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  // Filter out offers from blacklisted nodes and executors.
  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // Shuffle the worker offers; the task sets themselves are sorted by the scheduling policy below.
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  // Track the available CPU cores of each offer to check whether each taskSet's demand can be met.
  // CPUS_PER_TASK defaults to one CPU per task (spark.task.cpus = 1).
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, then offer it to each node in increasing order of
  // locality level so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferred locality order is PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
  for (taskSet <- sortedTaskSets) {
    // Skip the barrier taskSet if the number of available slots is less than its pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      // Whether any task was launched for this task set.
      var launchedAnyTask = false
      // Record the executor addresses that barrier tasks are assigned to.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) {
        // No task from this task set could be scheduled.
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
          // If the taskSet is unschedulable, we try to find an existing idle blacklisted
          // executor. If we cannot find one, we abort immediately. Otherwise we kill the idle
          // executor and kick off an abortTimer, which aborts the taskSet if it still cannot
          // schedule any task within the timeout.
          // Note 1: We track schedulability per task set rather than per task.
          // Note 2: The taskSet can still be aborted when there is more than one idle
          // blacklisted executor and dynamic allocation is on. This can happen when a killed
          // idle executor isn't replaced in time by the ExecutorAllocationManager, as it relies
          // on pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
          // timer expiring and aborting the taskSet.
          executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
            case Some((executorId, _)) =>
              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                // Kill the idle blacklisted executor.
                blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                logInfo(s"Waiting for $timeout ms for completely " +
                  s"blacklisted task to be schedulable again before aborting $taskSet.")
                abortTimer.schedule(
                  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
              }
            case None =>
              // Abort immediately.
              logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                s" executors can be found to kill. Aborting $taskSet.")
              taskSet.abortSinceCompletelyBlacklisted(taskIndex)
          }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non-blacklisted executor
        // which can be used to schedule a task from any active taskSet. This ensures that the
        // job can make progress.
        // Note: it is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer never kicks in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          // A task was recently scheduled, so clear the expiry times of all unschedulable task sets.
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are only partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        require(addressesWithDescs.size == taskSet.numTasks,
          s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
            s"because only ${addressesWithDescs.size} out of a total number of " +
            s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
            "been blacklisted or cannot fulfill task locality requirements.")

        // Materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
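A minimal sketch of the slot accounting that drives the barrier check above, assuming a simplified Offer class in place of WorkerOffer: each offer contributes cores / CPUS_PER_TASK slots (CPUS_PER_TASK corresponds to spark.task.cpus, default 1), and a barrier task set is only offered resources when the total slot count covers all of its tasks in a single round.

// Hypothetical Offer class standing in for WorkerOffer.
object SlotAccountingSketch {
  final case class Offer(executorId: String, host: String, cores: Int)

  private val CPUS_PER_TASK = 1 // plays the role of spark.task.cpus

  // Each offer contributes cores / CPUS_PER_TASK slots.
  def availableSlots(offers: Seq[Offer]): Int = offers.map(_.cores / CPUS_PER_TASK).sum

  // A barrier task set is only launched when every one of its tasks can get a slot at once.
  def canLaunchBarrier(offers: Seq[Offer], numBarrierTasks: Int): Boolean =
    availableSlots(offers) >= numBarrierTasks

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", "host-a", 4), Offer("exec-2", "host-b", 2))
    println(availableSlots(offers))      // 6
    println(canLaunchBarrier(offers, 8)) // false: the barrier stage is skipped this round
    println(canLaunchBarrier(offers, 6)) // true
  }
}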
// Remove a disconnected slave from the cluster.
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  logDebug(s"Asked to remove executor $executorId with reason $reason")
  executorDataMap.get(executorId) match {
    // Look up the executor by its executorId.
    case Some(executorInfo) =>
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors.
      val killed = CoarseGrainedSchedulerBackend.this.synchronized {
        addressToExecutorId -= executorInfo.executorAddress
        executorDataMap -= executorId
        executorsPendingLossReason -= executorId
        executorsPendingToRemove.remove(executorId).getOrElse(false)
      }
      totalCoreCount.addAndGet(-executorInfo.totalCores)
      totalRegisteredExecutors.addAndGet(-1)
      // TaskScheduler.executorLost() tells the upper layers that this batch of resources is gone.
      // The TaskScheduler forwards the executorLost event to the DAGScheduler, which cares about
      // the output locations of shuffle tasks: it tells the BlockManager that the executor can no
      // longer be used and removes it, walks the shuffle output of every stage to drop entries
      // for this executor, re-registers the updated shuffle output with the MapOutputTracker,
      // and finally clears the local cache-locations map.
      scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
      listenerBus.post(
        SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
    case None =>
      // SPARK-15262: If an executor is still alive even after the scheduler has removed
      // its metadata, we may receive a heartbeat from that executor and tell its block
      // manager to reregister itself. If that happens, the block manager master will know
      // about the executor, but the scheduler will not. Therefore, we should remove the
      // executor from the block manager when we hit this case.
      scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
      logInfo(s"Asked to remove non-existent executor $executorId")
  }
}
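A minimal sketch of the "killed vs. lost" decision made above, using a simplified map and plain strings in place of executorsPendingToRemove and the real ExecutorLossReason types: the Boolean popped from the map records whether the driver itself requested the removal, and that flag selects between reporting ExecutorKilled and reporting the original loss reason.

import scala.collection.mutable

object RemoveExecutorSketch {
  // executorId -> whether the driver asked for this executor to be removed.
  private val executorsPendingToRemove = mutable.HashMap[String, Boolean]("exec-1" -> true)

  def lossReason(executorId: String, reason: String): String = synchronized {
    // remove(...) both cleans up the bookkeeping entry and yields the "killed by driver" flag;
    // an absent entry means the loss was unexpected.
    val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
    if (killedByDriver) "ExecutorKilled" else reason
  }

  def main(args: Array[String]): Unit = {
    println(lossReason("exec-1", "worker lost")) // "ExecutorKilled": the driver requested it
    println(lossReason("exec-2", "worker lost")) // "worker lost": an unexpected loss
  }
}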