@@ -21,26 +21,21 @@ import java.util.Collections
21
21
import java .util .concurrent ._
22
22
import java .util .regex .Pattern
23
23
24
- import org .apache .spark .util .Utils
25
-
26
24
import scala .collection .JavaConversions ._
27
25
import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet }
28
26
29
- import com .google .common .util .concurrent .ThreadFactoryBuilder
30
-
31
27
import org .apache .hadoop .conf .Configuration
32
28
import org .apache .hadoop .yarn .api .records ._
33
29
import org .apache .hadoop .yarn .client .api .AMRMClient
34
30
import org .apache .hadoop .yarn .client .api .AMRMClient .ContainerRequest
35
31
import org .apache .hadoop .yarn .util .RackResolver
36
-
37
32
import org .apache .log4j .{Level , Logger }
38
33
39
34
import org .apache .spark .{Logging , SecurityManager , SparkConf }
40
35
import org .apache .spark .deploy .yarn .YarnSparkHadoopUtil ._
41
36
import org .apache .spark .rpc .RpcEndpointRef
42
- import org .apache .spark .scheduler .cluster .CoarseGrainedSchedulerBackend
43
- import org .apache .spark .scheduler . cluster . CoarseGrainedClusterMessages . _
37
+ import org .apache .spark .scheduler .cluster .CoarseGrainedClusterMessages . RemoveExecutor
38
+ import org .apache .spark .util . ThreadUtils
44
39
45
40
/**
46
41
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -108,13 +103,9 @@ private[yarn] class YarnAllocator(
108
103
// Resource capability requested for each executors
109
104
private [yarn] val resource = Resource .newInstance(executorMemory + memoryOverhead, executorCores)
110
105
111
- private val launcherPool = new ThreadPoolExecutor (
112
- // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
113
- sparkConf.getInt(" spark.yarn.containerLauncherMaxThreads" , 25 ), Integer .MAX_VALUE ,
114
- 1 , TimeUnit .MINUTES ,
115
- new LinkedBlockingQueue [Runnable ](),
116
- new ThreadFactoryBuilder ().setNameFormat(" ContainerLauncher #%d" ).setDaemon(true ).build())
117
- launcherPool.allowCoreThreadTimeOut(true )
106
+ private val launcherPool = ThreadUtils .newDaemonCachedThreadPool(
107
+ " ContainerLauncher" ,
108
+ sparkConf.getInt(" spark.yarn.containerLauncherMaxThreads" , 25 ))
118
109
119
110
// For testing
120
111
private val launchContainers = sparkConf.getBoolean(" spark.yarn.launchContainers" , true )
0 commit comments