Skip to content

Commit bf2904b

Browse files
author
Nathan Marz
committed
Added test for zero executors or tasks. Added validation that appropriate number of executors are set during submission or rebalancing.
1 parent 0deabfb commit bf2904b

File tree

6 files changed

+166
-23
lines changed

6 files changed

+166
-23
lines changed

src/clj/backtype/storm/daemon/common.clj

+20-13
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,33 @@
109109
(str id " is not a valid stream id"))))))
110110
))
111111

112+
(defn all-components [^StormTopology topology]
113+
(apply merge {}
114+
(for [f thrift/STORM-TOPOLOGY-FIELDS]
115+
(.getFieldValue topology f)
116+
)))
117+
118+
(defn component-conf [component]
119+
(->> component
120+
.get_common
121+
.get_json_conf
122+
from-json))
123+
112124
(defn validate-basic! [^StormTopology topology]
113125
(validate-ids! topology)
114126
(doseq [f thrift/SPOUT-FIELDS
115127
obj (->> f (.getFieldValue topology) vals)]
116128
(if-not (empty? (-> obj .get_common .get_inputs))
117-
(throw (InvalidTopologyException. "May not declare inputs for a spout"))
129+
(throw (InvalidTopologyException. "May not declare inputs for a spout"))))
130+
(doseq [[comp-id comp] (all-components topology)
131+
:let [conf (component-conf comp)
132+
p (-> comp .get_common thrift/parallelism-hint)]]
133+
(when (and (> (conf TOPOLOGY-TASKS) 0)
134+
p
135+
(<= p 0))
136+
(throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0"))
118137
)))
119138

120-
(defn all-components [^StormTopology topology]
121-
(apply merge {}
122-
(for [f thrift/STORM-TOPOLOGY-FIELDS]
123-
(.getFieldValue topology f)
124-
)))
125-
126139
(defn validate-structure! [^StormTopology topology]
127140
;; validate all the component subscribe from component+stream which actually exists in the topology
128141
;; and if it is a fields grouping, validate the corresponding field exists
@@ -144,12 +157,6 @@
144157
(when-not (empty? diff-fields)
145158
(throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
146159

147-
(defn component-conf [component]
148-
(->> component
149-
.get_common
150-
.get_json_conf
151-
from-json))
152-
153160
(defn acker-inputs [^StormTopology topology]
154161
(let [bolt-ids (.. topology get_bolts keySet)
155162
spout-ids (.. topology get_spouts keySet)

src/clj/backtype/storm/daemon/nimbus.clj

+6-2
Original file line numberDiff line numberDiff line change
@@ -744,10 +744,14 @@
744744
(.get_wait_secs options))
745745
num-workers (if (.is_set_num_workers options)
746746
(.get_num_workers options))
747-
executor-overrrides (if (.is_set_num_executors options)
747+
executor-overrides (if (.is_set_num_executors options)
748748
(.get_num_executors options)
749749
{})]
750-
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrrides] true)
750+
(doseq [[c num-executors] executor-overrides]
751+
(when (<= num-executors 0)
752+
(throw (InvalidTopologyException. "Number of executors must be greater than 0"))
753+
))
754+
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
751755
))
752756

753757
(activate [this storm-name]

src/jvm/backtype/storm/generated/Nimbus.java

+104-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)