File tree Expand file tree Collapse file tree 5 files changed +23
-5
lines changed Expand file tree Collapse file tree 5 files changed +23
-5
lines changed Original file line number Diff line number Diff line change @@ -54,6 +54,7 @@ task.refresh.poll.secs: 10
54
54
55
55
zmq.threads : 1
56
56
zmq.linger.millis : 5000
57
+ zmq.high.water.mark : null
57
58
58
59
# ## topology.* configs are for specific executing storms
59
60
topology.debug : false
@@ -68,4 +69,4 @@ topology.max.spout.pending: null
68
69
topology.state.synchronization.timeout.secs : 60
69
70
topology.stats.sample.rate : 0.05
70
71
topology.fall.back.on.java.serialization : true
71
- topology.worker.childopts : nil
72
+ topology.worker.childopts : null
Original file line number Diff line number Diff line change 112
112
mq-context
113
113
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
114
114
(storm-conf ZMQ-LINGER-MILLIS)
115
+ (storm-conf ZMQ-HIGH-WATER-MARK)
115
116
(= (conf STORM-CLUSTER-MODE) " local" )))
116
117
outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
117
118
endpoint-socket-lock (mk-rw-lock )
Original file line number Diff line number Diff line change 17
17
(.close socket)
18
18
))
19
19
20
- (deftype ZMQContext [context linger-ms ipc?]
20
+ (deftype ZMQContext [context linger-ms hwm ipc?]
21
21
Context
22
22
(bind [this virtual-port]
23
23
(-> context
32
32
(-> context
33
33
(mq/socket mq/push)
34
34
(mq/set-linger linger-ms)
35
+ (mq/set-hwm hwm)
35
36
(mq/connect url)
36
37
(ZMQConnection. ))))
37
38
(send-local-task-empty [this virtual-port]
45
46
context))
46
47
47
48
48
- (defn mk-zmq-context [num-threads linger local?]
49
- (ZMQContext. (mq/context num-threads) linger local?))
49
+ (defn mk-zmq-context [num-threads linger hwm local?]
50
+ (ZMQContext. (mq/context num-threads) linger hwm local?))
50
51
Original file line number Diff line number Diff line change 56
56
(doto socket
57
57
(.setLinger (long linger-ms))))
58
58
59
+ (defn set-hwm
60
+ [^ZMQ$Socket socket hwm]
61
+ (if hwm
62
+ (doto socket
63
+ (.setHwm (long hwm)))
64
+ socket
65
+ ))
66
+
59
67
(defn bind
60
68
[^ZMQ$Socket socket url]
61
69
(doto socket
Original file line number Diff line number Diff line change @@ -342,7 +342,14 @@ public class Config extends HashMap<String, Object> {
342
342
* The number of threads that should be used by the zeromq context in each worker process.
343
343
*/
344
344
public static String ZMQ_THREADS = "zmq.threads" ;
345
-
345
+
346
+ /**
347
+ * The high water mark for the underlying push sockets used to send messages. If downstream
348
+ * tasks to a producer are overloaded, the producer will block until the downstream tasks
349
+ * have consumed messages. This configuration can be used to prevent overloading in topologies
350
+ * that use unreliable spouts and are thus not controllable with TOPOLOGY_MAX_SPOUT_PENDING.
351
+ */
352
+ public static String ZMQ_HIGH_WATER_MARK = "zmq.high.water.mark" ;
346
353
347
354
/**
348
355
* How long a connection should retry sending messages to a target host when
You can’t perform that action at this time.
0 commit comments