File tree 3 files changed +17
-1
lines changed
clj/backtype/storm/daemon
3 files changed +17
-1
lines changed Original file line number Diff line number Diff line change @@ -36,6 +36,8 @@ ui.port: 8080
36
36
ui.childopts : " -Xmx768m"
37
37
38
38
drpc.port : 3772
39
+ drpc.worker.threads : 64
40
+ drpc.queue.size : 128
39
41
drpc.invocations.port : 3773
40
42
drpc.request.timeout.secs : 600
41
43
Original file line number Diff line number Diff line change 6
6
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
7
7
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
8
8
DistributedRPCInvocations$Processor])
9
- (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
9
+ (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit ])
10
10
(:import [backtype.storm.daemon Shutdownable])
11
11
(:import [java.net InetAddress])
12
12
(:use [backtype.storm bootstrap config log])
100
100
(defn launch-server!
101
101
([]
102
102
(let [conf (read-storm-config )
103
+ worker-threads (int (conf DRPC-WORKER-THREADS))
104
+ queue-size (int (conf DRPC-QUEUE-SIZE))
103
105
service-handler (service-handler )
104
106
; ; requests and returns need to be on separate thread pools, since calls to
105
107
; ; "execute" don't unblock until other thrift methods are called. So if
108
110
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
109
111
(THsHaServer$Args. )
110
112
(.workerThreads 64 )
113
+ (.executorService (ThreadPoolExecutor. worker-threads worker-threads
114
+ 60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
111
115
(.protocolFactory (TBinaryProtocol$Factory. ))
112
116
(.processor (DistributedRPC$Processor. service-handler))
113
117
))
Original file line number Diff line number Diff line change @@ -222,6 +222,16 @@ public class Config extends HashMap<String, Object> {
222
222
*/
223
223
public static String DRPC_PORT = "drpc.port" ;
224
224
225
+ /**
226
+ * DRPC thrift server worker threads
227
+ */
228
+ public static String DRPC_WORKER_THREADS = "drpc.worker.threads" ;
229
+
230
+ /**
231
+ * DRPC thrift server queue size
232
+ */
233
+ public static String DRPC_QUEUE_SIZE = "drpc.queue.size" ;
234
+
225
235
/**
226
236
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
227
237
*/
You can’t perform that action at this time.
0 commit comments