Skip to content

Commit 83e2eec

Browse files
author
Nathan Marz
committed
implemented configurable high water mark for zmq
1 parent 317710d commit 83e2eec

File tree

5 files changed

+23
-5
lines changed

5 files changed

+23
-5
lines changed

conf/defaults.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ task.refresh.poll.secs: 10
5454

5555
zmq.threads: 1
5656
zmq.linger.millis: 5000
57+
zmq.high.water.mark: null
5758

5859
### topology.* configs are for specific executing storms
5960
topology.debug: false
@@ -68,4 +69,4 @@ topology.max.spout.pending: null
6869
topology.state.synchronization.timeout.secs: 60
6970
topology.stats.sample.rate: 0.05
7071
topology.fall.back.on.java.serialization: true
71-
topology.worker.childopts: nil
72+
topology.worker.childopts: null

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
mq-context
113113
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
114114
(storm-conf ZMQ-LINGER-MILLIS)
115+
(storm-conf ZMQ-HIGH-WATER-MARK)
115116
(= (conf STORM-CLUSTER-MODE) "local")))
116117
outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
117118
endpoint-socket-lock (mk-rw-lock)

src/clj/backtype/storm/messaging/zmq.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
(.close socket)
1818
))
1919

20-
(deftype ZMQContext [context linger-ms ipc?]
20+
(deftype ZMQContext [context linger-ms hwm ipc?]
2121
Context
2222
(bind [this virtual-port]
2323
(-> context
@@ -32,6 +32,7 @@
3232
(-> context
3333
(mq/socket mq/push)
3434
(mq/set-linger linger-ms)
35+
(mq/set-hwm hwm)
3536
(mq/connect url)
3637
(ZMQConnection.))))
3738
(send-local-task-empty [this virtual-port]
@@ -45,6 +46,6 @@
4546
context))
4647

4748

48-
(defn mk-zmq-context [num-threads linger local?]
49-
(ZMQContext. (mq/context num-threads) linger local?))
49+
(defn mk-zmq-context [num-threads linger hwm local?]
50+
(ZMQContext. (mq/context num-threads) linger hwm local?))
5051

src/clj/zilch/mq.clj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@
5656
(doto socket
5757
(.setLinger (long linger-ms))))
5858

59+
(defn set-hwm
60+
[^ZMQ$Socket socket hwm]
61+
(if hwm
62+
(doto socket
63+
(.setHwm (long hwm)))
64+
socket
65+
))
66+
5967
(defn bind
6068
[^ZMQ$Socket socket url]
6169
(doto socket

src/jvm/backtype/storm/Config.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,14 @@ public class Config extends HashMap<String, Object> {
342342
* The number of threads that should be used by the zeromq context in each worker process.
343343
*/
344344
public static String ZMQ_THREADS = "zmq.threads";
345-
345+
346+
/**
347+
* The high water mark for the underlying push sockets used to send messages. If downstream
348+
* tasks to a producer are overloaded, the producer will block until the downstream tasks
349+
* have consumed messages. This configuration can be used to prevent overloading in topologies
350+
* that use unreliable spouts and are thus not controllable with TOPOLOGY_MAX_SPOUT_PENDING.
351+
*/
352+
public static String ZMQ_HIGH_WATER_MARK = "zmq.high.water.mark";
346353

347354
/**
348355
* How long a connection should retry sending messages to a target host when

0 commit comments

Comments
 (0)