Skip to content

Commit 3868ab6

Browse files
committed
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5)
Backport of apache#10108 to branch 1.5. Author: Shixiong Zhu <shixiong@databricks.com>. Closes apache#10135 from zsxwing/fix-threadpool-1.5.
1 parent 93a0510 commit 3868ab6

File tree

3 files changed

+13
-26
lines changed

3 files changed

+13
-26
lines changed

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -66,12 +66,10 @@ private[spark] class AppClient(
6666
// A thread pool for registering with masters. Because registering with a master is a blocking
6767
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
6868
// time so that we can register with all masters.
69-
private val registerMasterThreadPool = new ThreadPoolExecutor(
70-
0,
71-
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
72-
60L, TimeUnit.SECONDS,
73-
new SynchronousQueue[Runnable](),
74-
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
69+
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
70+
"appclient-register-master-threadpool",
71+
masterRpcAddresses.length // Make sure we can register with all masters at the same time
72+
)
7573

7674
// A scheduled executor for scheduling the registration actions
7775
private val registrationRetryThread =

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -147,12 +147,10 @@ private[deploy] class Worker(
147147
// A thread pool for registering with masters. Because registering with a master is a blocking
148148
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
149149
// time so that we can register with all masters.
150-
private val registerMasterThreadPool = new ThreadPoolExecutor(
151-
0,
152-
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
153-
60L, TimeUnit.SECONDS,
154-
new SynchronousQueue[Runnable](),
155-
ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
150+
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
151+
"worker-register-master-threadpool",
152+
masterRpcAddresses.size // Make sure we can register with all masters at the same time
153+
)
156154

157155
var coresUsed = 0
158156
var memoryUsed = 0

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 5 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,26 +21,21 @@ import java.util.Collections
2121
import java.util.concurrent._
2222
import java.util.regex.Pattern
2323

24-
import org.apache.spark.util.Utils
25-
2624
import scala.collection.JavaConversions._
2725
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
2826

29-
import com.google.common.util.concurrent.ThreadFactoryBuilder
30-
3127
import org.apache.hadoop.conf.Configuration
3228
import org.apache.hadoop.yarn.api.records._
3329
import org.apache.hadoop.yarn.client.api.AMRMClient
3430
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
3531
import org.apache.hadoop.yarn.util.RackResolver
36-
3732
import org.apache.log4j.{Level, Logger}
3833

3934
import org.apache.spark.{Logging, SecurityManager, SparkConf}
4035
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
4136
import org.apache.spark.rpc.RpcEndpointRef
42-
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
43-
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
37+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
38+
import org.apache.spark.util.ThreadUtils
4439

4540
/**
4641
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -108,13 +103,9 @@ private[yarn] class YarnAllocator(
108103
// Resource capability requested for each executors
109104
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
110105

111-
private val launcherPool = new ThreadPoolExecutor(
112-
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
113-
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
114-
1, TimeUnit.MINUTES,
115-
new LinkedBlockingQueue[Runnable](),
116-
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
117-
launcherPool.allowCoreThreadTimeOut(true)
106+
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
107+
"ContainerLauncher",
108+
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
118109

119110
// For testing
120111
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)

0 commit comments

Comments (0)