(ns backtype.storm.scheduler.IsolationScheduler
  (:use [backtype.storm util config log])
  (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
  (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
  (:import [backtype.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot SchedulerAssignment
            EvenScheduler ExecutorDetails])
  (:gen-class
    :init init
    :constructors {[] []}
    :state state
    :implements [backtype.storm.scheduler.IScheduler]))

(defn -init []
  [[] (container)])

(defn -prepare [this conf]
  (container-set! (.state this) conf))

;; NOTE: dead code -- this definition is shadowed by the redefinition of
;; compute-worker-specs immediately below.
(defn- compute-worker-specs "Returns list of sets of executors"
  [^TopologyDetails details]
  (->> (.getExecutorToComponent details)
       reverse-map
       (map second)
       (apply interleave-all)
       (partition-fixed (.getNumWorkers details))
       (map set)))

(defn- compute-worker-specs "Returns mutable set of sets of executors"
  [^TopologyDetails details]
  (->> (.getExecutorToComponent details)
       reverse-map
       (map second)
       (apply concat)
       (map vector (repeat-seq (range (.getNumWorkers details))))
       (group-by first)
       (map-val #(map second %))
       vals
       (map set)
       (HashSet.)))
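
;; Rough sketch of the result (executor names are illustrative): with executors
;; e1..e5 across the topology's components and 2 workers, executors are dealt
;; round-robin over the workers, yielding something like #{#{e1 e3 e5} #{e2 e4}}
;; (the exact grouping depends on map iteration order).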

(defn isolated-topologies [conf topologies]
  (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
    (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)))
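
;; ISOLATION-SCHEDULER-MACHINES maps topology *name* -> number of dedicated
;; machines. A storm.yaml sketch (topology names and counts are illustrative):
;;   isolation.scheduler.machines:
;;     "my-iso-topology": 4
;;     "tiny-iso-topology": 1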

;; map from topology id -> set of sets of executors
(defn topology-worker-specs [iso-topologies]
  (->> iso-topologies
       (map (fn [t] {(.getId t) (compute-worker-specs t)}))
       (apply merge)))

(defn machine-distribution [conf ^TopologyDetails topology]
  (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
        machines (get name->machines (.getName topology))
        workers (.getNumWorkers topology)]
    (-> (integer-divided workers machines)
        (dissoc 0)
        (HashMap.))))
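
;; Distribution sketch: integer-divided splits the worker count as evenly as
;; possible over the machines and returns {workers-per-machine num-machines},
;; e.g. 9 workers on 4 machines => {2 3, 3 1} (three machines run 2 workers,
;; one runs 3). The (dissoc 0) drops machines that would receive no workers.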

(defn topology-machine-distribution [conf iso-topologies]
  (->> iso-topologies
       (map (fn [t] {(.getId t) (machine-distribution conf t)}))
       (apply merge)))

(defn host-assignments [^Cluster cluster]
  (letfn [(to-slot-specs [^SchedulerAssignment ass]
            (->> ass
                 .getExecutorToSlot
                 reverse-map
                 (map (fn [[slot executors]]
                        [slot (.getTopologyId ass) (set executors)]))))]
    (->> cluster
         .getAssignments
         vals
         (mapcat to-slot-specs)
         (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot)))))))
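
;; Shape sketch of the return value (hosts, ids, and executors are illustrative):
;;   {"host-a" [[slot-1 "topo-1" #{e1 e2}]
;;              [slot-2 "topo-1" #{e3}]]
;;    "host-b" [[slot-3 "topo-2" #{e4 e5}]]}
;; i.e. host -> seq of [worker-slot topology-id executor-set] for assigned slots.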

(defn- decrement-distribution! [^Map distribution value]
  (let [v (-> distribution (get value) dec)]
    (if (zero? v)
      (.remove distribution value)
      (.put distribution value v))))
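
;; e.g. given (def d (HashMap. {3 1, 2 3})), calling (decrement-distribution! d 3)
;; leaves d as {2 3}: an entry is removed once its count reaches zero.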

;; Returns a list of [host slots] pairs, reverse sorted by number of assignable
;; slots. The pairs are shuffled before sorting so that ties between hosts with
;; the same number of slots are broken randomly.
(defn- host-assignable-slots [^Cluster cluster]
  (-<> cluster
       .getAssignableSlots
       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
       (dissoc <> nil)
       (sort-by #(-> % second count -) (shuffle (seq <>)))
       (LinkedList. <>)))
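
;; Shape sketch: a LinkedList of [host [slot ...]] pairs. -schedule consumes it
;; with .peek/.poll, expecting the host with the most free slots at the head.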
| 99 | + |
| 100 | +(defn- host->used-slots [^Cluster cluster] |
| 101 | + (->> cluster |
| 102 | + .getUsedSlots |
| 103 | + (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %))) |
| 104 | + )) |
| 105 | + |
| 106 | +(defn- distribution->sorted-amts [distribution] |
| 107 | + (->> distribution |
| 108 | + (mapcat (fn [[val amt]] (repeat amt val))) |
| 109 | + (sort-by -) |
| 110 | + )) |
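
;; e.g. {3 1, 2 3} => (3 2 2 2): one request for a 3-worker machine followed by
;; three requests for 2-worker machines, largest first.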

;; ids of topologies whose worker specs have all been consumed, i.e. that are fully isolated
(defn- allocated-topologies [topology-worker-specs]
  (->> topology-worker-specs
       (filter (fn [[_ worker-specs]] (empty? worker-specs)))
       (map first)
       set))

(defn- leftover-topologies [^Topologies topologies filter-ids-set]
  (->> topologies
       .getTopologies
       (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
       (map (fn [^TopologyDetails t] {(.getId t) t}))
       (apply merge)
       (Topologies.)))

;; For each isolated topology:
;;   compute an even distribution of executors -> workers for the number of workers specified for the topology
;;   compute the distribution of those workers across machines
;; Determine host -> list of [slot, topology id, executors].
;; Iterate through the hosts; a machine is "good" if:
;;   1. it only runs workers from one isolated topology
;;   2. every worker running on it matches one of the computed executor sets for that topology
;;   3. its worker count matches one of the counts in that topology's machine distribution
;; Blacklist the good hosts and remove their workers from the set of workers that still need to be assigned;
;; otherwise, free any workers on the host that belong to isolated topologies.

(defn remove-elem-from-set! [^Set aset]
  (let [elem (-> aset .iterator .next)]
    (.remove aset elem)
    elem))

;; Get host -> all assignable worker slots for non-blacklisted machines (assigned or not),
;; which gives the machines that still need isolated assignments (machine -> [topology, list of executor sets]).
;; Match each spec to a machine that has the right number of workers, free everything else on that
;; machine, and assign those slots (one topology at a time).
;; Blacklist all machines that had production (isolated) slots assigned.
;; Log isolated topologies that weren't able to get enough slots/machines.
;; Run the default scheduler on the isolated topologies that didn't get enough slots plus the
;; non-isolated topologies, on the remaining machines.
;; Finally, restore the blacklist to what it was initially.
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
  (let [conf (container-get (.state this))
        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
        iso-topologies (isolated-topologies conf (.getTopologies topologies))
        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
        topology-worker-specs (topology-worker-specs iso-topologies)
        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
        host-assignments (host-assignments cluster)]
    (doseq [[host assignments] host-assignments]
      (let [top-id (-> assignments first second)
            distribution (get topology-machine-distribution top-id)
            ^Set worker-specs (get topology-worker-specs top-id)
            num-workers (count assignments)]
        (if (and (contains? iso-ids-set top-id)
                 (every? #(= (second %) top-id) assignments)
                 (contains? distribution num-workers)
                 (every? #(contains? worker-specs (nth % 2)) assignments))
          ;; good isolated machine: keep its workers and blacklist the host
          (do (decrement-distribution! distribution num-workers)
              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
              (.blacklistHost cluster host))
          ;; otherwise free any workers on it that belong to isolated topologies
          (doseq [[slot top-id _] assignments]
            (when (contains? iso-ids-set top-id)
              (.freeSlot cluster slot))))))

    (let [host->used-slots (host->used-slots cluster)
          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
      ;; TODO: could be improved further by ordering topologies by which needs the fewest workers
      (doseq [[top-id worker-specs] topology-worker-specs
              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
        (doseq [amt amts
                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
          (when (and host-slots (>= (count host-slots) amt))
            (.poll sorted-assignable-hosts)
            (.freeSlots cluster (get host->used-slots host))
            (doseq [slot (take amt host-slots)
                    :let [executors-set (remove-elem-from-set! worker-specs)]]
              (.assign cluster slot top-id executors-set))
            (.blacklistHost cluster host)))))

    (doseq [[top-id worker-specs] topology-worker-specs]
      (when-not (empty? worker-specs)
        (log-warn "Unable to isolate topology " top-id)))

    ;; run the default scheduler on the isolated topologies that didn't get enough
    ;; slots, plus the non-isolated topologies
    (-<> topology-worker-specs
         allocated-topologies
         (leftover-topologies topologies <>)
         (DefaultScheduler/default-schedule <> cluster))
    (.setBlacklistedHosts cluster orig-blacklist)))
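
;; To use this scheduler, nimbus must be pointed at the gen-class above in
;; storm.yaml (a sketch; see the isolation machines map earlier in this file):
;;   storm.scheduler: "backtype.storm.scheduler.IsolationScheduler"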