|
96 | 96 | (LinkedList. <>)
|
97 | 97 | ))
|
98 | 98 |
|
| 99 | +(defn- host->used-slots [^Cluster cluster] |
| 100 | + (->> cluster |
| 101 | + .getUsedSlots |
| 102 | + (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %))) |
| 103 | + )) |
| 104 | + |
99 | 105 | (defn- distribution->sorted-amts [distribution]
|
100 | 106 | (->> distribution
|
101 | 107 | (mapcat (fn [[val amt]] (repeat amt val)))
|
|
156 | 162 | ^Set worker-specs (get topology-worker-specs top-id)
|
157 | 163 | num-workers (count host-assignments)
|
158 | 164 | ]
|
159 |
| - (if (and (every? #(= (second %) top-id) assignments) |
| 165 | + (if (and (contains? iso-ids-set top-id) |
| 166 | + (every? #(= (second %) top-id) assignments) |
160 | 167 | (contains? distribution num-workers)
|
161 | 168 | (every? #(contains? worker-specs (nth % 2)) assignments))
|
162 | 169 | (do (decrement-distribution! distribution num-workers)
|
|
168 | 175 | ))
|
169 | 176 | )))
|
170 | 177 |
|
171 |
| - (let [^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)] |
| 178 | + (let [host->used-slots (host->used-slots cluster) |
| 179 | + ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)] |
172 | 180 | ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
|
173 | 181 | (doseq [[top-id worker-specs] topology-worker-specs
|
174 | 182 | :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
|
175 | 183 | (doseq [amt amts
|
176 | 184 | :let [[host host-slots] (.peek sorted-assignable-hosts)]]
|
177 | 185 | (when (and host-slots (>= (count host-slots) amt))
|
178 | 186 | (.poll sorted-assignable-hosts)
|
179 |
| - (.freeSlots cluster host-slots) |
| 187 | + (.freeSlots cluster (get host->used-slots host)) |
180 | 188 | (doseq [slot (take amt host-slots)
|
181 | 189 | :let [executors-set (remove-elem-from-set! worker-specs)]]
|
182 | 190 | (.assign cluster slot top-id executors-set))
|
|
0 commit comments