Skip to content

Commit 78f1812

Browse files
author
Nathan Marz
committed
Merge remote-tracking branch 'xiao/storm-425'
2 parents 9420e7e + cce1c88 commit 78f1812

File tree

3 files changed

+17
-1
lines changed

3 files changed

+17
-1
lines changed

conf/defaults.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ ui.port: 8080
3636
ui.childopts: "-Xmx768m"
3737

3838
drpc.port: 3772
39+
drpc.worker.threads: 64
40+
drpc.queue.size: 128
3941
drpc.invocations.port: 3773
4042
drpc.request.timeout.secs: 600
4143

src/clj/backtype/storm/daemon/drpc.clj

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
77
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
88
DistributedRPCInvocations$Processor])
9-
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
9+
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
1010
(:import [backtype.storm.daemon Shutdownable])
1111
(:import [java.net InetAddress])
1212
(:use [backtype.storm bootstrap config log])
@@ -100,6 +100,8 @@
100100
(defn launch-server!
101101
([]
102102
(let [conf (read-storm-config)
103+
worker-threads (int (conf DRPC-WORKER-THREADS))
104+
queue-size (int (conf DRPC-QUEUE-SIZE))
103105
service-handler (service-handler)
104106
;; requests and returns need to be on separate thread pools, since calls to
105107
;; "execute" don't unblock until other thrift methods are called. So if
@@ -108,6 +110,8 @@
108110
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
109111
(THsHaServer$Args.)
110112
(.workerThreads 64)
113+
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
114+
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
111115
(.protocolFactory (TBinaryProtocol$Factory.))
112116
(.processor (DistributedRPC$Processor. service-handler))
113117
))

src/jvm/backtype/storm/Config.java

+10
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,16 @@ public class Config extends HashMap<String, Object> {
222222
*/
223223
public static String DRPC_PORT = "drpc.port";
224224

225+
/**
226+
* DRPC thrift server worker threads
227+
*/
228+
public static String DRPC_WORKER_THREADS = "drpc.worker.threads";
229+
230+
/**
231+
* DRPC thrift server queue size
232+
*/
233+
public static String DRPC_QUEUE_SIZE = "drpc.queue.size";
234+
225235
/**
226236
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
227237
*/

0 commit comments

Comments
 (0)