Skip to content

Commit 2ec670b

Browse files
author
Nathan Marz
committed
fix checking of correct nodes in isolation scheduler, add test that isolation scheduler doesn't reassign after correct assignment
1 parent a0e52c7 commit 2ec670b

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

src/clj/backtype/storm/scheduler/IsolationScheduler.clj

+2-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
9494
(dissoc <> nil)
9595
(sort-by #(-> % second count -) <>)
96+
shuffle
9697
(LinkedList. <>)
9798
))
9899

@@ -160,7 +161,7 @@
160161
(let [top-id (-> assignments first second)
161162
distribution (get topology-machine-distribution top-id)
162163
^Set worker-specs (get topology-worker-specs top-id)
163-
num-workers (count host-assignments)
164+
num-workers (count assignments)
164165
]
165166
(if (and (contains? iso-ids-set top-id)
166167
(every? #(= (second %) top-id) assignments)

test/clj/backtype/storm/nimbus_test.clj

+23-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@
4848
set
4949
)))
5050

51+
(defn topology-slots [state storm-name]
52+
(let [storm-id (get-storm-id state storm-name)
53+
assignment (.assignment-info state storm-id nil)]
54+
(->> assignment
55+
:executor->node+port
56+
vals
57+
set
58+
)))
59+
5160
(defn topology-node-distribution [state storm-name]
5261
(let [storm-id (get-storm-id state storm-name)
5362
assignment (.assignment-info state storm-id nil)]
@@ -189,13 +198,14 @@
189198
))))
190199

191200
(deftest test-isolated-assignment
192-
(with-local-cluster [cluster :supervisors 6
201+
(with-simulated-time-local-cluster [cluster :supervisors 6
193202
:ports-per-supervisor 3
194203
:inimbus (isolation-nimbus)
195204
:daemon-conf {SUPERVISOR-ENABLE false
196205
TOPOLOGY-ACKER-EXECUTORS 0
197206
STORM-SCHEDULER "backtype.storm.scheduler.IsolationScheduler"
198207
ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
208+
NIMBUS-MONITOR-FREQ-SECS 10
199209
}]
200210
(letlocals
201211
(bind state (:storm-cluster-state cluster))
@@ -206,11 +216,13 @@
206216
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}))
207217

208218
(submit-local-topology nimbus "noniso" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 4} topology)
219+
(advance-cluster-time cluster 1)
209220
(is (= 4 (topology-num-nodes state "noniso")))
210221
(is (= 4 (storm-num-workers state "noniso")))
211222

212223
(submit-local-topology nimbus "tester1" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
213224
(submit-local-topology nimbus "tester2" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
225+
(advance-cluster-time cluster 1)
214226

215227
(bind task-info-tester1 (storm-component->task-info cluster "tester1"))
216228
(bind task-info-tester2 (storm-component->task-info cluster "tester2"))
@@ -227,6 +239,16 @@
227239
(check-consistency cluster "tester1")
228240
(check-consistency cluster "tester2")
229241
(check-consistency cluster "noniso")
242+
243+
;;check that nothing gets reassigned
244+
(bind tester1-slots (topology-slots state "tester1"))
245+
(bind tester2-slots (topology-slots state "tester2"))
246+
(bind noniso-slots (topology-slots state "noniso"))
247+
(advance-cluster-time cluster 20)
248+
(is (= tester1-slots (topology-slots state "tester1")))
249+
(is (= tester2-slots (topology-slots state "tester2")))
250+
(is (= noniso-slots (topology-slots state "noniso")))
251+
230252
)))
231253

232254
(deftest test-zero-executor-or-tasks

0 commit comments

Comments
 (0)