Skip to content

Commit 0deabfb

Browse files
author
Nathan Marz
committed
get rid of TOPOLOGY-ACKER-TASKS config and make # acker tasks always equal to number of acker executors
1 parent 2708e40 commit 0deabfb

File tree

4 files changed

+9
-31
lines changed

4 files changed

+9
-31
lines changed

src/clj/backtype/storm/daemon/common.clj

+2-5
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,13 @@
166166

167167
(defn add-acker! [storm-conf ^StormTopology ret]
168168
(let [num-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)
169-
num-tasks (storm-conf TOPOLOGY-ACKER-TASKS)
170169
acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
171170
(new backtype.storm.daemon.acker)
172171
{ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
173172
ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
174173
}
175174
:p num-executors
176-
:conf {TOPOLOGY-TASKS num-tasks
175+
:conf {TOPOLOGY-TASKS num-executors
177176
TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
178177
(dofor [[_ bolt] (.get_bolts ret)
179178
:let [common (.get_common bolt)]]
@@ -228,9 +227,7 @@
228227
))
229228

230229
(defn has-ackers? [storm-conf]
231-
(let [tasks (storm-conf TOPOLOGY-ACKER-TASKS)]
232-
(and (or (nil? tasks) (> tasks 0))
233-
(> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))))
230+
(> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))
234231

235232
(defn num-start-executors [component]
236233
(thrift/parallelism-hint (.get_common component)))

src/clj/backtype/storm/daemon/nimbus.clj

-1
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,6 @@
623623
(merge storm-conf
624624
{TOPOLOGY-KRYO-REGISTER (merge (mapify-serializations component-sers)
625625
(mapify-serializations base-sers))
626-
TOPOLOGY-ACKER-TASKS (or (total-conf TOPOLOGY-ACKER-TASKS) (total-conf TOPOLOGY-ACKER-EXECUTORS))
627626
TOPOLOGY-ACKER-EXECUTORS (total-conf TOPOLOGY-ACKER-EXECUTORS)
628627
TOPOLOGY-MAX-TASK-PARALLELISM (total-conf TOPOLOGY-MAX-TASK-PARALLELISM)
629628
})

src/jvm/backtype/storm/Config.java

+1-19
Original file line numberDiff line numberDiff line change
@@ -319,20 +319,6 @@ public class Config extends HashMap<String, Object> {
319319
*/
320320
public static String TOPOLOGY_TASKS = "topology.tasks";
321321

322-
/**
323-
* How many acker tasks should be spawned for the topology. An acker task keeps
324-
* track of a subset of the tuples emitted by spouts and detects when a spout
325-
* tuple is fully processed. When an acker task detects that a spout tuple
326-
* is finished, it sends a message to the spout to acknowledge the tuple. The
327-
* number of ackers should be scaled with the amount of throughput going
328-
* through the cluster for the topology. Typically, you don't need that many
329-
* ackers though.
330-
*
331-
* <p>If this is set to 0, then Storm will immediately ack tuples as soon
332-
* as they come off the spout, effectively disabling reliability.</p>
333-
*/
334-
public static String TOPOLOGY_ACKER_TASKS = "topology.acker.tasks";
335-
336322
/**
337323
* How many executors to spawn for ackers.
338324
*
@@ -519,12 +505,8 @@ public void setOptimize(boolean isOn) {
519505
public void setNumWorkers(int workers) {
520506
put(Config.TOPOLOGY_WORKERS, workers);
521507
}
522-
523-
public void setNumAckerTasks(int numTasks) {
524-
put(Config.TOPOLOGY_ACKER_TASKS, numTasks);
525-
}
526508

527-
public void setNumAckerExecutors(int numExecutors) {
509+
public void setNumAckers(int numExecutors) {
528510
put(Config.TOPOLOGY_ACKER_EXECUTORS, numExecutors);
529511
}
530512

test/clj/backtype/storm/nimbus_test.clj

+6-6
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
))
8585

8686
(deftest test-assignment
87-
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-TASKS 0}]
87+
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
8888
(let [state (:storm-cluster-state cluster)
8989
nimbus (:nimbus cluster)
9090
topology (thrift/mk-topology
@@ -120,7 +120,7 @@
120120
)))
121121

122122
(deftest test-over-parallelism-assignment
123-
(with-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-TASKS 0}]
123+
(with-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
124124
(let [state (:storm-cluster-state cluster)
125125
nimbus (:nimbus cluster)
126126
topology (thrift/mk-topology
@@ -146,7 +146,7 @@
146146
:daemon-conf {SUPERVISOR-ENABLE false
147147
NIMBUS-TASK-TIMEOUT-SECS 30
148148
NIMBUS-MONITOR-FREQ-SECS 10
149-
TOPOLOGY-ACKER-TASKS 0}]
149+
TOPOLOGY-ACKER-EXECUTORS 0}]
150150
(letlocals
151151
(bind conf (:daemon-conf cluster))
152152
(bind topology (thrift/mk-topology
@@ -235,7 +235,7 @@
235235
NIMBUS-TASK-TIMEOUT-SECS 20
236236
NIMBUS-MONITOR-FREQ-SECS 10
237237
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
238-
TOPOLOGY-ACKER-TASKS 0}]
238+
TOPOLOGY-ACKER-EXECUTORS 0}]
239239
(letlocals
240240
(bind conf (:daemon-conf cluster))
241241
(bind topology (thrift/mk-topology
@@ -338,7 +338,7 @@
338338
NIMBUS-TASK-LAUNCH-SECS 60
339339
NIMBUS-TASK-TIMEOUT-SECS 20
340340
NIMBUS-MONITOR-FREQ-SECS 10
341-
TOPOLOGY-ACKER-TASKS 0}]
341+
TOPOLOGY-ACKER-EXECUTORS 0}]
342342
(letlocals
343343
(bind topology (thrift/mk-topology
344344
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 9)}
@@ -384,7 +384,7 @@
384384
:daemon-conf {SUPERVISOR-ENABLE false
385385
NIMBUS-MONITOR-FREQ-SECS 10
386386
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
387-
TOPOLOGY-ACKER-TASKS 0}]
387+
TOPOLOGY-ACKER-EXECUTORS 0}]
388388
(letlocals
389389
(bind topology (thrift/mk-topology
390390
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}

0 commit comments

Comments
 (0)