|
| 1 | +(ns backtype.storm.scheduler-test |
| 2 | + (:use [clojure test]) |
| 3 | + (:use [backtype.storm bootstrap config testing]) |
| 4 | + (:import [backtype.storm.generated StormTopology]) |
| 5 | + (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails |
| 6 | + SchedulerAssignment Topologies TopologyDetails])) |
| 7 | + |
| 8 | +(bootstrap) |
| 9 | + |
| 10 | +(defn clojurify-executor->slot [executorToSlot] |
| 11 | + (into {} (for [[executor slot] executorToSlot] |
| 12 | + {[(.getStartTask executor) (.getEndTask executor)] |
| 13 | + [(.getNodeId slot) (.getPort slot)]}))) |
| 14 | + |
| 15 | +(defn clojurify-executor->comp [executorToComp] |
| 16 | + (into {} (for [[executor component] executorToComp] |
| 17 | + {[(.getStartTask executor) (.getEndTask executor)] component}))) |
| 18 | + |
| 19 | +(defn clojurify-component->executors [compToExecutor] |
| 20 | + (into {} (for [[component executors] compToExecutor |
| 21 | + :let [new-executors (set (map #(vector (.getStartTask %) (.getEndTask %)) executors))]] |
| 22 | + {component new-executors}))) |
| 23 | + |
| 24 | +(deftest test-supervisor-details |
| 25 | + (let [executor->slot {(ExecutorDetails. (int 1) (int 5)) (WorkerSlot. "supervisor1" (int 1)) |
| 26 | + (ExecutorDetails. (int 6) (int 10)) (WorkerSlot. "supervisor2" (int 2))} |
| 27 | + topology-id "topology1" |
| 28 | + assignment (SchedulerAssignment. topology-id executor->slot)] |
| 29 | + ;; test assign |
| 30 | + (.assign assignment (WorkerSlot. "supervisor1" 1) |
| 31 | + (list (ExecutorDetails. (int 11) (int 15)) (ExecutorDetails. (int 16) (int 20)))) |
| 32 | + (is (= {[1 5] ["supervisor1" 1] |
| 33 | + [6 10] ["supervisor2" 2] |
| 34 | + [11 15] ["supervisor1" 1] |
| 35 | + [16 20] ["supervisor1" 1]} |
| 36 | + (clojurify-executor->slot (.getExecutorToSlot assignment)))) |
| 37 | + ;; test isSlotOccupied |
| 38 | + (is (= true (.isSlotOccupied assignment (WorkerSlot. "supervisor2" (int 2))))) |
| 39 | + (is (= true (.isSlotOccupied assignment (WorkerSlot. "supervisor1" (int 1))))) |
| 40 | + |
| 41 | + ;; test isExecutorAssigned |
| 42 | + (is (= true (.isExecutorAssigned assignment (ExecutorDetails. (int 1) (int 5))))) |
| 43 | + (is (= false (.isExecutorAssigned assignment (ExecutorDetails. (int 21) (int 25))))) |
| 44 | + |
| 45 | + ;; test unassignBySlot |
| 46 | + (.unassignBySlot assignment (WorkerSlot. "supervisor1" (int 1))) |
| 47 | + (is (= {[6 10] ["supervisor2" 2]} |
| 48 | + (clojurify-executor->slot (.getExecutorToSlot assignment)))) |
| 49 | + |
| 50 | + ;; test unassignByExecutor |
| 51 | + (.unassignByExecutor assignment (ExecutorDetails. (int 6) (int 10))) |
| 52 | + (is (= true |
| 53 | + (empty? (.getExecutorToSlot assignment)))) |
| 54 | + )) |
| 55 | + |
| 56 | +(deftest test-topologies |
| 57 | + (let [executor1 (ExecutorDetails. (int 1) (int 5)) |
| 58 | + executor2 (ExecutorDetails. (int 6) (int 10)) |
| 59 | + topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) |
| 60 | + {executor1 "spout1" |
| 61 | + executor2 "bolt1"}) |
| 62 | + ;; test topology.selectExecutorToComponent |
| 63 | + executor->comp (.selectExecutorToComponent topology1 (list executor1)) |
| 64 | + _ (is (= (clojurify-executor->comp {executor1 "spout1"}) |
| 65 | + (clojurify-executor->comp executor->comp))) |
| 66 | + ;; test topologies.getById |
| 67 | + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) {}) |
| 68 | + topologies (Topologies. {"topology1" topology1 "topology2" topology2}) |
| 69 | + _ (is (= "topology1" (->> "topology1" |
| 70 | + (.getById topologies) |
| 71 | + .getId))) |
| 72 | + ;; test topologies.getByName |
| 73 | + _ (is (= "topology2" (->> "topology-name-2" |
| 74 | + (.getByName topologies) |
| 75 | + .getId))) |
| 76 | + ] |
| 77 | + ) |
| 78 | + ) |
| 79 | + |
| 80 | +(deftest test-cluster |
| 81 | + (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9))) |
| 82 | + supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10))) |
| 83 | + executor1 (ExecutorDetails. (int 1) (int 5)) |
| 84 | + executor2 (ExecutorDetails. (int 6) (int 10)) |
| 85 | + executor3 (ExecutorDetails. (int 11) (int 15)) |
| 86 | + executor11 (ExecutorDetails. (int 100) (int 105)) |
| 87 | + executor12 (ExecutorDetails. (int 106) (int 110)) |
| 88 | + executor21 (ExecutorDetails. (int 201) (int 205)) |
| 89 | + executor22 (ExecutorDetails. (int 206) (int 210)) |
| 90 | + ;; topology1 needs scheduling: executor3 is NOT assigned a slot. |
| 91 | + topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" TOPOLOGY-WORKERS 2} |
| 92 | + (StormTopology.) |
| 93 | + {executor1 "spout1" |
| 94 | + executor2 "bolt1" |
| 95 | + executor3 "bolt2"}) |
| 96 | + ;; topology2 is fully scheduled |
| 97 | + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-WORKERS 2} |
| 98 | + (StormTopology.) |
| 99 | + {executor11 "spout11" |
| 100 | + executor12 "bolt12"}) |
| 101 | + ;; topology3 needs scheduling, since the assignment is squeezed |
| 102 | + topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" TOPOLOGY-WORKERS 2} |
| 103 | + (StormTopology.) |
| 104 | + {executor21 "spout21" |
| 105 | + executor22 "bolt22"}) |
| 106 | + topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3}) |
| 107 | + executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1)) |
| 108 | + executor2 (WorkerSlot. "supervisor2" (int 2))} |
| 109 | + executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 3)) |
| 110 | + executor12 (WorkerSlot. "supervisor2" (int 4))} |
| 111 | + executor->slot3 {executor21 (WorkerSlot. "supervisor1" (int 5)) |
| 112 | + executor22 (WorkerSlot. "supervisor1" (int 5))} |
| 113 | + assignment1 (SchedulerAssignment. "topology1" executor->slot1) |
| 114 | + assignment2 (SchedulerAssignment. "topology2" executor->slot2) |
| 115 | + assignment3 (SchedulerAssignment. "topology3" executor->slot3) |
| 116 | + cluster (Cluster. {"supervisor1" supervisor1 "supervisor2" supervisor2} |
| 117 | + {"topology1" assignment1 "topology2" assignment2 "topology3" assignment3})] |
| 118 | + ;; test Cluster constructor |
| 119 | + (is (= #{"supervisor1" "supervisor2"} |
| 120 | + (->> cluster |
| 121 | + .getSupervisors |
| 122 | + keys |
| 123 | + set))) |
| 124 | + (is (= #{"topology1" "topology2" "topology3"} |
| 125 | + (->> cluster |
| 126 | + .getAssignments |
| 127 | + keys |
| 128 | + set))) |
| 129 | + |
| 130 | + ;; test Cluster.getUnassignedExecutors |
| 131 | + (is (= (set (list executor3)) |
| 132 | + (-> cluster |
| 133 | + (.getUnassignedExecutors topology1) |
| 134 | + set))) |
| 135 | + (is (= true |
| 136 | + (empty? (-> cluster |
| 137 | + (.getUnassignedExecutors topology2))))) |
| 138 | + ;; test Cluster.needsScheduling |
| 139 | + (is (= true (.needsScheduling cluster topology1))) |
| 140 | + (is (= false (.needsScheduling cluster topology2))) |
| 141 | + (is (= true (.needsScheduling cluster topology3))) |
| 142 | + ;; test Cluster.needsSchedulingTopologies |
| 143 | + (is (= #{"topology1" "topology3"} |
| 144 | + (->> (.needsSchedulingTopologies cluster topologies) |
| 145 | + (map (fn [topology] (.getId topology))) |
| 146 | + set))) |
| 147 | + |
| 148 | + ;; test Cluster.getNeedsSchedulingExecutorToComponents |
| 149 | + (is (= {executor3 "bolt2"} |
| 150 | + (.getNeedsSchedulingExecutorToComponents cluster topology1))) |
| 151 | + (is (= true |
| 152 | + (empty? (.getNeedsSchedulingExecutorToComponents cluster topology2)))) |
| 153 | + (is (= true |
| 154 | + (empty? (.getNeedsSchedulingExecutorToComponents cluster topology3)))) |
| 155 | + |
| 156 | + ;; test Cluster.getNeedsSchedulingComponentToExecutors |
| 157 | + (is (= {"bolt2" #{[(.getStartTask executor3) (.getEndTask executor3)]}} |
| 158 | + (clojurify-component->executors (.getNeedsSchedulingComponentToExecutors cluster topology1)))) |
| 159 | + (is (= true |
| 160 | + (empty? (.getNeedsSchedulingComponentToExecutors cluster topology2)))) |
| 161 | + (is (= true |
| 162 | + (empty? (.getNeedsSchedulingComponentToExecutors cluster topology3)))) |
| 163 | + |
| 164 | + ;; test Cluster.getUsedPorts |
| 165 | + (is (= #{1 3 5} (set (.getUsedPorts cluster supervisor1)))) |
| 166 | + (is (= #{2 4} (set (.getUsedPorts cluster supervisor2)))) |
| 167 | + (is (= #{1 3 5} (set (.getUsedPorts cluster supervisor1)))) |
| 168 | + |
| 169 | + ;; test Cluster.getAvailablePorts |
| 170 | + (is (= #{7 9} (set (.getAvailablePorts cluster supervisor1)))) |
| 171 | + (is (= #{6 8 10} (set (.getAvailablePorts cluster supervisor2)))) |
| 172 | + |
| 173 | + ;; test Cluster.getAvailableSlots |
| 174 | + (is (= #{["supervisor1" 7] ["supervisor1" 9]} (set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster supervisor1))))) |
| 175 | + (is (= #{["supervisor2" 6] ["supervisor2" 8] ["supervisor2" 10]} (set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster supervisor2))))) |
| 176 | + ;; test Cluster.getAvailableSlots |
| 177 | + (is (= #{["supervisor1" 7] ["supervisor1" 9] ["supervisor2" 6] ["supervisor2" 8] ["supervisor2" 10]} |
| 178 | + (set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster))))) |
| 179 | + ;; test Cluster.getAssignedNumWorkers |
| 180 | + (is (= 2 (.getAssignedNumWorkers cluster topology1))) |
| 181 | + (is (= 2 (.getAssignedNumWorkers cluster topology2))) |
| 182 | + (is (= 1 (.getAssignedNumWorkers cluster topology3))) |
| 183 | + |
| 184 | + ;; test Cluster.isSlotOccupied |
| 185 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1))))) |
| 186 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3))))) |
| 187 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) |
| 188 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7))))) |
| 189 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 9))))) |
| 190 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 2))))) |
| 191 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 4))))) |
| 192 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 6))))) |
| 193 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 8))))) |
| 194 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 10))))) |
| 195 | + ;; test Cluster.getAssignmentById |
| 196 | + (is (= assignment1 (.getAssignmentById cluster "topology1"))) |
| 197 | + (is (= assignment2 (.getAssignmentById cluster "topology2"))) |
| 198 | + (is (= assignment3 (.getAssignmentById cluster "topology3"))) |
| 199 | + ;; test Cluster.getSupervisorById |
| 200 | + (is (= supervisor1 (.getSupervisorById cluster "supervisor1"))) |
| 201 | + (is (= supervisor2 (.getSupervisorById cluster "supervisor2"))) |
| 202 | + ;; test Cluster.getSupervisorsByHost |
| 203 | + (is (= #{supervisor1} (set (.getSupervisorsByHost cluster "192.168.0.1")))) |
| 204 | + (is (= #{supervisor2} (set (.getSupervisorsByHost cluster "192.168.0.2")))) |
| 205 | + |
| 206 | + ;; ==== the following tests will change the state of the cluster, so put it here at the end ==== |
| 207 | + ;; test Cluster.assign |
| 208 | + (.assign cluster (WorkerSlot. "supervisor1" (int 7)) "topology1" (list executor3)) |
| 209 | + (is (= false (.needsScheduling cluster topology1))) |
| 210 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7))))) |
| 211 | + |
| 212 | + ;; test Cluster.assign: if a executor is already assigned, there will be an exception |
| 213 | + (let [has-exception (try |
| 214 | + (.assign cluster (WorkerSlot. "supervisor1" (int 9)) "topology1" (list executor1)) |
| 215 | + false |
| 216 | + (catch Exception e |
| 217 | + true))] |
| 218 | + (is (= true has-exception))) |
| 219 | + |
| 220 | + ;; test Cluster.assign: if a slot is occupied, there will be an exception |
| 221 | + (.setAssignmentById cluster "topology1" (SchedulerAssignment. "topology1" {})) |
| 222 | + (let [has-exception (try |
| 223 | + (.assign cluster (WorkerSlot. "supervisor2" (int 4)) "topology1" (list executor1)) |
| 224 | + false |
| 225 | + (catch Exception e |
| 226 | + true))] |
| 227 | + (is (= true has-exception))) |
| 228 | + |
| 229 | + ;; revert the changes |
| 230 | + (.setAssignmentById cluster "topology1" assignment1) |
| 231 | + ;; test Cluster.freeSlot |
| 232 | + (.freeSlot cluster (WorkerSlot. "supervisor1" (int 7))) |
| 233 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7))))) |
| 234 | + |
| 235 | + ;; test Cluster.freeSlots |
| 236 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1))))) |
| 237 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3))))) |
| 238 | + (is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) |
| 239 | + (.freeSlots cluster (list (WorkerSlot. "supervisor1" (int 1)) |
| 240 | + (WorkerSlot. "supervisor1" (int 3)) |
| 241 | + (WorkerSlot. "supervisor1" (int 5)))) |
| 242 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1))))) |
| 243 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3))))) |
| 244 | + (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) |
| 245 | + |
| 246 | + ;; test Cluster.setAssignmentById |
| 247 | + (.setAssignmentById cluster "topology1" assignment1) |
| 248 | + (is (= assignment1 (.getAssignmentById cluster "topology1"))) |
| 249 | + )) |
0 commit comments