|
17 | 17 | (into {} (for [[executor [node port]] executor->node+port]
|
18 | 18 | {(ExecutorDetails. (first executor) (second executor)) (WorkerSlot. node port)}))))
|
19 | 19 |
|
(defn get-alive-assigned-node+port->executors
  "Looks up the existing assignment for `topology-id` in `cluster` and returns
   a map from [node port] to the sequence of [start-task end-task] executor
   pairs currently alive on that slot. Returns an empty mapping when the
   topology has no existing assignment."
  [cluster topology-id]
  (let [assignment (.getAssignmentById cluster topology-id)
        executor->slot (if assignment (.getExecutorToSlots assignment) {})]
    (->> executor->slot
         (map (fn [[^ExecutorDetails executor ^WorkerSlot slot]]
                ;; normalize to plain vectors: [start end] -> [node port]
                [[(.getStartTask executor) (.getEndTask executor)]
                 [(.getNodeId slot) (.getPort slot)]]))
         (into {})
         reverse-map)))
20 | 31 |
|
21 |
(defn- schedule-topology
  "Computes an even scheduler assignment for `topology` on `cluster`.

   Executors that are already alive on a slot keep their placement
   (stay-assignment); executors without a live slot are dealt round-robin
   across the sorted available slots, capped by TOPOLOGY-WORKERS.
   Returns the scheduler assignment for the topology."
  [^TopologyDetails topology ^Cluster cluster]
  (let [topology-id (.getId topology)
        topology-conf (.getConf topology)
        available-slots (->> (.getAvailableSlots cluster)
                             (map #(vector (.getNodeId %) (.getPort %))))
        all-executors (->> topology
                           .getExecutors
                           (map #(vector (.getStartTask %) (.getEndTask %)))
                           set)
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
        ;; never use more slots than the topology requested, nor more than exist
        total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
                                (+ (count available-slots) (count alive-assigned)))
        ;; NOTE: the old `freed-slots` binding — (keys (apply dissoc alive-assigned
        ;; (keys alive-assigned))) — was always nil (it removed every key from
        ;; itself), a leftover from the removed keeper-slots-fn. Only the
        ;; available slots can receive reassignments.
        reassign-slots (take (- total-slots-to-use (count alive-assigned))
                             (sort-slots available-slots))
        ;; executors not covered by any live slot need a (re)assignment
        reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
        reassignment (into {}
                           (map vector
                                reassign-executors
                                ;; for some reason it goes into infinite loop without limiting the repeat-seq
                                (repeat-seq (count reassign-executors) reassign-slots)))
        ;; invert node+port->executors back into executor->node+port
        stay-assignment (into {} (mapcat (fn [[node+port executors]] (for [executor executors] [executor node+port])) alive-assigned))]
    (when-not (empty? reassignment)
      (log-message "Available slots: " (pr-str available-slots))
      )
    (mk-scheduler-assignment topology-id (merge stay-assignment reassignment))))
|
56 | 58 |
|
57 |
(defn schedule-topologies-evenly
  "Schedules every topology in `topologies` that the cluster reports as
   needing scheduling, writing each computed assignment back into `cluster`."
  [^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    (.setAssignmentById cluster
                        (.getId topology)
                        (schedule-topology topology cluster))))
|
63 |
| - |
| 65 | + |
(defn -schedule
  "IScheduler/schedule entry point (gen-class method): delegates to
   schedule-topologies-evenly; `this` is unused."
  [this ^Topologies topologies ^Cluster cluster]
  (schedule-topologies-evenly topologies cluster))
0 commit comments