File tree Expand file tree Collapse file tree 2 files changed +39
-3
lines changed
src/clj/backtype/storm/daemon Expand file tree Collapse file tree 2 files changed +39
-3
lines changed Original file line number Diff line number Diff line change 283
283
receive-queue (:receive-queue executor-data)
284
284
context (:worker-context executor-data)]
285
285
(when tick-time-secs
286
- (if (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
287
- (= :spout (:type executor-data)))
288
- (log-message " Timeouts disabled for executor " (:executor-id executor-data))
286
+ (if (or (system-id? (:component-id executor-data))
287
+ (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
288
+ (= :spout (:type executor-data))))
289
+ (log-message " Timeouts disabled for executor " (:component-id executor-data) " :" (:executor-id executor-data))
289
290
(schedule-recurring
290
291
(:user-timer worker)
291
292
tick-time-secs
Original file line number Diff line number Diff line change
1
+ (ns backtype.storm.tick-tuple-test
2
+ (:use [clojure test])
3
+ (:use [backtype.storm bootstrap testing])
4
+ (:use [backtype.storm.daemon common]))
5
+
6
+ (bootstrap )
7
+
8
+ (defbolt noop-bolt [" tuple" ] {:prepare true }
9
+ [conf context collector]
10
+ (bolt
11
+ (execute [tuple])))
12
+
13
+ (defspout noop-spout [" tuple" ]
14
+ [conf context collector]
15
+ (spout
16
+ (nextTuple [])))
17
+
18
+ (deftest test-tick-tuple-works-with-system-bolt
19
+ (with-simulated-time-local-cluster [cluster]
20
+ (let [topology (thrift/mk-topology
21
+ {" 1" (thrift/mk-spout-spec noop-spout)}
22
+ {" 2" (thrift/mk-bolt-spec {" 1" [" tuple" ]} noop-bolt)})]
23
+ (try
24
+ (submit-local-topology (:nimbus cluster)
25
+ " test"
26
+ {TOPOLOGY-TICK-TUPLE-FREQ-SECS 1 }
27
+ topology)
28
+ (advance-cluster-time cluster 2 )
29
+ ; ; if reaches here, it means everything works ok.
30
+ (is true )
31
+ (catch Exception e
32
+ (is false ))))))
33
+
34
+
35
+
You can’t perform that action at this time.
0 commit comments