
Commit 8f65939

nongli authored and Andrew Or committed
[SPARK-12486] Worker should kill the executors more forcefully if possible.
This patch updates the ExecutorRunner's terminate path to use the new Java 8 API to terminate processes more forcefully if possible. If the executor is unhealthy, it would previously ignore the destroy() call. Presumably, the new Java API was added to handle cases like this. We could update the termination path in the future to use OS-specific commands for older Java versions.

Author: Nong Li <nong@databricks.com>

Closes apache#10438 from nongli/spark-12486-executors.
1 parent 962aac4 commit 8f65939
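
The heart of the change is moving from Process.destroy(), which only delivers SIGTERM on UNIX, to Java 8's Process.destroyForcibly(), which delivers SIGKILL. The patch calls destroyForcibly reflectively (see Utils.terminateProcess below) so it still compiles and degrades gracefully on Java 7. A minimal non-reflective sketch of the idea, assuming a Java 8 runtime (the object and method names here are illustrative, not from the patch):

  import java.util.concurrent.TimeUnit

  object ForceKillSketch {
    // Assumes Java 8+: destroyForcibly() and waitFor(timeout, unit) exist on java.lang.Process.
    def forceKill(process: Process, timeoutMs: Long): Option[Int] = {
      process.destroyForcibly() // SIGKILL on UNIX; plain destroy() only sends SIGTERM
      if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
        Some(process.exitValue()) // the process actually exited within the timeout
      } else {
        None // still alive; the caller logs that it will likely be orphaned
      }
    }
  }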

File tree

3 files changed: +112 −12 lines


core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 11 additions & 6 deletions
@@ -23,13 +23,12 @@ import scala.collection.JavaConverters._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * Manages the execution of one executor process.
@@ -60,6 +59,9 @@ private[deploy] class ExecutorRunner(
   private var stdoutAppender: FileAppender = null
   private var stderrAppender: FileAppender = null
 
+  // Timeout to wait for when trying to terminate an executor.
+  private val EXECUTOR_TERMINATE_TIMEOUT_MS = 10 * 1000
+
   // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
   // make sense to remove this in the future.
   private var shutdownHook: AnyRef = null
@@ -94,8 +96,11 @@ private[deploy] class ExecutorRunner(
       if (stderrAppender != null) {
         stderrAppender.stop()
       }
-      process.destroy()
-      exitCode = Some(process.waitFor())
+      exitCode = Utils.terminateProcess(process, EXECUTOR_TERMINATE_TIMEOUT_MS)
+      if (exitCode.isEmpty) {
+        logWarning("Failed to terminate process: " + process +
+          ". This process will likely be orphaned.")
+      }
     }
     try {
       worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 24 additions & 0 deletions
@@ -1698,6 +1698,30 @@ private[spark] object Utils extends Logging {
     new File(path).getName
   }
 
+  /**
+   * Terminates a process waiting for at most the specified duration. Returns whether
+   * the process terminated.
+   */
+  def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
+    try {
+      // Java8 added a new API which will more forcibly kill the process. Use that if available.
+      val destroyMethod = process.getClass().getMethod("destroyForcibly");
+      destroyMethod.setAccessible(true)
+      destroyMethod.invoke(process)
+    } catch {
+      case NonFatal(e) =>
+        if (!e.isInstanceOf[NoSuchMethodException]) {
+          logWarning("Exception when attempting to kill process", e)
+        }
+        process.destroy()
+    }
+    if (waitForProcess(process, timeoutMs)) {
+      Option(process.exitValue())
+    } else {
+      None
+    }
+  }
+
   /**
    * Wait for a process to terminate for at most the specified duration.
    * Return whether the process actually terminated after the given timeout.
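
terminateProcess hands the timed wait off to Utils.waitForProcess, whose body is not part of this diff (only its doc comment appears as context above). Java 7's Process has no waitFor(timeout) overload, so such a helper has to emulate the timeout, typically by polling exitValue(). A rough sketch under that assumption, not the actual Spark implementation:

  object WaitForProcessSketch {
    // exitValue() throws IllegalThreadStateException while the process is still running,
    // so polling it gives a portable timed wait on Java 7.
    def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (System.currentTimeMillis() < deadline) {
        try {
          process.exitValue() // returns only once the process has exited
          return true
        } catch {
          case _: IllegalThreadStateException => Thread.sleep(100) // still running; poll again
        }
      }
      false
    }
  }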

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 77 additions & 6 deletions
@@ -17,26 +17,24 @@
 
 package org.apache.spark.util
 
-import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream}
 import java.lang.{Double => JDouble, Float => JFloat}
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 import java.text.DecimalFormatSymbols
-import java.util.concurrent.TimeUnit
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-
+import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 
 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
@@ -745,4 +743,77 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.decodeFileNameInURI(new URI("files:///abc")) === "abc")
     assert(Utils.decodeFileNameInURI(new URI("files:///abc%20xyz")) === "abc xyz")
   }
+
+  test("Kill process") {
+    // Verify that we can terminate a process even if it is in a bad state. This is only run
+    // on UNIX since it does some OS specific things to verify the correct behavior.
+    if (SystemUtils.IS_OS_UNIX) {
+      def getPid(p: Process): Int = {
+        val f = p.getClass().getDeclaredField("pid")
+        f.setAccessible(true)
+        f.get(p).asInstanceOf[Int]
+      }
+
+      def pidExists(pid: Int): Boolean = {
+        val p = Runtime.getRuntime.exec(s"kill -0 $pid")
+        p.waitFor()
+        p.exitValue() == 0
+      }
+
+      def signal(pid: Int, s: String): Unit = {
+        val p = Runtime.getRuntime.exec(s"kill -$s $pid")
+        p.waitFor()
+      }
+
+      // Start up a process that runs 'sleep 10'. Terminate the process and assert it takes
+      // less time and the process is no longer there.
+      val startTimeMs = System.currentTimeMillis()
+      val process = new ProcessBuilder("sleep", "10").start()
+      val pid = getPid(process)
+      try {
+        assert(pidExists(pid))
+        val terminated = Utils.terminateProcess(process, 5000)
+        assert(terminated.isDefined)
+        Utils.waitForProcess(process, 5000)
+        val durationMs = System.currentTimeMillis() - startTimeMs
+        assert(durationMs < 5000)
+        assert(!pidExists(pid))
+      } finally {
+        // Forcibly kill the test process just in case.
+        signal(pid, "SIGKILL")
+      }
+
+      val v: String = System.getProperty("java.version")
+      if (v >= "1.8.0") {
+        // Java8 added a way to forcibly terminate a process. We'll make sure that works by
+        // creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
+        // older versions of java, this will *not* terminate.
+        val file = File.createTempFile("temp-file-name", ".tmp")
+        val cmd =
+          s"""
+             |#!/bin/bash
+             |trap "" SIGTERM
+             |sleep 10
+           """.stripMargin
+        Files.write(cmd.getBytes(), file)
+        file.getAbsoluteFile.setExecutable(true)
+
+        val process = new ProcessBuilder(file.getAbsolutePath).start()
+        val pid = getPid(process)
+        assert(pidExists(pid))
+        try {
+          signal(pid, "SIGSTOP")
+          val start = System.currentTimeMillis()
+          val terminated = Utils.terminateProcess(process, 5000)
+          assert(terminated.isDefined)
+          Utils.waitForProcess(process, 5000)
+          val duration = System.currentTimeMillis() - start
+          assert(duration < 5000)
+          assert(!pidExists(pid))
+        } finally {
+          signal(pid, "SIGKILL")
+        }
+      }
+    }
+  }
 }
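
The commit message leaves OS-specific kill commands for pre-Java-8 JVMs as future work. Combining the test's getPid and signal helpers above hints at what such a fallback could look like on UNIX; this sketch is purely hypothetical and not part of the patch:

  object ForceKillUnixSketch {
    // Hypothetical pre-Java-8 fallback: read the private "pid" field of the UNIX Process
    // implementation via reflection (as the test above does), then send SIGKILL directly.
    def forceKillUnix(process: Process): Unit = {
      val f = process.getClass.getDeclaredField("pid")
      f.setAccessible(true)
      val pid = f.get(process).asInstanceOf[Int]
      Runtime.getRuntime.exec(s"kill -9 $pid").waitFor()
    }
  }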
