|
17 | 17 |
|
18 | 18 | package org.apache.spark.util
|
19 | 19 |
|
20 |
| -import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} |
| 20 | +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream} |
21 | 21 | import java.lang.{Double => JDouble, Float => JFloat}
|
22 | 22 | import java.net.{BindException, ServerSocket, URI}
|
23 | 23 | import java.nio.{ByteBuffer, ByteOrder}
|
24 | 24 | import java.text.DecimalFormatSymbols
|
25 |
| -import java.util.concurrent.TimeUnit |
26 | 25 | import java.util.Locale
|
| 26 | +import java.util.concurrent.TimeUnit |
27 | 27 |
|
28 | 28 | import scala.collection.mutable.ListBuffer
|
29 | 29 | import scala.util.Random
|
30 | 30 |
|
31 | 31 | import com.google.common.base.Charsets.UTF_8
|
32 | 32 | import com.google.common.io.Files
|
33 |
| - |
| 33 | +import org.apache.commons.lang3.SystemUtils |
34 | 34 | import org.apache.hadoop.conf.Configuration
|
35 | 35 | import org.apache.hadoop.fs.Path
|
36 |
| - |
37 | 36 | import org.apache.spark.network.util.ByteUnit
|
38 |
| -import org.apache.spark.{Logging, SparkFunSuite} |
39 |
| -import org.apache.spark.SparkConf |
| 37 | +import org.apache.spark.{Logging, SparkConf, SparkFunSuite} |
40 | 38 |
|
41 | 39 | class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
|
42 | 40 |
|
@@ -745,4 +743,77 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
|
745 | 743 | assert(Utils.decodeFileNameInURI(new URI("files:///abc")) === "abc")
|
746 | 744 | assert(Utils.decodeFileNameInURI(new URI("files:///abc%20xyz")) === "abc xyz")
|
747 | 745 | }
|
| 746 | + |
| 747 | + test("Kill process") { |
| 748 | + // Verify that we can terminate a process even if it is in a bad state. This is only run |
| 749 | + // on UNIX since it does some OS specific things to verify the correct behavior. |
| 750 | + if (SystemUtils.IS_OS_UNIX) { |
| 751 | + def getPid(p: Process): Int = { |
| 752 | + val f = p.getClass().getDeclaredField("pid") |
| 753 | + f.setAccessible(true) |
| 754 | + f.get(p).asInstanceOf[Int] |
| 755 | + } |
| 756 | + |
| 757 | + def pidExists(pid: Int): Boolean = { |
| 758 | + val p = Runtime.getRuntime.exec(s"kill -0 $pid") |
| 759 | + p.waitFor() |
| 760 | + p.exitValue() == 0 |
| 761 | + } |
| 762 | + |
| 763 | + def signal(pid: Int, s: String): Unit = { |
| 764 | + val p = Runtime.getRuntime.exec(s"kill -$s $pid") |
| 765 | + p.waitFor() |
| 766 | + } |
| 767 | + |
| 768 | + // Start up a process that runs 'sleep 10'. Terminate the process and assert it takes |
| 769 | + // less time and the process is no longer there. |
| 770 | + val startTimeMs = System.currentTimeMillis() |
| 771 | + val process = new ProcessBuilder("sleep", "10").start() |
| 772 | + val pid = getPid(process) |
| 773 | + try { |
| 774 | + assert(pidExists(pid)) |
| 775 | + val terminated = Utils.terminateProcess(process, 5000) |
| 776 | + assert(terminated.isDefined) |
| 777 | + Utils.waitForProcess(process, 5000) |
| 778 | + val durationMs = System.currentTimeMillis() - startTimeMs |
| 779 | + assert(durationMs < 5000) |
| 780 | + assert(!pidExists(pid)) |
| 781 | + } finally { |
| 782 | + // Forcibly kill the test process just in case. |
| 783 | + signal(pid, "SIGKILL") |
| 784 | + } |
| 785 | + |
| 786 | + val v: String = System.getProperty("java.version") |
| 787 | + if (v >= "1.8.0") { |
| 788 | + // Java8 added a way to forcibly terminate a process. We'll make sure that works by |
| 789 | + // creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On |
| 790 | + // older versions of java, this will *not* terminate. |
| 791 | + val file = File.createTempFile("temp-file-name", ".tmp") |
| 792 | + val cmd = |
| 793 | + s""" |
| 794 | + |#!/bin/bash |
| 795 | + |trap "" SIGTERM |
| 796 | + |sleep 10 |
| 797 | + """.stripMargin |
| 798 | + Files.write(cmd.getBytes(), file) |
| 799 | + file.getAbsoluteFile.setExecutable(true) |
| 800 | + |
| 801 | + val process = new ProcessBuilder(file.getAbsolutePath).start() |
| 802 | + val pid = getPid(process) |
| 803 | + assert(pidExists(pid)) |
| 804 | + try { |
| 805 | + signal(pid, "SIGSTOP") |
| 806 | + val start = System.currentTimeMillis() |
| 807 | + val terminated = Utils.terminateProcess(process, 5000) |
| 808 | + assert(terminated.isDefined) |
| 809 | + Utils.waitForProcess(process, 5000) |
| 810 | + val duration = System.currentTimeMillis() - start |
| 811 | + assert(duration < 5000) |
| 812 | + assert(!pidExists(pid)) |
| 813 | + } finally { |
| 814 | + signal(pid, "SIGKILL") |
| 815 | + } |
| 816 | + } |
| 817 | + } |
| 818 | + } |
748 | 819 | }
|
0 commit comments