Skip to content

Commit c3db916

Browse files
committed
Merge pull request nathanmarz#702 from xumingming/fix-system-bolt
fix the issue that tick tuple cannot work with system bolt
2 parents a0bc262 + 483ce45 commit c3db916

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

storm-core/src/clj/backtype/storm/daemon/executor.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,10 @@
283283
receive-queue (:receive-queue executor-data)
284284
context (:worker-context executor-data)]
285285
(when tick-time-secs
286-
(if (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
287-
(= :spout (:type executor-data)))
288-
(log-message "Timeouts disabled for executor " (:executor-id executor-data))
286+
(if (or (system-id? (:component-id executor-data))
287+
(and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
288+
(= :spout (:type executor-data))))
289+
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
289290
(schedule-recurring
290291
(:user-timer worker)
291292
tick-time-secs
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
(ns backtype.storm.tick-tuple-test
2+
(:use [clojure test])
3+
(:use [backtype.storm bootstrap testing])
4+
(:use [backtype.storm.daemon common]))
5+
6+
(bootstrap)
7+
8+
(defbolt noop-bolt ["tuple"] {:prepare true}
9+
[conf context collector]
10+
(bolt
11+
(execute [tuple])))
12+
13+
(defspout noop-spout ["tuple"]
14+
[conf context collector]
15+
(spout
16+
(nextTuple [])))
17+
18+
(deftest test-tick-tuple-works-with-system-bolt
19+
(with-simulated-time-local-cluster [cluster]
20+
(let [topology (thrift/mk-topology
21+
{"1" (thrift/mk-spout-spec noop-spout)}
22+
{"2" (thrift/mk-bolt-spec {"1" ["tuple"]} noop-bolt)})]
23+
(try
24+
(submit-local-topology (:nimbus cluster)
25+
"test"
26+
{TOPOLOGY-TICK-TUPLE-FREQ-SECS 1}
27+
topology)
28+
(advance-cluster-time cluster 2)
29+
;; if reaches here, it means everything works ok.
30+
(is true)
31+
(catch Exception e
32+
(is false))))))
33+
34+
35+

0 commit comments

Comments
 (0)