Skip to content

Commit ebe8246

Browse files
committed
add unit tests for scheduler java api
1 parent 7efff3c commit ebe8246

File tree

4 files changed

+287
-12
lines changed

4 files changed

+287
-12
lines changed

src/jvm/backtype/storm/scheduler/Cluster.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,19 @@ public int getAssignedNumWorkers(TopologyDetails topology) {
220220
*/
221221
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
222222
if (this.isSlotOccupied(slot)) {
223-
new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
223+
throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
224224
}
225225

226226
SchedulerAssignment assignment = this.getAssignmentById(topologyId);
227227
if (assignment == null) {
228228
assignment = new SchedulerAssignment(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
229229
this.assignments.put(topologyId, assignment);
230+
} else {
231+
for (ExecutorDetails executor : executors) {
232+
if (assignment.isExecutorAssigned(executor)) {
233+
throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot.");
234+
}
235+
}
230236
}
231237

232238
assignment.assign(slot, executors);
@@ -258,7 +264,7 @@ public void freeSlot(WorkerSlot slot) {
258264
// remove the slot from the existing assignments
259265
for (SchedulerAssignment assignment : this.assignments.values()) {
260266
if (assignment.isSlotOccupied(slot)) {
261-
assignment.removeSlot(slot);
267+
assignment.unassignBySlot(slot);
262268
break;
263269
}
264270
}
@@ -356,4 +362,14 @@ public Map<String, SchedulerAssignment> getAssignments() {
356362
public Map<String, SupervisorDetails> getSupervisors() {
357363
return this.supervisors;
358364
}
365+
366+
private boolean isExecutorAssigned(ExecutorDetails executor) {
367+
for (SchedulerAssignment assignment : this.assignments.values()) {
368+
if (assignment.isExecutorAssigned(executor)) {
369+
return true;
370+
}
371+
}
372+
373+
return false;
374+
}
359375
}

src/jvm/backtype/storm/scheduler/ExecutorDetails.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ public boolean equals(Object other) {
3535
}
3636

3737
public int hashCode() {
38-
return 13 * this.startTask + 17 * this.endTask;
38+
return this.startTask + 13 * this.endTask;
3939
}
4040
}

src/jvm/backtype/storm/scheduler/SchedulerAssignment.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,24 @@ public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
3636
}
3737
}
3838

39+
/**
40+
* Unassign the assignment for executor.
41+
*/
42+
public void unassignByExecutor(ExecutorDetails executor) {
43+
this.executorToSlot.remove(executor);
44+
}
45+
46+
public void unassignByExecutors(Collection<ExecutorDetails> executors) {
47+
for (ExecutorDetails executor : executors) {
48+
this.unassignByExecutor(executor);
49+
}
50+
}
51+
3952
/**
4053
* Release the slot occupied by this assignment.
4154
* @param slot
4255
*/
43-
public void removeSlot(WorkerSlot slot) {
56+
public void unassignBySlot(WorkerSlot slot) {
4457
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
4558
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
4659
WorkerSlot ws = this.executorToSlot.get(executor);
@@ -61,16 +74,13 @@ public void removeSlot(WorkerSlot slot) {
6174
* @return
6275
*/
6376
public boolean isSlotOccupied(WorkerSlot slot) {
64-
Collection<WorkerSlot> slots = this.executorToSlot.values();
65-
for (WorkerSlot slot1 : slots) {
66-
if (slot1.equals(slot)) {
67-
return true;
68-
}
69-
}
70-
71-
return false;
77+
return this.executorToSlot.containsValue(slot);
7278
}
7379

80+
public boolean isExecutorAssigned(ExecutorDetails executor) {
81+
return this.executorToSlot.containsKey(executor);
82+
}
83+
7484
public String getTopologyId() {
7585
return this.topologyId;
7686
}
+249
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
(ns backtype.storm.scheduler-test
2+
(:use [clojure test])
3+
(:use [backtype.storm bootstrap config testing])
4+
(:import [backtype.storm.generated StormTopology])
5+
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
6+
SchedulerAssignment Topologies TopologyDetails]))
7+
8+
(bootstrap)
9+
10+
(defn clojurify-executor->slot [executorToSlot]
11+
(into {} (for [[executor slot] executorToSlot]
12+
{[(.getStartTask executor) (.getEndTask executor)]
13+
[(.getNodeId slot) (.getPort slot)]})))
14+
15+
(defn clojurify-executor->comp [executorToComp]
16+
(into {} (for [[executor component] executorToComp]
17+
{[(.getStartTask executor) (.getEndTask executor)] component})))
18+
19+
(defn clojurify-component->executors [compToExecutor]
20+
(into {} (for [[component executors] compToExecutor
21+
:let [new-executors (set (map #(vector (.getStartTask %) (.getEndTask %)) executors))]]
22+
{component new-executors})))
23+
24+
(deftest test-supervisor-details
25+
(let [executor->slot {(ExecutorDetails. (int 1) (int 5)) (WorkerSlot. "supervisor1" (int 1))
26+
(ExecutorDetails. (int 6) (int 10)) (WorkerSlot. "supervisor2" (int 2))}
27+
topology-id "topology1"
28+
assignment (SchedulerAssignment. topology-id executor->slot)]
29+
;; test assign
30+
(.assign assignment (WorkerSlot. "supervisor1" 1)
31+
(list (ExecutorDetails. (int 11) (int 15)) (ExecutorDetails. (int 16) (int 20))))
32+
(is (= {[1 5] ["supervisor1" 1]
33+
[6 10] ["supervisor2" 2]
34+
[11 15] ["supervisor1" 1]
35+
[16 20] ["supervisor1" 1]}
36+
(clojurify-executor->slot (.getExecutorToSlot assignment))))
37+
;; test isSlotOccupied
38+
(is (= true (.isSlotOccupied assignment (WorkerSlot. "supervisor2" (int 2)))))
39+
(is (= true (.isSlotOccupied assignment (WorkerSlot. "supervisor1" (int 1)))))
40+
41+
;; test isExecutorAssigned
42+
(is (= true (.isExecutorAssigned assignment (ExecutorDetails. (int 1) (int 5)))))
43+
(is (= false (.isExecutorAssigned assignment (ExecutorDetails. (int 21) (int 25)))))
44+
45+
;; test unassignBySlot
46+
(.unassignBySlot assignment (WorkerSlot. "supervisor1" (int 1)))
47+
(is (= {[6 10] ["supervisor2" 2]}
48+
(clojurify-executor->slot (.getExecutorToSlot assignment))))
49+
50+
;; test unassignByExecutor
51+
(.unassignByExecutor assignment (ExecutorDetails. (int 6) (int 10)))
52+
(is (= true
53+
(empty? (.getExecutorToSlot assignment))))
54+
))
55+
56+
(deftest test-topologies
57+
(let [executor1 (ExecutorDetails. (int 1) (int 5))
58+
executor2 (ExecutorDetails. (int 6) (int 10))
59+
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.)
60+
{executor1 "spout1"
61+
executor2 "bolt1"})
62+
;; test topology.selectExecutorToComponent
63+
executor->comp (.selectExecutorToComponent topology1 (list executor1))
64+
_ (is (= (clojurify-executor->comp {executor1 "spout1"})
65+
(clojurify-executor->comp executor->comp)))
66+
;; test topologies.getById
67+
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) {})
68+
topologies (Topologies. {"topology1" topology1 "topology2" topology2})
69+
_ (is (= "topology1" (->> "topology1"
70+
(.getById topologies)
71+
.getId)))
72+
;; test topologies.getByName
73+
_ (is (= "topology2" (->> "topology-name-2"
74+
(.getByName topologies)
75+
.getId)))
76+
]
77+
)
78+
)
79+
80+
(deftest test-cluster
81+
(let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9)))
82+
supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10)))
83+
executor1 (ExecutorDetails. (int 1) (int 5))
84+
executor2 (ExecutorDetails. (int 6) (int 10))
85+
executor3 (ExecutorDetails. (int 11) (int 15))
86+
executor11 (ExecutorDetails. (int 100) (int 105))
87+
executor12 (ExecutorDetails. (int 106) (int 110))
88+
executor21 (ExecutorDetails. (int 201) (int 205))
89+
executor22 (ExecutorDetails. (int 206) (int 210))
90+
;; topology1 needs scheduling: executor3 is NOT assigned a slot.
91+
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" TOPOLOGY-WORKERS 2}
92+
(StormTopology.)
93+
{executor1 "spout1"
94+
executor2 "bolt1"
95+
executor3 "bolt2"})
96+
;; topology2 is fully scheduled
97+
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-WORKERS 2}
98+
(StormTopology.)
99+
{executor11 "spout11"
100+
executor12 "bolt12"})
101+
;; topology3 needs scheduling, since the assignment is squeezed
102+
topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" TOPOLOGY-WORKERS 2}
103+
(StormTopology.)
104+
{executor21 "spout21"
105+
executor22 "bolt22"})
106+
topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3})
107+
executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1))
108+
executor2 (WorkerSlot. "supervisor2" (int 2))}
109+
executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 3))
110+
executor12 (WorkerSlot. "supervisor2" (int 4))}
111+
executor->slot3 {executor21 (WorkerSlot. "supervisor1" (int 5))
112+
executor22 (WorkerSlot. "supervisor1" (int 5))}
113+
assignment1 (SchedulerAssignment. "topology1" executor->slot1)
114+
assignment2 (SchedulerAssignment. "topology2" executor->slot2)
115+
assignment3 (SchedulerAssignment. "topology3" executor->slot3)
116+
cluster (Cluster. {"supervisor1" supervisor1 "supervisor2" supervisor2}
117+
{"topology1" assignment1 "topology2" assignment2 "topology3" assignment3})]
118+
;; test Cluster constructor
119+
(is (= #{"supervisor1" "supervisor2"}
120+
(->> cluster
121+
.getSupervisors
122+
keys
123+
set)))
124+
(is (= #{"topology1" "topology2" "topology3"}
125+
(->> cluster
126+
.getAssignments
127+
keys
128+
set)))
129+
130+
;; test Cluster.getUnassignedExecutors
131+
(is (= (set (list executor3))
132+
(-> cluster
133+
(.getUnassignedExecutors topology1)
134+
set)))
135+
(is (= true
136+
(empty? (-> cluster
137+
(.getUnassignedExecutors topology2)))))
138+
;; test Cluster.needsScheduling
139+
(is (= true (.needsScheduling cluster topology1)))
140+
(is (= false (.needsScheduling cluster topology2)))
141+
(is (= true (.needsScheduling cluster topology3)))
142+
;; test Cluster.needsSchedulingTopologies
143+
(is (= #{"topology1" "topology3"}
144+
(->> (.needsSchedulingTopologies cluster topologies)
145+
(map (fn [topology] (.getId topology)))
146+
set)))
147+
148+
;; test Cluster.getNeedsSchedulingExecutorToComponents
149+
(is (= {executor3 "bolt2"}
150+
(.getNeedsSchedulingExecutorToComponents cluster topology1)))
151+
(is (= true
152+
(empty? (.getNeedsSchedulingExecutorToComponents cluster topology2))))
153+
(is (= true
154+
(empty? (.getNeedsSchedulingExecutorToComponents cluster topology3))))
155+
156+
;; test Cluster.getNeedsSchedulingComponentToExecutors
157+
(is (= {"bolt2" #{[(.getStartTask executor3) (.getEndTask executor3)]}}
158+
(clojurify-component->executors (.getNeedsSchedulingComponentToExecutors cluster topology1))))
159+
(is (= true
160+
(empty? (.getNeedsSchedulingComponentToExecutors cluster topology2))))
161+
(is (= true
162+
(empty? (.getNeedsSchedulingComponentToExecutors cluster topology3))))
163+
164+
;; test Cluster.getUsedPorts
165+
(is (= #{1 3 5} (set (.getUsedPorts cluster supervisor1))))
166+
(is (= #{2 4} (set (.getUsedPorts cluster supervisor2))))
167+
(is (= #{1 3 5} (set (.getUsedPorts cluster supervisor1))))
168+
169+
;; test Cluster.getAvailablePorts
170+
(is (= #{7 9} (set (.getAvailablePorts cluster supervisor1))))
171+
(is (= #{6 8 10} (set (.getAvailablePorts cluster supervisor2))))
172+
173+
;; test Cluster.getAvailableSlots
174+
(is (= #{["supervisor1" 7] ["supervisor1" 9]} (set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster supervisor1)))))
175+
(is (= #{["supervisor2" 6] ["supervisor2" 8] ["supervisor2" 10]} (set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster supervisor2)))))
176+
;; test Cluster.getAvailableSlots
177+
(is (= #{["supervisor1" 7] ["supervisor1" 9] ["supervisor2" 6] ["supervisor2" 8] ["supervisor2" 10]}
178+
(set (map (fn [slot] [(.getNodeId slot) (.getPort slot)]) (.getAvailableSlots cluster)))))
179+
;; test Cluster.getAssignedNumWorkers
180+
(is (= 2 (.getAssignedNumWorkers cluster topology1)))
181+
(is (= 2 (.getAssignedNumWorkers cluster topology2)))
182+
(is (= 1 (.getAssignedNumWorkers cluster topology3)))
183+
184+
;; test Cluster.isSlotOccupied
185+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1)))))
186+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3)))))
187+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5)))))
188+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7)))))
189+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 9)))))
190+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 2)))))
191+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 4)))))
192+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 6)))))
193+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 8)))))
194+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor2" (int 10)))))
195+
;; test Cluster.getAssignmentById
196+
(is (= assignment1 (.getAssignmentById cluster "topology1")))
197+
(is (= assignment2 (.getAssignmentById cluster "topology2")))
198+
(is (= assignment3 (.getAssignmentById cluster "topology3")))
199+
;; test Cluster.getSupervisorById
200+
(is (= supervisor1 (.getSupervisorById cluster "supervisor1")))
201+
(is (= supervisor2 (.getSupervisorById cluster "supervisor2")))
202+
;; test Cluster.getSupervisorsByHost
203+
(is (= #{supervisor1} (set (.getSupervisorsByHost cluster "192.168.0.1"))))
204+
(is (= #{supervisor2} (set (.getSupervisorsByHost cluster "192.168.0.2"))))
205+
206+
;; ==== the following tests will change the state of the cluster, so put it here at the end ====
207+
;; test Cluster.assign
208+
(.assign cluster (WorkerSlot. "supervisor1" (int 7)) "topology1" (list executor3))
209+
(is (= false (.needsScheduling cluster topology1)))
210+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7)))))
211+
212+
;; test Cluster.assign: if a executor is already assigned, there will be an exception
213+
(let [has-exception (try
214+
(.assign cluster (WorkerSlot. "supervisor1" (int 9)) "topology1" (list executor1))
215+
false
216+
(catch Exception e
217+
true))]
218+
(is (= true has-exception)))
219+
220+
;; test Cluster.assign: if a slot is occupied, there will be an exception
221+
(.setAssignmentById cluster "topology1" (SchedulerAssignment. "topology1" {}))
222+
(let [has-exception (try
223+
(.assign cluster (WorkerSlot. "supervisor2" (int 4)) "topology1" (list executor1))
224+
false
225+
(catch Exception e
226+
true))]
227+
(is (= true has-exception)))
228+
229+
;; revert the changes
230+
(.setAssignmentById cluster "topology1" assignment1)
231+
;; test Cluster.freeSlot
232+
(.freeSlot cluster (WorkerSlot. "supervisor1" (int 7)))
233+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 7)))))
234+
235+
;; test Cluster.freeSlots
236+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1)))))
237+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3)))))
238+
(is (= true (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5)))))
239+
(.freeSlots cluster (list (WorkerSlot. "supervisor1" (int 1))
240+
(WorkerSlot. "supervisor1" (int 3))
241+
(WorkerSlot. "supervisor1" (int 5))))
242+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 1)))))
243+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3)))))
244+
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5)))))
245+
246+
;; test Cluster.setAssignmentById
247+
(.setAssignmentById cluster "topology1" assignment1)
248+
(is (= assignment1 (.getAssignmentById cluster "topology1")))
249+
))

0 commit comments

Comments
 (0)