Skip to content

Commit 33e3cfc

Browse files
committed
refactor EvenScheduler and DefaultScheduler
1 parent 2575d71 commit 33e3cfc

File tree

5 files changed

+82
-28
lines changed

5 files changed

+82
-28
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,44 @@
11
(ns backtype.storm.scheduler.DefaultScheduler
2-
(:use [backtype.storm util])
2+
(:use [backtype.storm util config])
33
(:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler])
44
(:import [backtype.storm.scheduler IScheduler Topologies
5-
Cluster TopologyDetails WorkerSlot SchedulerAssignment])
5+
Cluster TopologyDetails WorkerSlot SchedulerAssignment
6+
EvenScheduler ExecutorDetails])
67
(:gen-class
78
:implements [backtype.storm.scheduler.IScheduler]))
89

9-
(defn- keeper-slots [existing-slots num-executors num-workers]
10+
(defn- bad-slots [existing-slots num-executors num-workers]
1011
(if (= 0 num-workers)
11-
{}
12+
'()
1213
(let [distribution (atom (integer-divided num-executors num-workers))
1314
keepers (atom {})]
1415
(doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
1516
(when (pos? (get @distribution executor-count 0))
1617
(swap! keepers assoc node+port executor-list)
1718
(swap! distribution update-in [executor-count] dec)
1819
))
19-
@keepers
20-
)))
20+
(->> @keepers
21+
keys
22+
(apply dissoc existing-slots)
23+
keys
24+
(map (fn [[node port]]
25+
(WorkerSlot. node port)))))))
2126

2227

2328
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
24-
(EvenScheduler/schedule-topologies-evenly topologies cluster keeper-slots))
29+
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
30+
(doseq [^TopologyDetails topology needs-scheduling-topologies
31+
:let [topology-id (.getId topology)
32+
topology-conf (.getConf topology)
33+
available-slots (->> (.getAvailableSlots cluster)
34+
(map #(vector (.getNodeId %) (.getPort %))))
35+
all-executors (->> topology
36+
.getExecutors
37+
(map #(vector (.getStartTask %) (.getEndTask %)))
38+
set)
39+
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
40+
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
41+
(+ (count available-slots) (count alive-assigned)))
42+
bad-slots (bad-slots alive-assigned (count all-executors) total-slots-to-use)]]
43+
(.freeSlots cluster bad-slots)
44+
(EvenScheduler/schedule-topologies-evenly topologies cluster))))

src/clj/backtype/storm/scheduler/EvenScheduler.clj

+22-20
Original file line numberDiff line numberDiff line change
@@ -17,49 +17,51 @@
1717
(into {} (for [[executor [node port]] executor->node+port]
1818
{(ExecutorDetails. (first executor) (second executor)) (WorkerSlot. node port)}))))
1919

20+
(defn get-alive-assigned-node+port->executors [cluster topology-id]
21+
(let [existing-assignment (.getAssignmentById cluster topology-id)
22+
executor->slot (if existing-assignment
23+
(.getExecutorToSlots existing-assignment)
24+
{})
25+
executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot
26+
:let [executor [(.getStartTask executor) (.getEndTask executor)]
27+
node+port [(.getNodeId slot) (.getPort slot)]]]
28+
{executor node+port}))
29+
alive-assigned (reverse-map executor->node+port)]
30+
alive-assigned))
2031

21-
(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster keeper-slots-fn]
32+
(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
2233
(let [topology-id (.getId topology)
34+
topology-conf (.getConf topology)
2335
available-slots (->> (.getAvailableSlots cluster)
2436
(map #(vector (.getNodeId %) (.getPort %))))
2537
all-executors (->> topology
2638
.getExecutors
2739
(map #(vector (.getStartTask %) (.getEndTask %)))
2840
set)
29-
existing-assignment (.getAssignmentById cluster topology-id)
30-
executor->node+port (if-not existing-assignment
31-
{}
32-
(into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] (.getExecutorToSlots existing-assignment)]
33-
{[(.getStartTask executor) (.getEndTask executor)] [(.getNodeId slot) (.getPort slot)]})))
34-
alive-assigned (reverse-map executor->node+port)
35-
topology-conf (.getConf topology)
41+
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
3642
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
3743
(+ (count available-slots) (count alive-assigned)))
38-
keep-assigned (if keeper-slots-fn
39-
(keeper-slots-fn alive-assigned (count all-executors) total-slots-to-use)
40-
alive-assigned
41-
)
42-
freed-slots (keys (apply dissoc alive-assigned (keys keep-assigned)))
43-
reassign-slots (take (- total-slots-to-use (count keep-assigned))
44+
freed-slots (keys (apply dissoc alive-assigned (keys alive-assigned)))
45+
reassign-slots (take (- total-slots-to-use (count alive-assigned))
4446
(sort-slots (concat available-slots freed-slots)))
45-
reassign-executors (sort (set/difference all-executors (set (apply concat (vals keep-assigned)))))
47+
reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
4648
reassignment (into {}
4749
(map vector
4850
reassign-executors
4951
;; for some reason it goes into infinite loop without limiting the repeat-seq
5052
(repeat-seq (count reassign-executors) reassign-slots)))
51-
stay-assignment (into {} (mapcat (fn [[node+port executors]] (for [executor executors] [executor node+port])) keep-assigned))]
53+
stay-assignment (into {} (mapcat (fn [[node+port executors]] (for [executor executors] [executor node+port])) alive-assigned))]
5254
(when-not (empty? reassignment)
5355
(log-message "Available slots: " (pr-str available-slots))
5456
)
5557
(mk-scheduler-assignment topology-id (merge stay-assignment reassignment))))
5658

57-
(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster keeper-slots-fn]
59+
(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
5860
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
5961
(doseq [^TopologyDetails topology needs-scheduling-topologies
6062
:let [topology-id (.getId topology)
61-
new-assignment (schedule-topology topology cluster keeper-slots-fn)]]
63+
new-assignment (schedule-topology topology cluster)]]
6264
(.setAssignmentById cluster topology-id new-assignment))))
63-
65+
6466
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
65-
(schedule-topologies-evenly topologies cluster nil))
67+
(schedule-topologies-evenly topologies cluster))

src/jvm/backtype/storm/scheduler/Cluster.java

+11
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,17 @@ public void freeSlot(WorkerSlot slot) {
264264
}
265265
}
266266
}
267+
268+
/**
269+
* free the slots.
270+
*
271+
* @param slots
272+
*/
273+
public void freeSlots(Collection<WorkerSlot> slots) {
274+
for (WorkerSlot slot : slots) {
275+
this.freeSlot(slot);
276+
}
277+
}
267278

268279
/**
269280
* Checks the specified slot is occupied.

src/jvm/backtype/storm/scheduler/ExecutorDetails.java

+13
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,17 @@ public Integer getEndTask() {
2424
public void setEndTask(Integer endTask) {
2525
this.endTask = endTask;
2626
}
27+
28+
public boolean equals(Object other) {
29+
if (other == null || !(other instanceof ExecutorDetails)) {
30+
return false;
31+
}
32+
33+
ExecutorDetails executor = (ExecutorDetails)other;
34+
return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
35+
}
36+
37+
public int hashCode() {
38+
return 13 * this.startTask + 17 * this.endTask;
39+
}
2740
}

src/jvm/backtype/storm/scheduler/SchedulerAssignment.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package backtype.storm.scheduler;
22

3+
import java.util.ArrayList;
34
import java.util.Collection;
45
import java.util.HashMap;
6+
import java.util.List;
57
import java.util.Map;
68
import java.util.Set;
79

@@ -39,12 +41,18 @@ public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
3941
* @param slot
4042
*/
4143
public void removeSlot(WorkerSlot slot) {
44+
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
4245
for (ExecutorDetails executor : this.executorToSlots.keySet()) {
4346
WorkerSlot ws = this.executorToSlots.get(executor);
4447
if (ws.equals(slot)) {
45-
this.executorToSlots.remove(executor);
48+
executors.add(executor);
4649
}
4750
}
51+
52+
// remove
53+
for (ExecutorDetails executor : executors) {
54+
this.executorToSlots.remove(executor);
55+
}
4856
}
4957

5058
/**

0 commit comments

Comments
 (0)