Skip to content

Commit f2a7046

Browse files
author
Nathan Marz
committed
Merge branch '0.8.0' into scheduler
2 parents 436ae85 + 365eaeb commit f2a7046

File tree

8 files changed

+276
-69
lines changed

8 files changed

+276
-69
lines changed

src/clj/backtype/storm/daemon/common.clj

+22-18
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,33 @@
109109
(str id " is not a valid stream id"))))))
110110
))
111111

112+
(defn all-components [^StormTopology topology]
113+
(apply merge {}
114+
(for [f thrift/STORM-TOPOLOGY-FIELDS]
115+
(.getFieldValue topology f)
116+
)))
117+
118+
(defn component-conf [component]
119+
(->> component
120+
.get_common
121+
.get_json_conf
122+
from-json))
123+
112124
(defn validate-basic! [^StormTopology topology]
113125
(validate-ids! topology)
114126
(doseq [f thrift/SPOUT-FIELDS
115127
obj (->> f (.getFieldValue topology) vals)]
116128
(if-not (empty? (-> obj .get_common .get_inputs))
117-
(throw (InvalidTopologyException. "May not declare inputs for a spout"))
129+
(throw (InvalidTopologyException. "May not declare inputs for a spout"))))
130+
(doseq [[comp-id comp] (all-components topology)
131+
:let [conf (component-conf comp)
132+
p (-> comp .get_common thrift/parallelism-hint)]]
133+
(when (and (> (conf TOPOLOGY-TASKS) 0)
134+
p
135+
(<= p 0))
136+
(throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0"))
118137
)))
119138

120-
(defn all-components [^StormTopology topology]
121-
(apply merge {}
122-
(for [f thrift/STORM-TOPOLOGY-FIELDS]
123-
(.getFieldValue topology f)
124-
)))
125-
126139
(defn validate-structure! [^StormTopology topology]
127140
;; validate all the component subscribe from component+stream which actually exists in the topology
128141
;; and if it is a fields grouping, validate the corresponding field exists
@@ -144,12 +157,6 @@
144157
(when-not (empty? diff-fields)
145158
(throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
146159

147-
(defn component-conf [component]
148-
(->> component
149-
.get_common
150-
.get_json_conf
151-
from-json))
152-
153160
(defn acker-inputs [^StormTopology topology]
154161
(let [bolt-ids (.. topology get_bolts keySet)
155162
spout-ids (.. topology get_spouts keySet)
@@ -166,14 +173,13 @@
166173

167174
(defn add-acker! [storm-conf ^StormTopology ret]
168175
(let [num-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)
169-
num-tasks (storm-conf TOPOLOGY-ACKER-TASKS)
170176
acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
171177
(new backtype.storm.daemon.acker)
172178
{ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
173179
ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
174180
}
175181
:p num-executors
176-
:conf {TOPOLOGY-TASKS num-tasks
182+
:conf {TOPOLOGY-TASKS num-executors
177183
TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
178184
(dofor [[_ bolt] (.get_bolts ret)
179185
:let [common (.get_common bolt)]]
@@ -228,9 +234,7 @@
228234
))
229235

230236
(defn has-ackers? [storm-conf]
231-
(let [tasks (storm-conf TOPOLOGY-ACKER-TASKS)]
232-
(and (or (nil? tasks) (> tasks 0))
233-
(> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))))
237+
(> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))
234238

235239
(defn num-start-executors [component]
236240
(thrift/parallelism-hint (.get_common component)))

src/clj/backtype/storm/daemon/nimbus.clj

+6-3
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,6 @@
718718
(merge storm-conf
719719
{TOPOLOGY-KRYO-REGISTER (merge (mapify-serializations component-sers)
720720
(mapify-serializations base-sers))
721-
TOPOLOGY-ACKER-TASKS (or (total-conf TOPOLOGY-ACKER-TASKS) (total-conf TOPOLOGY-ACKER-EXECUTORS))
722721
TOPOLOGY-ACKER-EXECUTORS (total-conf TOPOLOGY-ACKER-EXECUTORS)
723722
TOPOLOGY-MAX-TASK-PARALLELISM (total-conf TOPOLOGY-MAX-TASK-PARALLELISM)
724723
})
@@ -841,10 +840,14 @@
841840
(.get_wait_secs options))
842841
num-workers (if (.is_set_num_workers options)
843842
(.get_num_workers options))
844-
executor-overrrides (if (.is_set_num_executors options)
843+
executor-overrides (if (.is_set_num_executors options)
845844
(.get_num_executors options)
846845
{})]
847-
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrrides] true)
846+
(doseq [[c num-executors] executor-overrides]
847+
(when (<= num-executors 0)
848+
(throw (InvalidTopologyException. "Number of executors must be greater than 0"))
849+
))
850+
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
848851
))
849852

850853
(activate [this storm-name]

src/jvm/backtype/storm/Config.java

+1-19
Original file line numberDiff line numberDiff line change
@@ -329,20 +329,6 @@ public class Config extends HashMap<String, Object> {
329329
*/
330330
public static String TOPOLOGY_TASKS = "topology.tasks";
331331

332-
/**
333-
* How many acker tasks should be spawned for the topology. An acker task keeps
334-
* track of a subset of the tuples emitted by spouts and detects when a spout
335-
* tuple is fully processed. When an acker task detects that a spout tuple
336-
* is finished, it sends a message to the spout to acknowledge the tuple. The
337-
* number of ackers should be scaled with the amount of throughput going
338-
* through the cluster for the topology. Typically, you don't need that many
339-
* ackers though.
340-
*
341-
* <p>If this is set to 0, then Storm will immediately ack tuples as soon
342-
* as they come off the spout, effectively disabling reliability.</p>
343-
*/
344-
public static String TOPOLOGY_ACKER_TASKS = "topology.acker.tasks";
345-
346332
/**
347333
* How many executors to spawn for ackers.
348334
*
@@ -529,12 +515,8 @@ public void setOptimize(boolean isOn) {
529515
public void setNumWorkers(int workers) {
530516
put(Config.TOPOLOGY_WORKERS, workers);
531517
}
532-
533-
public void setNumAckerTasks(int numTasks) {
534-
put(Config.TOPOLOGY_ACKER_TASKS, numTasks);
535-
}
536518

537-
public void setNumAckerExecutors(int numExecutors) {
519+
public void setNumAckers(int numExecutors) {
538520
put(Config.TOPOLOGY_ACKER_EXECUTORS, numExecutors);
539521
}
540522

0 commit comments

Comments
 (0)