Skip to content

Commit 72da2a2

Browse files
Andrew Or authored and JoshRosen committed
[SPARK-8414] Ensure context cleaner periodic cleanups
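The context cleaner triggers cleanups only when the driver JVM garbage-collects the weak references it tracks. If the driver JVM is huge and there is little memory pressure, garbage collection may happen very rarely or not at all, so we may never clean up shuffle files on executors. This is a problem for long-running applications (e.g. streaming), whose executors may run out of disk space after a while. This change schedules a periodic System.gc() on the driver, at an interval controlled by spark.cleaner.periodicGC.interval (default: 30min). Author: Andrew Or <andrew@databricks.com> Closes apache#10070 from andrewor14/periodic-gc. (cherry picked from commit 1ce4adf) Signed-off-by: Josh Rosen <joshrosen@databricks.com>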
1 parent 1b3db96 commit 72da2a2

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package org.apache.spark
1919

2020
import java.lang.ref.{ReferenceQueue, WeakReference}
21+
import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
2122

2223
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
2324

2425
import org.apache.spark.broadcast.Broadcast
2526
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
26-
import org.apache.spark.util.Utils
27+
import org.apache.spark.util.{ThreadUtils, Utils}
2728

2829
/**
2930
* Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
6667

6768
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
6869

70+
private val periodicGCService: ScheduledExecutorService =
71+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
72+
73+
/**
74+
* How often to trigger a garbage collection in this JVM.
75+
*
76+
* This context cleaner triggers cleanups only when weak references are garbage collected.
77+
* In long-running applications with large driver JVMs, where there is little memory pressure
78+
* on the driver, this may happen very occasionally or not at all. Not cleaning at all may
79+
* lead to executors running out of disk space after a while.
80+
*/
81+
private val periodicGCInterval =
82+
sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
83+
6984
/**
7085
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
7186
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
104119
cleaningThread.setDaemon(true)
105120
cleaningThread.setName("Spark Context Cleaner")
106121
cleaningThread.start()
122+
periodicGCService.scheduleAtFixedRate(new Runnable {
123+
override def run(): Unit = System.gc()
124+
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
107125
}
108126

109127
/**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
119137
cleaningThread.interrupt()
120138
}
121139
cleaningThread.join()
140+
periodicGCService.shutdown()
122141
}
123142

124143
/** Register a RDD for cleanup when it is garbage collected. */

0 commit comments

Comments
 (0)