File tree 2 files changed +13
-10
lines changed
2 files changed +13
-10
lines changed Original file line number Diff line number Diff line change 154
154
(recursive-map
155
155
:worker worker
156
156
:worker-context worker-context
157
+ :executor-id executor-id
157
158
:task-ids task-ids
158
159
:component-id component-id
159
160
:storm-conf storm-conf
202
203
receive-queue (:receive-queue executor-data)
203
204
context (:worker-context executor-data)]
204
205
(when tick-time-secs
205
- (schedule-recurring
206
- (:timer worker)
207
- tick-time-secs
208
- tick-time-secs
209
- (fn []
210
- (disruptor/publish
211
- receive-queue
212
- [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
213
- ))))))
206
+ (if-not (pos? tick-time-secs)
207
+ (log-message " Timeouts disabled for executor " (:executor-id executor-data))
208
+ (schedule-recurring
209
+ (:timer worker)
210
+ tick-time-secs
211
+ tick-time-secs
212
+ (fn []
213
+ (disruptor/publish
214
+ receive-queue
215
+ [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
216
+ )))))))
214
217
215
218
(defn mk-executor [worker executor-id]
216
219
(let [executor-data (executor-data worker executor-id)
Original file line number Diff line number Diff line change 101
101
daemon-conf (merge (read-storm-config )
102
102
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
103
103
ZMQ-LINGER-MILLIS 0
104
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS -1
104
105
}
105
106
daemon-conf
106
107
{STORM-CLUSTER-MODE " local"
313
314
(Thread/sleep 100 )
314
315
))
315
316
316
-
317
317
(defprotocol CompletableSpout
318
318
(exhausted? [this] " Whether all the tuples for this spout have been completed." )
319
319
(cleanup [this] " Cleanup any global state kept" )
You can’t perform that action at this time.
0 commit comments