Skip to content

Commit 6568642

Browse files
author
Nathan Marz
committed
batch the receive thread with zmq noblock
1 parent f1a5e08 commit 6568642

File tree

6 files changed

+36
-35
lines changed

6 files changed

+36
-35
lines changed

conf/defaults.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ topology.fall.back.on.java.serialization: true
8787
topology.worker.childopts: null
8888
topology.executor.receive.buffer.size: 8192 #batched
8989
topology.executor.send.buffer.size: 16384 #individual messages
90-
topology.receiver.buffer.size: 16384 #individual messages
90+
topology.receiver.buffer.size: 1024 #individual messages
9191
topology.transfer.buffer.size: 32 # batched
9292

9393
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

src/clj/backtype/storm/messaging/loader.clj

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,34 @@
1616
(apply afn args)))
1717

1818
(defnk launch-receive-thread!
19-
[context storm-id port transfer-local-fn send-buffer-size
19+
[context storm-id port transfer-local-fn max-buffer-size
2020
:daemon true
2121
:kill-fn (fn [t] (System/exit 1))
2222
:priority Thread/NORM_PRIORITY]
23-
(let [receive-batcher (disruptor/disruptor-queue send-buffer-size
24-
:claim-strategy :single-threaded
25-
:wait-strategy :yield)
26-
cached-emit (MutableObject. (ArrayList.))
23+
(let [max-buffer-size (int max-buffer-size)
2724
vthread (async-loop
2825
(fn [socket]
29-
(let [[task msg :as packet] (msg/recv socket)]
30-
(if (= task -1)
31-
(do
32-
(log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
33-
(.close socket)
34-
nil )
35-
(do
36-
(disruptor/publish receive-batcher packet)
37-
0 ))))
26+
(let [batched (ArrayList.)
27+
init (msg/recv socket)]
28+
(loop [[task msg :as packet] init]
29+
(cond (nil? packet)
30+
(do (transfer-local-fn batched)
31+
0)
32+
(= task -1)
33+
(do
34+
(log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
35+
(.close socket)
36+
nil )
37+
:else (do (.add batched packet)
38+
(if (< (.size batched) max-buffer-size)
39+
(recur (msg/recv-with-flags socket 1)) ;; 1 is ZMQ/NOBLOCK
40+
0
41+
))
42+
))))
3843
:args-fn (fn [] [(msg/bind context storm-id port)])
3944
:daemon daemon
4045
:kill-fn kill-fn
4146
:priority priority)]
42-
(disruptor/set-handler
43-
receive-batcher
44-
(fn [o seq-id batch-end?]
45-
(let [^ArrayList alist (.getObject cached-emit)]
46-
(.add alist o)
47-
(when batch-end?
48-
(transfer-local-fn alist)
49-
(.setObject cached-emit (ArrayList.))
50-
))))
5147
(fn []
5248
(let [kill-socket (msg/connect context storm-id "localhost" port)]
5349
(log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
@@ -56,7 +52,6 @@
5652
(byte-array []))
5753
(log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
5854
(.join vthread)
59-
(.shutdown receive-batcher)
6055
(.close kill-socket)
6156
(log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
6257
))))

src/clj/backtype/storm/messaging/local.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313

1414
(deftype LocalConnection [storm-id port queues-map lock queue]
1515
Connection
16-
(recv [this]
16+
(recv-with-flags [this flags]
1717
(when-not queue
1818
(throw (IllegalArgumentException. "Cannot receive on this socket")))
19-
(.take queue))
19+
(if (= flags 1)
20+
(.poll queue)
21+
(.take queue)))
2022
(send [this task message]
2123
(let [send-queue (add-queue! queues-map lock storm-id port)]
2224
(.put send-queue [task message])

src/clj/backtype/storm/messaging/protocol.clj

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
)
44

55
(defprotocol Connection
6-
(recv [conn])
6+
(recv-with-flags [conn flags])
77
(send [conn task message])
88
(close [conn])
99
)
@@ -14,3 +14,6 @@
1414
(term [context])
1515
)
1616

17+
(defn recv [conn]
18+
(recv-with-flags conn 0))
19+

src/clj/backtype/storm/messaging/zmq.clj

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828

2929
(deftype ZMQConnection [socket ^ByteBuffer bb]
3030
Connection
31-
(recv [this]
32-
(let [part1 (mq/recv socket)]
33-
(when-not (mq/recv-more? socket)
34-
(throw (RuntimeException. "Should always receive two-part ZMQ messages")))
35-
(parse-packet part1 (mq/recv socket))))
31+
(recv-with-flags [this flags]
32+
(let [part1 (mq/recv socket flags)]
33+
(when part1
34+
(when-not (mq/recv-more? socket)
35+
(throw (RuntimeException. "Should always receive two-part ZMQ messages")))
36+
(parse-packet part1 (mq/recv socket)))))
3637
(send [this task message]
3738
(.clear bb)
3839
(.putShort bb (short task))

src/jvm/backtype/storm/Config.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,8 +425,8 @@ public class Config extends HashMap<String, Object> {
425425
public static String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
426426

427427
/**
428-
* The size of the Disruptor queue for the thread that receives messages from the network
429-
* (used to batch messages onto the executor queues).
428+
* The maximum number of messages to batch from the thread receiving off the network to the
429+
* executor queues.
430430
*/
431431
public static String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
432432

0 commit comments

Comments
 (0)