Initialization of dynamically allocated executors
Dynamic executor allocation is enabled when spark.executor.instances is 0 or not set (it is unset by default; in YARN mode this value comes from --num-executors) and spark.dynamicAllocation.enabled is set to true (its default is false).
When the SparkContext is created on the driver, these two settings are checked; if they satisfy the requirements for dynamic allocation, an ExecutorAllocationManager instance is created and started:

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
Required configuration:
1. spark.dynamicAllocation.minExecutors, default 0: the minimum number of executors to allocate.
2. spark.dynamicAllocation.maxExecutors, default Int.MaxValue: the maximum number of executors that may be allocated.
3. spark.dynamicAllocation.initialExecutors, default equal to item 1: the number of executors started initially.
4.1. spark.dynamicAllocation.schedulerBacklogTimeout, default 1s: if unallocated tasks have been waiting longer than this, new executors need to be started.
4.2. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, default equal to item 4.1: the timeout applied to each subsequent round after the initial backlog timeout has fired.
5. spark.dynamicAllocation.executorIdleTimeout, default 60s: how long an idle executor is kept before it is reclaimed.
6. spark.executor.cores (--executor-cores) must be greater than or equal to spark.task.cpus (default 1, the number of CPUs each task needs).
7. spark.shuffle.service.enabled must be set to true (default false). When it is true, the BlockManager reads the shuffle port from spark.shuffle.service.port at creation time, and its shuffleClient is no longer the default BlockTransferService but an ExternalShuffleClient instance (see the configuration sketch after this list).
8. At initialization the initializing flag of ExecutorAllocationManager is true, which makes the periodic scheduling do nothing.
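As a quick illustration of items 1 through 7, here is a hedged SparkConf sketch; the concrete values (minExecutors = 1, maxExecutors = 20, and so on) are illustrative assumptions, not values taken from the text above.

import org.apache.spark.SparkConf

// Minimal sketch: enable dynamic allocation plus the external shuffle service it
// requires. All numeric values are illustrative.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")                    // item 7, mandatory
  .set("spark.dynamicAllocation.minExecutors", "1")                // item 1
  .set("spark.dynamicAllocation.maxExecutors", "20")               // item 2
  .set("spark.dynamicAllocation.initialExecutors", "1")            // item 3
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")    // item 4.1
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")       // item 5
// spark.executor.instances must stay unset (or 0) for dynamic allocation to take effect.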
When ExecutorAllocationManager.start() executes, the ExecutorAllocationListener instance (an inner implementation class) is added to the SparkContext's listenerBus so that it can monitor stage and task start/completion events and react to them. A Runnable is then scheduled to invoke schedule() every 100 ms (intervalMillis) to analyze the pending tasks:

def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
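Outside of Spark, the same pattern can be reproduced with a plain ScheduledThreadPoolExecutor; the sketch below is illustrative only (the object name and println calls are placeholders, not Spark code) and shows why ordinary exceptions are caught: an uncaught throwable would silently cancel the periodic task.

import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.ControlThrowable

// A standalone sketch of the pattern start() uses: a single-threaded scheduler invoking
// a schedule-like function every 100 ms, swallowing ordinary exceptions so that one bad
// round cannot kill the timer thread.
object PeriodicScheduleSketch {
  private val intervalMillis = 100L
  private val executor = Executors.newSingleThreadScheduledExecutor()

  private def schedule(): Unit = {
    // placeholder for the real allocation logic
    println(s"schedule() tick at ${System.currentTimeMillis()}")
  }

  def start(): Unit = {
    val scheduleTask = new Runnable {
      override def run(): Unit =
        try schedule()
        catch {
          case ct: ControlThrowable => throw ct   // never swallow control-flow throwables
          case t: Throwable => println(s"Uncaught exception in schedule(): $t")
        }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0L, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = executor.shutdownNow()
}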
Computing the number of executors to allocate
Allocation is driven by the timer calling schedule() every 100 ms:

private def schedule(): Unit = synchronized {
  // First obtain the current time.
  val now = clock.getTimeMillis

  // On the very first calls, initializing is still true and this function does
  // nothing; its contents are analyzed further below.
  updateAndSyncNumExecutorsTarget(now)

  // removeTimes records, for each executor, the time at which it will have been idle
  // (holding no task) for longer than the configured idle timeout. If that time has
  // passed, the executor is removed and initializing is set to false so that task
  // scheduling can proceed. retain keeps only the executors that have not expired.
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}

How do we know that a stage has been submitted? See below.
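To make the removeTimes bookkeeping concrete, here is a self-contained sketch under assumed names (IdleTimeoutSketch and its methods are illustrative, not the real ExecutorAllocationManager): idle executors get a deadline, busy executors lose it, and each tick prunes the expired entries.

import scala.collection.mutable

object IdleTimeoutSketch {
  private val idleTimeoutMs = 60 * 1000L                        // executorIdleTimeout
  private val removeTimes = mutable.HashMap.empty[String, Long] // executorId -> expiry

  // An executor that just became idle keeps its original deadline if it already has one.
  def onExecutorIdle(executorId: String, nowMs: Long): Unit =
    removeTimes.getOrElseUpdate(executorId, nowMs + idleTimeoutMs)

  def onExecutorBusy(executorId: String): Unit =
    removeTimes.remove(executorId)

  // Returns the executors that expired on this tick and keeps the rest.
  def pruneExpired(nowMs: Long): Seq[String] = {
    val expired = removeTimes.collect { case (id, t) if nowMs >= t => id }.toSeq
    removeTimes.retain { case (_, expireTime) => nowMs < expireTime }
    expired
  }
}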
In SparkContext, when runJob is executed and a stage is submitted, the onStageSubmitted callback of every listener registered on the listenerBus is invoked.
When ExecutorAllocationManager was started, it created an ExecutorAllocationListener and added it to the listenerBus.
Here is how ExecutorAllocationListener handles a stage submission:

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  // Set initializing to false so that the next timer tick actually performs
  // executor allocation.
  initializing = false
  // The id of the submitted stage and the number of tasks it contains.
  val stageId = stageSubmitted.stageInfo.stageId
  val numTasks = stageSubmitted.stageInfo.numTasks
  allocationManager.synchronized {
    // Record the number of tasks for this stage in stageIdToNumTasks.
    stageIdToNumTasks(stageId) = numTasks
    // Update addTime in allocationManager to the current time plus the
    // spark.dynamicAllocation.schedulerBacklogTimeout timeout.
    allocationManager.onSchedulerBacklogged()

    // Compute the number of tasks requested by the stage on each host; in principle
    // numTasksPending should equal the stage's numTasks.
    var numTasksPending = 0
    val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
      if (!locality.isEmpty) {
        numTasksPending += 1
        locality.foreach { location =>
          val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
          hostToLocalTaskCountPerStage(location.host) = count
        }
      }
    }

    // Store, per stageId, the number of pending tasks and the host -> task-count map.
    stageIdToExecutorPlacementHints.put(stageId,
      (numTasksPending, hostToLocalTaskCountPerStage.toMap))

    // Update the executor placement hints: iterate over the values of
    // stageIdToExecutorPlacementHints and refresh allocationManager.localityAwareTasks
    // (the number of tasks waiting to start) and allocationManager.hostToLocalTaskCount
    // (the task count per host). These are used to schedule tasks onto newly started
    // executors.
    updateExecutorPlacementHints()
  }
}
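The placement-hint computation can be illustrated with plain host strings (the real code walks TaskLocation objects from stageInfo.taskLocalityPreferences; this sketch is only an approximation of that loop):

// Illustrative input: each inner Seq lists the preferred hosts of one task.
val taskLocalityPreferences: Seq[Seq[String]] = Seq(
  Seq("host1", "host2"),   // task 0 prefers host1 or host2
  Seq("host2"),            // task 1 prefers host2
  Seq()                    // task 2 has no preference and is not counted
)

var numTasksPending = 0
val hostToLocalTaskCount = scala.collection.mutable.HashMap.empty[String, Int]
taskLocalityPreferences.foreach { locality =>
  if (locality.nonEmpty) {
    numTasksPending += 1
    locality.foreach { host =>
      hostToLocalTaskCount(host) = hostToLocalTaskCount.getOrElse(host, 0) + 1
    }
  }
}
// numTasksPending == 2, hostToLocalTaskCount == Map("host1" -> 1, "host2" -> 2)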
Next, look at the updateAndSyncNumExecutorsTarget function that the allocationManager timer calls, together with the addExecutors function it relies on.
Worked example:
Assume this stage needs 5 executors and numExecutorsTarget keeps its default value of 0.
On the first timer tick, inside updateAndSyncNumExecutorsTarget:
1. First compute the number of executors this stage needs:

val maxNeeded = maxNumExecutorsNeeded
if (initializing) {
  // If execution reaches here, no stage has been submitted yet, i.e. no job is
  // running, so nothing is scheduled.
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
}
2. Execution then falls into the else if (addTime != NOT_SET && now >= addTime) branch, which calls addExecutors (assuming the addTime timeout has elapsed). With the default of 0 initial executors, this branch is entered once the current time passes the backlog timeout: the first round waits one second, and each round pushes the deadline forward again. addExecutors is invoked with the number of executors the stage's tasks need:

else if (addTime != NOT_SET && now >= addTime) {
  val delta = addExecutors(maxNeeded)
  logDebug(s"Starting timer to add more executors (to " +
    s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
  addTime += sustainedSchedulerBacklogTimeoutS * 1000
  delta
}
Inside addExecutors, the target number of executors (numExecutorsTarget) is computed first:

// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
  logDebug(s"Not adding executors because our current target total " +
    s"is already $numExecutorsTarget (limit $maxNumExecutors)")
  numExecutorsToAdd = 1
  return 0
}

val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already
// have, so make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

At this point executorIds is empty and numExecutorsToAdd has its default value of 1, so with maxNumExecutorsNeeded = 5 (the number of executors the tasks need) the code above leaves numExecutorsTarget at 1. Next the difference from the previous round is computed; if this round's target is higher than the previous round's, delta is positive. Given the values above, delta is 1:

val delta = numExecutorsTarget - oldNumExecutorsTarget

// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
  // The target equals the previous round's target, so reset the increment to 1.
  numExecutorsToAdd = 1
  return 0
}

Next, the SparkContext is asked to bring the total number of executors up to numExecutorsTarget, passing along the locality information used to place tasks on them:

val addRequestAcknowledged = testing ||
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

Since the job still needs more executors (5 in total) and delta (1) equals numExecutorsToAdd, numExecutorsToAdd is doubled:

numExecutorsToAdd = if (delta == numExecutorsToAdd) {
  numExecutorsToAdd * 2
} else {
  1
}
3. On the second timer tick, updateAndSyncNumExecutorsTarget finds numExecutorsTarget at 1, still below the 5 executors needed, so the timeout branch runs again:

else if (addTime != NOT_SET && now >= addTime) {
  val delta = addExecutors(maxNeeded)
  logDebug(s"Starting timer to add more executors (to " +
    s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
  addTime += sustainedSchedulerBacklogTimeoutS * 1000
  delta
}

addExecutors is entered again. This time numExecutorsToAdd is 2, numExecutorsTarget is 1 and executorIds.size is 1 (one executor has already started), so after the computation numExecutorsTarget becomes 3. The delta between the new and the previous numExecutorsTarget is 2, and a request is issued to bring the total number of executors up to 3. Because delta (2) equals numExecutorsToAdd (2), numExecutorsToAdd is doubled again, to 4.
4. Some tasks are still waiting, so a third round runs. numExecutorsTarget (3) is still below the number of executors needed, so addExecutors is called again. By now the executors requested so far have registered, so executorIds.size is 3, and numExecutorsTarget changes as follows:
it first stays at 3 (the maximum of numExecutorsTarget and executorIds.size),
then numExecutorsToAdd (4) is added, raising it to 7,
and finally it is capped at the 5 executors needed in total, so it becomes 5. The delta between the new and the previous numExecutorsTarget is 2, which differs from numExecutorsToAdd (4), so numExecutorsToAdd is reset to 1.
Once all pending tasks have been scheduled, the listener sets addTime back to NOT_SET, meaning no further allocation rounds run: there are now enough executors.
5. Now assume spark.dynamicAllocation.initialExecutors is set to 6 while only 5 executors are needed. In that case updateAndSyncNumExecutorsTarget takes the branch below, because the initial number of executors exceeds the number needed. The same branch also runs when, say, a later stage of the job needs fewer executors than the previous stage did:

else if (maxNeeded < numExecutorsTarget) {
  // The target number exceeds the number we actually need, so stop adding new
  // executors and inform the cluster manager to cancel the extra pending requests
  val oldNumExecutorsTarget = numExecutorsTarget
  numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
  numExecutorsToAdd = 1

  // If the new target has not changed, avoid sending a message to the cluster manager
  if (numExecutorsTarget < oldNumExecutorsTarget) {
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
      s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
  }
  numExecutorsTarget - oldNumExecutorsTarget
}

In this code numExecutorsTarget is recomputed from the number of executors actually needed: the new value is 5 while the old value was 6, so requestTotalExecutors is called on the SparkContext with the lower target, which drives both the executor scheduling and the subsequent task launches.
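The ramp-up described in steps 1 through 4 can be reproduced with a small self-contained simulation; this is an illustrative sketch of the target arithmetic only (the object name, the loop, and the assumption that the cluster manager delivers exactly what was requested before the next round are mine), not the real class with its timers and listener callbacks.

// With maxNumExecutorsNeeded = 5 and everything else at its defaults, the target grows
// 1 -> 3 -> 5 over three rounds, and the per-round increment grows 1 -> 2 -> 4 before
// being reset to 1.
object RampUpSimulation extends App {
  val minNumExecutors = 0
  val maxNumExecutors = Int.MaxValue
  val maxNumExecutorsNeeded = 5

  var numExecutorsTarget = 0
  var numExecutorsToAdd = 1
  var registeredExecutors = 0            // stands in for executorIds.size

  for (round <- 1 to 3) {
    val oldTarget = numExecutorsTarget
    numExecutorsTarget = math.max(numExecutorsTarget, registeredExecutors)
    numExecutorsTarget += numExecutorsToAdd
    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

    val delta = numExecutorsTarget - oldTarget
    numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1

    // Pretend the cluster manager delivers exactly what was requested before the next round.
    registeredExecutors = numExecutorsTarget
    println(s"round $round: target=$numExecutorsTarget delta=$delta nextIncrement=$numExecutorsToAdd")
  }
  // round 1: target=1 delta=1 nextIncrement=2
  // round 2: target=3 delta=2 nextIncrement=4
  // round 3: target=5 delta=2 nextIncrement=1
}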
Scheduling executors through the SparkContext
After the allocationManager decides on a new target, it executes the following call, where client is the SparkContext instance:

client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

Its parameters are:
numExecutors, the target number of executors;
localityAwareTasks, the total number of pending tasks with locality preferences;
hostToLocalTaskCount, the host -> task-count map.

private[spark] override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
  ): Boolean = {
  schedulerBackend match {
    // Delegate directly to the corresponding function in CoarseGrainedSchedulerBackend.
    case b: CoarseGrainedSchedulerBackend =>
      b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
    case _ =>
      logWarning("Requesting executors is only supported in coarse-grained mode")
      false
  }
}
Here is the implementation of requestTotalExecutors in CoarseGrainedSchedulerBackend:

final override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]
  ): Boolean = synchronized {
  if (numExecutors < 0) {
    throw new IllegalArgumentException(
      "Attempted to request a negative number of executor(s) " +
        s"$numExecutors from the cluster manager. Please specify a positive number!")
  }

  this.localityAwareTasks = localityAwareTasks
  this.hostToLocalTaskCount = hostToLocalTaskCount

  // On each call, compute the difference between the total number of executors needed
  // and the number that already exist (counting those pending removal as slots that
  // will have to be refilled); this difference is the number of executors still to
  // be started.
  numPendingExecutors =
    math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

  // Depending on the cluster deployment mode (YARN, standalone, Mesos, ...), the
  // matching backend starts the executors. In standalone mode this is implemented by
  // SparkDeploySchedulerBackend: doRequestTotalExecutors sends a RequestExecutors
  // message to the Master and waits for a reply; the Master handles it in the
  // RequestExecutors case of its receiveAndReply function.
  doRequestTotalExecutors(numExecutors)
}
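The numPendingExecutors arithmetic is easy to misread, so here is a small worked example with illustrative numbers (none of them come from the text above):

// Executors pending removal are added back because they still occupy slots that will
// be freed, so new requests must cover them.
val requestedTotal = 10        // numExecutors passed in
val numExistingExecutors = 6   // executors currently registered
val pendingToRemove = 2        // executors already asked to be killed

val numPendingExecutors =
  math.max(requestedTotal - numExistingExecutors + pendingToRemove, 0)
// numPendingExecutors == 6: of the 6 existing executors only 4 will remain,
// so 6 more must be started to reach a total of 10.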
How the Master handles the executor request:

case RequestExecutors(appId, requestedTotal) =>
  context.reply(handleRequestExecutors(appId, requestedTotal))

Here is the handleRequestExecutors function:

private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
  idToApp.get(appId) match {
    // Update executorLimit in the appInfo with the total number of executors the
    // application's job needs, then run the scheduling pass that launches executors.
    case Some(appInfo) =>
      logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
      appInfo.executorLimit = requestedTotal
      schedule()
      true
    case None =>
      logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
      false
  }
}

During that scheduling pass, startExecutorsOnWorkers launches executors on workers. When deciding how many executors to start, scheduleExecutorsOnWorkers checks whether the number of executors has reached appInfo.executorLimit; once that limit is reached, no further executors are launched. Whether a worker has enough resources for an executor is decided from the CPU cores and memory the executor needs: if both are available, the worker can host an executor. All workers are iterated this way, and allocation stops as soon as the number of launched executors reaches the appInfo's executorLimit.
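The two checks just described, worker resources and the executorLimit cap, can be sketched as follows; WorkerSketch and scheduleExecutorsSketch are invented names for illustration and deliberately much simpler than the Master's real scheduleExecutorsOnWorkers:

case class WorkerSketch(id: String, freeCores: Int, freeMemoryMb: Int)

def scheduleExecutorsSketch(
    workers: Seq[WorkerSketch],
    coresPerExecutor: Int,
    memoryPerExecutorMb: Int,
    executorLimit: Int,
    alreadyRunning: Int): Seq[String] = {
  var launched = alreadyRunning
  workers.flatMap { w =>
    val hasResources = w.freeCores >= coresPerExecutor && w.freeMemoryMb >= memoryPerExecutorMb
    if (hasResources && launched < executorLimit) {
      launched += 1
      Some(w.id)                       // launch one executor on this worker
    } else {
      None                             // skip: not enough resources or limit reached
    }
  }
}

// Example: three workers, a limit of 2 executors, one already running.
// Only one more executor is launched, on the first worker with enough resources.
val chosen = scheduleExecutorsSketch(
  Seq(WorkerSketch("w1", 4, 8192), WorkerSketch("w2", 1, 2048), WorkerSketch("w3", 4, 8192)),
  coresPerExecutor = 2, memoryPerExecutorMb = 4096, executorLimit = 2, alreadyRunning = 1)
// chosen == Seq("w1")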
How the driver reacts when a worker has started an executor:
After an executor starts on the worker side, it sends a RegisterExecutor message to the driver.
The driver handles this message in the RegisterExecutor case of the receiveAndReply function of the CoarseGrainedSchedulerBackend instance.

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")

    // Store the executor's information in the driver-side collections and decrement
    // the number of executors still waiting to start (numPendingExecutors).
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(RegisteredExecutor(executorAddress.host))

    // Posting this event ends up invoking the executorAdded handling of the
    // ExecutorAllocationListener inside allocationManager.
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))

    // Schedule and launch tasks on the executors that are now available.
    makeOffers()
  }
Listener handling in ExecutorAllocationManager for executor registration and task start/end
Handling executor registration:

private def onExecutorAdded(executorId: String): Unit = synchronized {
  if (!executorIds.contains(executorId)) {
    // Record the id of the newly started executor.
    executorIds.add(executorId)

    // Then check which executors are running no task at all; each of those is added to
    // the removeTimes collection, which stores a per-executor idle-expiration deadline:
    //  - if the executor holds no cached blocks, the deadline is controlled by
    //    spark.dynamicAllocation.executorIdleTimeout (default 60s);
    //  - if the executor holds cached blocks, the deadline is controlled by
    //    spark.dynamicAllocation.cachedExecutorIdleTimeout (no timeout by default).
    //
    // If an executor (call this executor X) is not removed because the lower bound
    // has been reached, it will no longer be marked as idle. When new executors join,
    // however, we are no longer at the lower bound, and so we must mark executor X
    // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
    executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
    logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
  } else {
    logWarning(s"Duplicate executor $executorId has registered")
  }
}
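How the two idle timeouts interact can be sketched as follows; idleDeadline is an invented helper for illustration, not the real onExecutorIdle logic:

val executorIdleTimeoutS = 60L                 // spark.dynamicAllocation.executorIdleTimeout
val cachedExecutorIdleTimeoutS = Long.MaxValue // cachedExecutorIdleTimeout left unset ("never")

def idleDeadline(nowMs: Long, hasCachedBlocks: Boolean): Long = {
  val timeoutS = if (hasCachedBlocks) cachedExecutorIdleTimeoutS else executorIdleTimeoutS
  // Guard against overflow when the cached timeout is effectively "never".
  if (timeoutS >= Long.MaxValue / 1000) Long.MaxValue else nowMs + timeoutS * 1000
}

// An executor with no cached blocks expires 60s from now; one with cached blocks never does.
val now = System.currentTimeMillis()
assert(idleDeadline(now, hasCachedBlocks = false) == now + 60000)
assert(idleDeadline(now, hasCachedBlocks = true) == Long.MaxValue)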
Listening for task start:

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  val stageId = taskStart.stageId
  val taskId = taskStart.taskInfo.taskId
  val taskIndex = taskStart.taskInfo.index
  val executorId = taskStart.taskInfo.executorId

  allocationManager.synchronized {
    // First update the number of running tasks.
    numRunningTasks += 1
    // This guards against the race condition in which the `SparkListenerTaskStart`
    // event is posted before the `SparkListenerBlockManagerAdded` event, which is
    // possible because these events are posted in different threads. (see SPARK-4951)
    if (!allocationManager.executorIds.contains(executorId)) {
      allocationManager.onExecutorAdded(executorId)
    }

    // Record, per stageId, the index of each task that has started.
    stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex

    // If this is the last pending task, mark the scheduler queue as empty: all the
    // tasks the stages need are now running, so addTime is set back to NOT_SET and
    // numExecutorsToAdd is reset to 1.
    if (totalPendingTasks() == 0) {
      allocationManager.onSchedulerQueueEmpty()
    }

    // Record which executor the task runs on; this mapping is what decides whether an
    // executor is idle.
    // Mark the executor on which this task is scheduled as busy
    executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId

    // Because this executor now has a running task, remove it from the collections
    // that track executors scheduled for removal:
    //  - removeTimes, which stores each idle executor's expiration deadline;
    //  - executorsPendingToRemove, which stores executors whose removal has been requested.
    allocationManager.onExecutorBusy(executorId)
  }
}
Listening for task end:

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
  val executorId = taskEnd.taskInfo.executorId
  val taskId = taskEnd.taskInfo.taskId
  val taskIndex = taskEnd.taskInfo.index
  val stageId = taskEnd.stageId

  allocationManager.synchronized {
    // One task has finished running.
    numRunningTasks -= 1

    // Remove the task from the executor's task set; if the executor no longer runs
    // any task, treat it as idle, i.e. move it into the removeTimes collection.
    // If the executor is no longer running any scheduled tasks, mark it as idle
    if (executorIdToTaskIds.contains(executorId)) {
      executorIdToTaskIds(executorId) -= taskId
      if (executorIdToTaskIds(executorId).isEmpty) {
        executorIdToTaskIds -= executorId
        allocationManager.onExecutorIdle(executorId)
      }
    }

    // If the task failed, we expect it to be resubmitted later. To ensure we have
    // enough resources to run the resubmitted task, we need to mark the scheduler
    // as backlogged again if it's not already marked as such (SPARK-8366)
    if (taskEnd.reason != Success) {
      if (totalPendingTasks() == 0) {
        allocationManager.onSchedulerBacklogged()
      }
      stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
    }
  }
}
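The executorIdToTaskIds bookkeeping shared by onTaskStart and onTaskEnd can be reduced to the following sketch (ExecutorBusySketch is an invented name; the real code also updates removeTimes and the pending-removal set):

import scala.collection.mutable

object ExecutorBusySketch {
  private val executorIdToTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]

  def onTaskStart(executorId: String, taskId: Long): Unit =
    executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId

  // Returns true when the executor has just become idle (no remaining tasks).
  def onTaskEnd(executorId: String, taskId: Long): Boolean =
    executorIdToTaskIds.get(executorId).exists { tasks =>
      tasks -= taskId
      if (tasks.isEmpty) { executorIdToTaskIds -= executorId; true } else false
    }

  def isExecutorIdle(executorId: String): Boolean =
    !executorIdToTaskIds.contains(executorId)
}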
Listening for stage completion:

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
  val stageId = stageCompleted.stageInfo.stageId
  allocationManager.synchronized {
    // Remove this stage from the tracking collections.
    stageIdToNumTasks -= stageId
    stageIdToTaskIndices -= stageId
    stageIdToExecutorPlacementHints -= stageId

    // Refresh the task/location placement hints kept by allocationManager.
    // Update the executor placement hints
    updateExecutorPlacementHints()

    // If this is the last stage with pending tasks, mark the scheduler queue as empty:
    // every stage has finished, so allocation scheduling is paused.
    // This is needed in case the stage is aborted for any reason
    if (stageIdToNumTasks.isEmpty) {
      allocationManager.onSchedulerQueueEmpty()
      if (numRunningTasks != 0) {
        logWarning("No stages are running, but numRunningTasks != 0")
        numRunningTasks = 0
      }
    }
  }
}
Handling an executor whose idle timeout expired during scheduling:

private def removeExecutor(executorId: String): Boolean = synchronized {
  // Do not kill the executor if we are not aware of it (should never happen)
  if (!executorIds.contains(executorId)) {
    logWarning(s"Attempted to remove unknown executor $executorId!")
    return false
  }

  // Do not kill the executor again if it is already pending to be killed (should never happen)
  if (executorsPendingToRemove.contains(executorId)) {
    logWarning(s"Attempted to remove executor $executorId " +
      s"when it is already pending to be removed!")
    return false
  }

  // Check whether removing one more executor would take us below the configured
  // minimum; only proceed if it would not.
  // Do not kill the executor if we have already reached the lower bound
  val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
  if (numExistingExecutors - 1 < minNumExecutors) {
    logDebug(s"Not removing idle executor $executorId because there are only " +
      s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
    return false
  }

  // Ask the backend to kill this executor, add it to executorsPendingToRemove, and
  // wait for the listener's onExecutorRemoved callback to fire.
  // Send a request to the backend to kill this executor
  val removeRequestAcknowledged = testing || client.killExecutor(executorId)
  if (removeRequestAcknowledged) {
    logInfo(s"Removing executor $executorId because it has been idle for " +
      s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
    executorsPendingToRemove.add(executorId)
    true
  } else {
    logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
    false
  }
}
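The lower-bound check above is just arithmetic on the tracking sets; here is a small worked example with illustrative values:

// Executors already pending removal do not count toward the "existing" total.
val minNumExecutors = 2
val registered = Set("e1", "e2", "e3")        // executorIds
val pendingToRemove = Set("e3")               // executorsPendingToRemove

val numExistingExecutors = registered.size - pendingToRemove.size   // 2
val canRemoveOneMore = numExistingExecutors - 1 >= minNumExecutors  // 1 >= 2 -> false
// canRemoveOneMore == false: removing another executor would drop below the minimum,
// so the idle executor is kept.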
How allocationManager observes the executor's removal:

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
  allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

This in turn simply removes the executor from the tracking collections:

private def onExecutorRemoved(executorId: String): Unit = synchronized {
  if (executorIds.contains(executorId)) {
    executorIds.remove(executorId)
    removeTimes.remove(executorId)
    logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})")
    if (executorsPendingToRemove.contains(executorId)) {
      executorsPendingToRemove.remove(executorId)
      logDebug(s"Executor $executorId is no longer pending to " +
        s"be removed (${executorsPendingToRemove.size} left)")
    }
  } else {
    logWarning(s"Unknown executor $executorId has been removed!")
  }
}