Skip to content

Commit a0e52c7

Browse files
author
Nathan Marz
committed
fix isolation scheduler to properly clear a machine of workers when putting an isolated topology there
1 parent a39add8 commit a0e52c7

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

src/clj/backtype/storm/scheduler/IsolationScheduler.clj

+11-3
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@
9696
(LinkedList. <>)
9797
))
9898

99+
(defn- host->used-slots [^Cluster cluster]
100+
(->> cluster
101+
.getUsedSlots
102+
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
103+
))
104+
99105
(defn- distribution->sorted-amts [distribution]
100106
(->> distribution
101107
(mapcat (fn [[val amt]] (repeat amt val)))
@@ -156,7 +162,8 @@
156162
^Set worker-specs (get topology-worker-specs top-id)
157163
num-workers (count host-assignments)
158164
]
159-
(if (and (every? #(= (second %) top-id) assignments)
165+
(if (and (contains? iso-ids-set top-id)
166+
(every? #(= (second %) top-id) assignments)
160167
(contains? distribution num-workers)
161168
(every? #(contains? worker-specs (nth % 2)) assignments))
162169
(do (decrement-distribution! distribution num-workers)
@@ -168,15 +175,16 @@
168175
))
169176
)))
170177

171-
(let [^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
178+
(let [host->used-slots (host->used-slots cluster)
179+
^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
172180
;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
173181
(doseq [[top-id worker-specs] topology-worker-specs
174182
:let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
175183
(doseq [amt amts
176184
:let [[host host-slots] (.peek sorted-assignable-hosts)]]
177185
(when (and host-slots (>= (count host-slots) amt))
178186
(.poll sorted-assignable-hosts)
179-
(.freeSlots cluster host-slots)
187+
(.freeSlots cluster (get host->used-slots host))
180188
(doseq [slot (take amt host-slots)
181189
:let [executors-set (remove-elem-from-set! worker-specs)]]
182190
(.assign cluster slot top-id executors-set))

src/jvm/backtype/storm/scheduler/Cluster.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,10 @@ public void freeSlot(WorkerSlot slot) {
320320
* @param slots
321321
*/
322322
public void freeSlots(Collection<WorkerSlot> slots) {
323-
for (WorkerSlot slot : slots) {
324-
this.freeSlot(slot);
323+
if(slots!=null) {
324+
for (WorkerSlot slot : slots) {
325+
this.freeSlot(slot);
326+
}
325327
}
326328
}
327329

0 commit comments

Comments
 (0)