
Commit dae3461

CodingCat authored and srowen committed
[SPARK-13803] restore the changes in SPARK-3411
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers.

This patch restores the changes in apache#1106, which were erased due to the merging of apache#731.

## How was this patch tested?

Tested with existing test cases.

Author: CodingCat <zhunansjtu@gmail.com>

Closes apache#11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
1 parent 54ff1f9 commit dae3461

1 file changed: +17 -4 lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 17 additions & 4 deletions
@@ -583,15 +583,28 @@ private[master] class Master(
    * every time a new app joins or resource availability changes.
    */
   private def schedule(): Unit = {
-    if (state != RecoveryState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) {
+      return
+    }
     // Drivers take strict precedence over executors
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val numWorkersAlive = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      var launched = false
+      var numWorkersVisited = 0
+      while (numWorkersVisited < numWorkersAlive && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % numWorkersAlive
       }
     }
     startExecutorsOnWorkers()
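
The round-robin placement in the hunk above can be tried outside of Spark with a small standalone sketch. Everything here is a simplification for illustration: `SimWorker`, `SimDriver`, and `RoundRobinDrivers` are hypothetical names, not Spark classes, and the resource bookkeeping inside the `if` is an assumed stand-in for what `launchDriver` does on the real `Master`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical stand-ins for the worker and driver records; only the fields
// that the scheduling loop reads are modelled.
final class SimWorker(val id: String, var memoryFree: Int, var coresFree: Int)
final case class SimDriver(id: String, mem: Int, cores: Int)

object RoundRobinDrivers {
  /** Places waiting drivers on alive workers round-robin; returns driver id -> worker id. */
  def schedule(
      aliveWorkers: Seq[SimWorker],
      waitingDrivers: ArrayBuffer[SimDriver],
      rng: Random = new Random()): Map[String, String] = {
    val shuffledAliveWorkers = rng.shuffle(aliveWorkers)
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    var placements = Map.empty[String, String]
    // Iterate over a copy, because launched drivers are removed from the buffer.
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var numWorkersVisited = 0
      // Start where the previous driver left off and visit each alive worker at most once.
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumed stand-in for launchDriver: reserve resources and record the placement.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placements += (driver.id -> worker.id)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placements
  }
}
```

With three workers that each have room for several drivers and three small waiting drivers, this sketch puts one driver on each worker, because `curPos` advances past a worker as soon as it receives a driver. In the removed worker-outer loop, the first worker visited could instead take every driver it had room for.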
