Skip to content

Commit d84ac47

Browse files
committed
extract the mutate methods from SchedulerAssignment to SchedulerAssignmentImpl
1 parent ebe8246 commit d84ac47

File tree

6 files changed

+171
-142
lines changed

6 files changed

+171
-142
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
(:import [java.nio.channels Channels WritableByteChannel])
88
(:use [backtype.storm.scheduler.DefaultScheduler])
99
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
10-
Cluster Topologies SchedulerAssignment DefaultScheduler ExecutorDetails])
10+
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
1111
(:use [backtype.storm bootstrap util])
1212
(:use [backtype.storm.daemon common])
1313
(:gen-class
@@ -446,7 +446,7 @@
446446
(second executor))
447447
(WorkerSlot. node port)}
448448
{})))]]
449-
{tid (SchedulerAssignment. tid executor->slot)})))
449+
{tid (SchedulerAssignmentImpl. tid executor->slot)})))
450450

451451
(defn- read-all-supervisor-details [nimbus all-slots supervisor->dead-ports]
452452
"return a map: {topology-id SupervisorDetails}"

src/clj/backtype/storm/scheduler/EvenScheduler.clj

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
(:use [backtype.storm util log config])
33
(:require [clojure.set :as set])
44
(:import [backtype.storm.scheduler IScheduler Topologies
5-
Cluster TopologyDetails WorkerSlot SchedulerAssignment
6-
ExecutorDetails])
5+
Cluster TopologyDetails WorkerSlot ExecutorDetails])
76
(:gen-class
87
:implements [backtype.storm.scheduler.IScheduler]))
98

@@ -12,11 +11,6 @@
1211
(apply interleave-all split-up)
1312
))
1413

15-
(defn- mk-scheduler-assignment [topology-id executor->node+port]
16-
(SchedulerAssignment. topology-id
17-
(into {} (for [[executor [node port]] executor->node+port]
18-
{(ExecutorDetails. (first executor) (second executor)) (WorkerSlot. node port)}))))
19-
2014
(defn get-alive-assigned-node+port->executors [cluster topology-id]
2115
(let [existing-assignment (.getAssignmentById cluster topology-id)
2216
executor->slot (if existing-assignment
@@ -41,9 +35,8 @@
4135
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
4236
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
4337
(+ (count available-slots) (count alive-assigned)))
44-
freed-slots (keys (apply dissoc alive-assigned (keys alive-assigned)))
4538
reassign-slots (take (- total-slots-to-use (count alive-assigned))
46-
(sort-slots (concat available-slots freed-slots)))
39+
(sort-slots available-slots))
4740
reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
4841
reassignment (into {}
4942
(map vector
@@ -54,14 +47,19 @@
5447
(when-not (empty? reassignment)
5548
(log-message "Available slots: " (pr-str available-slots))
5649
)
57-
(mk-scheduler-assignment topology-id (merge stay-assignment reassignment))))
50+
reassignment))
5851

5952
(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
6053
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
6154
(doseq [^TopologyDetails topology needs-scheduling-topologies
6255
:let [topology-id (.getId topology)
63-
new-assignment (schedule-topology topology cluster)]]
64-
(.setAssignmentById cluster topology-id new-assignment))))
56+
new-assignment (schedule-topology topology cluster)
57+
node+port->executors (reverse-map new-assignment)]]
58+
(doseq [[node+port executors] node+port->executors
59+
:let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
60+
executors (for [[start-task end-task] executors]
61+
(ExecutorDetails. start-task end-task))]]
62+
(.assign cluster slot topology-id executors)))))
6563

6664
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
6765
(schedule-topologies-evenly topologies cluster))

src/jvm/backtype/storm/scheduler/Cluster.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ public class Cluster {
1919
/**
2020
* key: topologyId, value: topology's current assignments.
2121
*/
22-
private Map<String, SchedulerAssignment> assignments;
22+
private Map<String, SchedulerAssignmentImpl> assignments;
2323

2424
/**
2525
* a map from hostname to supervisor id.
2626
*/
2727
private Map<String, List<String>> hostToId;
2828

29-
public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignment> assignments){
29+
public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
3030
this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
3131
this.supervisors.putAll(supervisors);
32-
this.assignments = new HashMap<String, SchedulerAssignment>(assignments.size());
32+
this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
3333
this.assignments.putAll(assignments);
3434
this.hostToId = new HashMap<String, List<String>>();
3535
for (String nodeId : supervisors.keySet()) {
@@ -223,9 +223,9 @@ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetail
223223
throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
224224
}
225225

226-
SchedulerAssignment assignment = this.getAssignmentById(topologyId);
226+
SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId);
227227
if (assignment == null) {
228-
assignment = new SchedulerAssignment(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
228+
assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
229229
this.assignments.put(topologyId, assignment);
230230
} else {
231231
for (ExecutorDetails executor : executors) {
@@ -262,7 +262,7 @@ public void freeSlot(WorkerSlot slot) {
262262

263263
if (supervisor != null) {
264264
// remove the slot from the existing assignments
265-
for (SchedulerAssignment assignment : this.assignments.values()) {
265+
for (SchedulerAssignmentImpl assignment : this.assignments.values()) {
266266
if (assignment.isSlotOccupied(slot)) {
267267
assignment.unassignBySlot(slot);
268268
break;
@@ -339,21 +339,17 @@ public List<SupervisorDetails> getSupervisorsByHost(String host) {
339339
return ret;
340340
}
341341

342-
/**
343-
* Set assignment for the specified topology.
344-
*
345-
* @param topologyId the id of the topology the assignment is for.
346-
* @param assignment the assignment to be assigned.
347-
*/
348-
public void setAssignmentById(String topologyId, SchedulerAssignment assignment) {
349-
this.assignments.put(topologyId, assignment);
350-
}
351-
352342
/**
353343
* Get all the assignments.
354344
*/
355345
public Map<String, SchedulerAssignment> getAssignments() {
356-
return this.assignments;
346+
Map<String, SchedulerAssignment> ret = new HashMap<String, SchedulerAssignment>(this.assignments.size());
347+
348+
for (String topologyId : this.assignments.keySet()) {
349+
ret.put(topologyId, this.assignments.get(topologyId));
350+
}
351+
352+
return ret;
357353
}
358354

359355
/**
Lines changed: 38 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,39 @@
1-
package backtype.storm.scheduler;
2-
3-
import java.util.ArrayList;
4-
import java.util.Collection;
5-
import java.util.HashMap;
6-
import java.util.List;
7-
import java.util.Map;
8-
import java.util.Set;
9-
10-
public class SchedulerAssignment {
11-
/**
12-
* topology-id this assignment is for.
13-
*/
14-
String topologyId;
15-
/**
16-
* assignment detail, a mapping from executor to <code>WorkerSlot</code>
17-
*/
18-
Map<ExecutorDetails, WorkerSlot> executorToSlot;
19-
20-
public SchedulerAssignment(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
21-
this.topologyId = topologyId;
22-
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
23-
if (executorToSlots != null) {
24-
this.executorToSlot.putAll(executorToSlots);
25-
}
26-
}
27-
28-
/**
29-
* Assign the slot to executors.
30-
* @param slot
31-
* @param executors
32-
*/
33-
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
34-
for (ExecutorDetails executor : executors) {
35-
this.executorToSlot.put(executor, slot);
36-
}
37-
}
38-
39-
/**
40-
* Unassign the assignment for executor.
41-
*/
42-
public void unassignByExecutor(ExecutorDetails executor) {
43-
this.executorToSlot.remove(executor);
44-
}
45-
46-
public void unassignByExecutors(Collection<ExecutorDetails> executors) {
47-
for (ExecutorDetails executor : executors) {
48-
this.unassignByExecutor(executor);
49-
}
50-
}
51-
52-
/**
53-
* Release the slot occupied by this assignment.
54-
* @param slot
55-
*/
56-
public void unassignBySlot(WorkerSlot slot) {
57-
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
58-
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
59-
WorkerSlot ws = this.executorToSlot.get(executor);
60-
if (ws.equals(slot)) {
61-
executors.add(executor);
62-
}
63-
}
64-
65-
// remove
66-
for (ExecutorDetails executor : executors) {
67-
this.executorToSlot.remove(executor);
68-
}
69-
}
70-
71-
/**
72-
* Does this slot occupied by this assignment?
73-
* @param slot
74-
* @return
75-
*/
76-
public boolean isSlotOccupied(WorkerSlot slot) {
77-
return this.executorToSlot.containsValue(slot);
78-
}
79-
80-
public boolean isExecutorAssigned(ExecutorDetails executor) {
81-
return this.executorToSlot.containsKey(executor);
82-
}
83-
84-
public String getTopologyId() {
85-
return this.topologyId;
86-
}
87-
88-
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
89-
return this.executorToSlot;
90-
}
91-
92-
/**
93-
* Return the executors covered by this assignments
94-
* @return
95-
*/
96-
public Set<ExecutorDetails> getExecutors() {
97-
return this.executorToSlot.keySet();
98-
}
1+
package backtype.storm.scheduler;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
6+
public interface SchedulerAssignment {
7+
/**
8+
* Does this slot occupied by this assignment?
9+
* @param slot
10+
* @return
11+
*/
12+
public boolean isSlotOccupied(WorkerSlot slot);
13+
14+
/**
15+
* is the executor assigned?
16+
*
17+
* @param executor
18+
* @return
19+
*/
20+
public boolean isExecutorAssigned(ExecutorDetails executor);
21+
22+
/**
23+
* get the topology-id this assignment is for.
24+
* @return
25+
*/
26+
public String getTopologyId();
27+
28+
/**
29+
* get the executor -> slot map.
30+
* @return
31+
*/
32+
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
33+
34+
/**
35+
* Return the executors covered by this assignments
36+
* @return
37+
*/
38+
public Set<ExecutorDetails> getExecutors() ;
9939
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package backtype.storm.scheduler;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Set;
9+
10+
public class SchedulerAssignmentImpl implements SchedulerAssignment {
11+
/**
12+
* topology-id this assignment is for.
13+
*/
14+
String topologyId;
15+
/**
16+
* assignment detail, a mapping from executor to <code>WorkerSlot</code>
17+
*/
18+
Map<ExecutorDetails, WorkerSlot> executorToSlot;
19+
20+
public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
21+
this.topologyId = topologyId;
22+
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
23+
if (executorToSlots != null) {
24+
this.executorToSlot.putAll(executorToSlots);
25+
}
26+
}
27+
28+
/**
29+
* Assign the slot to executors.
30+
* @param slot
31+
* @param executors
32+
*/
33+
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
34+
for (ExecutorDetails executor : executors) {
35+
this.executorToSlot.put(executor, slot);
36+
}
37+
}
38+
39+
/**
40+
* Unassign the assignment for executor.
41+
*/
42+
public void unassignByExecutor(ExecutorDetails executor) {
43+
this.executorToSlot.remove(executor);
44+
}
45+
46+
public void unassignByExecutors(Collection<ExecutorDetails> executors) {
47+
for (ExecutorDetails executor : executors) {
48+
this.unassignByExecutor(executor);
49+
}
50+
}
51+
52+
/**
53+
* Release the slot occupied by this assignment.
54+
* @param slot
55+
*/
56+
public void unassignBySlot(WorkerSlot slot) {
57+
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
58+
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
59+
WorkerSlot ws = this.executorToSlot.get(executor);
60+
if (ws.equals(slot)) {
61+
executors.add(executor);
62+
}
63+
}
64+
65+
// remove
66+
for (ExecutorDetails executor : executors) {
67+
this.executorToSlot.remove(executor);
68+
}
69+
}
70+
71+
/**
72+
* Does this slot occupied by this assignment?
73+
* @param slot
74+
* @return
75+
*/
76+
public boolean isSlotOccupied(WorkerSlot slot) {
77+
return this.executorToSlot.containsValue(slot);
78+
}
79+
80+
public boolean isExecutorAssigned(ExecutorDetails executor) {
81+
return this.executorToSlot.containsKey(executor);
82+
}
83+
84+
public String getTopologyId() {
85+
return this.topologyId;
86+
}
87+
88+
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
89+
return this.executorToSlot;
90+
}
91+
92+
/**
93+
* Return the executors covered by this assignments
94+
* @return
95+
*/
96+
public Set<ExecutorDetails> getExecutors() {
97+
return this.executorToSlot.keySet();
98+
}
99+
}

0 commit comments

Comments
 (0)