Skip to content

Commit 41d0e33

Browse files
author
Nathan Marz
committed
Merge remote-tracking branch 'xuming/issue164' into scheduler
2 parents f2a7046 + 1e35566 commit 41d0e33

File tree

6 files changed

+158
-129
lines changed

6 files changed

+158
-129
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
(:import [java.nio.channels Channels WritableByteChannel])
88
(:use [backtype.storm.scheduler.DefaultScheduler])
99
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
10-
Cluster Topologies SchedulerAssignment DefaultScheduler ExecutorDetails])
10+
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
1111
(:use [backtype.storm bootstrap util])
1212
(:use [backtype.storm.daemon common])
1313
(:gen-class
@@ -451,7 +451,7 @@
451451
(second executor))
452452
(WorkerSlot. node port)}
453453
{})))]]
454-
{tid (SchedulerAssignment. tid executor->slot)})))
454+
{tid (SchedulerAssignmentImpl. tid executor->slot)})))
455455

456456
(defn- read-all-supervisor-details [nimbus all-slots supervisor->dead-ports]
457457
"return a map: {topology-id SupervisorDetails}"

src/clj/backtype/storm/scheduler/EvenScheduler.clj

+10-12
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
(:use [backtype.storm util log config])
33
(:require [clojure.set :as set])
44
(:import [backtype.storm.scheduler IScheduler Topologies
5-
Cluster TopologyDetails WorkerSlot SchedulerAssignment
6-
ExecutorDetails])
5+
Cluster TopologyDetails WorkerSlot ExecutorDetails])
76
(:gen-class
87
:implements [backtype.storm.scheduler.IScheduler]))
98

@@ -12,11 +11,6 @@
1211
(apply interleave-all split-up)
1312
))
1413

15-
(defn- mk-scheduler-assignment [topology-id executor->node+port]
16-
(SchedulerAssignment. topology-id
17-
(into {} (for [[executor [node port]] executor->node+port]
18-
{(ExecutorDetails. (first executor) (second executor)) (WorkerSlot. node port)}))))
19-
2014
(defn get-alive-assigned-node+port->executors [cluster topology-id]
2115
(let [existing-assignment (.getAssignmentById cluster topology-id)
2216
executor->slot (if existing-assignment
@@ -41,9 +35,8 @@
4135
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
4236
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
4337
(+ (count available-slots) (count alive-assigned)))
44-
freed-slots (keys (apply dissoc alive-assigned (keys alive-assigned)))
4538
reassign-slots (take (- total-slots-to-use (count alive-assigned))
46-
(sort-slots (concat available-slots freed-slots)))
39+
(sort-slots available-slots))
4740
reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
4841
reassignment (into {}
4942
(map vector
@@ -54,14 +47,19 @@
5447
(when-not (empty? reassignment)
5548
(log-message "Available slots: " (pr-str available-slots))
5649
)
57-
(mk-scheduler-assignment topology-id (merge stay-assignment reassignment))))
50+
reassignment))
5851

5952
(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
6053
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
6154
(doseq [^TopologyDetails topology needs-scheduling-topologies
6255
:let [topology-id (.getId topology)
63-
new-assignment (schedule-topology topology cluster)]]
64-
(.setAssignmentById cluster topology-id new-assignment))))
56+
new-assignment (schedule-topology topology cluster)
57+
node+port->executors (reverse-map new-assignment)]]
58+
(doseq [[node+port executors] node+port->executors
59+
:let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
60+
executors (for [[start-task end-task] executors]
61+
(ExecutorDetails. start-task end-task))]]
62+
(.assign cluster slot topology-id executors)))))
6563

6664
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
6765
(schedule-topologies-evenly topologies cluster))

src/jvm/backtype/storm/scheduler/Cluster.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ public class Cluster {
1919
/**
2020
* key: topologyId, value: topology's current assignments.
2121
*/
22-
private Map<String, SchedulerAssignment> assignments;
22+
private Map<String, SchedulerAssignmentImpl> assignments;
2323

2424
/**
2525
* a map from hostname to supervisor id.
2626
*/
2727
private Map<String, List<String>> hostToId;
2828

29-
public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignment> assignments){
29+
public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
3030
this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
3131
this.supervisors.putAll(supervisors);
32-
this.assignments = new HashMap<String, SchedulerAssignment>(assignments.size());
32+
this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
3333
this.assignments.putAll(assignments);
3434
this.hostToId = new HashMap<String, List<String>>();
3535
for (String nodeId : supervisors.keySet()) {
@@ -223,9 +223,9 @@ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetail
223223
throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
224224
}
225225

226-
SchedulerAssignment assignment = this.getAssignmentById(topologyId);
226+
SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId);
227227
if (assignment == null) {
228-
assignment = new SchedulerAssignment(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
228+
assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
229229
this.assignments.put(topologyId, assignment);
230230
} else {
231231
for (ExecutorDetails executor : executors) {
@@ -262,7 +262,7 @@ public void freeSlot(WorkerSlot slot) {
262262

263263
if (supervisor != null) {
264264
// remove the slot from the existing assignments
265-
for (SchedulerAssignment assignment : this.assignments.values()) {
265+
for (SchedulerAssignmentImpl assignment : this.assignments.values()) {
266266
if (assignment.isSlotOccupied(slot)) {
267267
assignment.unassignBySlot(slot);
268268
break;
@@ -339,21 +339,17 @@ public List<SupervisorDetails> getSupervisorsByHost(String host) {
339339
return ret;
340340
}
341341

342-
/**
343-
* Set assignment for the specified topology.
344-
*
345-
* @param topologyId the id of the topology the assignment is for.
346-
* @param assignment the assignment to be assigned.
347-
*/
348-
public void setAssignmentById(String topologyId, SchedulerAssignment assignment) {
349-
this.assignments.put(topologyId, assignment);
350-
}
351-
352342
/**
353343
* Get all the assignments.
354344
*/
355345
public Map<String, SchedulerAssignment> getAssignments() {
356-
return this.assignments;
346+
Map<String, SchedulerAssignment> ret = new HashMap<String, SchedulerAssignment>(this.assignments.size());
347+
348+
for (String topologyId : this.assignments.keySet()) {
349+
ret.put(topologyId, this.assignments.get(topologyId));
350+
}
351+
352+
return ret;
357353
}
358354

359355
/**
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,39 @@
1-
package backtype.storm.scheduler;
2-
3-
import java.util.ArrayList;
4-
import java.util.Collection;
5-
import java.util.HashMap;
6-
import java.util.List;
7-
import java.util.Map;
8-
import java.util.Set;
9-
10-
public class SchedulerAssignment {
11-
/**
12-
* topology-id this assignment is for.
13-
*/
14-
String topologyId;
15-
/**
16-
* assignment detail, a mapping from executor to <code>WorkerSlot</code>
17-
*/
18-
Map<ExecutorDetails, WorkerSlot> executorToSlot;
19-
20-
public SchedulerAssignment(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
21-
this.topologyId = topologyId;
22-
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
23-
if (executorToSlots != null) {
24-
this.executorToSlot.putAll(executorToSlots);
25-
}
26-
}
27-
28-
/**
29-
* Assign the slot to executors.
30-
* @param slot
31-
* @param executors
32-
*/
33-
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
34-
for (ExecutorDetails executor : executors) {
35-
this.executorToSlot.put(executor, slot);
36-
}
37-
}
38-
39-
/**
40-
* Release the slot occupied by this assignment.
41-
* @param slot
42-
*/
43-
public void unassignBySlot(WorkerSlot slot) {
44-
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
45-
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
46-
WorkerSlot ws = this.executorToSlot.get(executor);
47-
if (ws.equals(slot)) {
48-
executors.add(executor);
49-
}
50-
}
51-
52-
// remove
53-
for (ExecutorDetails executor : executors) {
54-
this.executorToSlot.remove(executor);
55-
}
56-
}
57-
58-
/**
59-
* Does this slot occupied by this assignment?
60-
* @param slot
61-
* @return
62-
*/
63-
public boolean isSlotOccupied(WorkerSlot slot) {
64-
return this.executorToSlot.containsValue(slot);
65-
}
66-
67-
public boolean isExecutorAssigned(ExecutorDetails executor) {
68-
return this.executorToSlot.containsKey(executor);
69-
}
70-
71-
public String getTopologyId() {
72-
return this.topologyId;
73-
}
74-
75-
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
76-
return this.executorToSlot;
77-
}
78-
79-
/**
80-
* Return the executors covered by this assignments
81-
* @return
82-
*/
83-
public Set<ExecutorDetails> getExecutors() {
84-
return this.executorToSlot.keySet();
85-
}
1+
package backtype.storm.scheduler;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
6+
public interface SchedulerAssignment {
7+
/**
8+
* Does this slot occupied by this assignment?
9+
* @param slot
10+
* @return
11+
*/
12+
public boolean isSlotOccupied(WorkerSlot slot);
13+
14+
/**
15+
* is the executor assigned?
16+
*
17+
* @param executor
18+
* @return
19+
*/
20+
public boolean isExecutorAssigned(ExecutorDetails executor);
21+
22+
/**
23+
* get the topology-id this assignment is for.
24+
* @return
25+
*/
26+
public String getTopologyId();
27+
28+
/**
29+
* get the executor -> slot map.
30+
* @return
31+
*/
32+
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
33+
34+
/**
35+
* Return the executors covered by this assignments
36+
* @return
37+
*/
38+
public Set<ExecutorDetails> getExecutors();
8639
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package backtype.storm.scheduler;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Set;
9+
10+
public class SchedulerAssignmentImpl implements SchedulerAssignment {
11+
/**
12+
* topology-id this assignment is for.
13+
*/
14+
String topologyId;
15+
/**
16+
* assignment detail, a mapping from executor to <code>WorkerSlot</code>
17+
*/
18+
Map<ExecutorDetails, WorkerSlot> executorToSlot;
19+
20+
public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
21+
this.topologyId = topologyId;
22+
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
23+
if (executorToSlots != null) {
24+
this.executorToSlot.putAll(executorToSlots);
25+
}
26+
}
27+
28+
/**
29+
* Assign the slot to executors.
30+
* @param slot
31+
* @param executors
32+
*/
33+
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
34+
for (ExecutorDetails executor : executors) {
35+
this.executorToSlot.put(executor, slot);
36+
}
37+
}
38+
39+
/**
40+
* Release the slot occupied by this assignment.
41+
* @param slot
42+
*/
43+
public void unassignBySlot(WorkerSlot slot) {
44+
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
45+
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
46+
WorkerSlot ws = this.executorToSlot.get(executor);
47+
if (ws.equals(slot)) {
48+
executors.add(executor);
49+
}
50+
}
51+
52+
// remove
53+
for (ExecutorDetails executor : executors) {
54+
this.executorToSlot.remove(executor);
55+
}
56+
}
57+
58+
/**
59+
* Does this slot occupied by this assignment?
60+
* @param slot
61+
* @return
62+
*/
63+
public boolean isSlotOccupied(WorkerSlot slot) {
64+
return this.executorToSlot.containsValue(slot);
65+
}
66+
67+
public boolean isExecutorAssigned(ExecutorDetails executor) {
68+
return this.executorToSlot.containsKey(executor);
69+
}
70+
71+
public String getTopologyId() {
72+
return this.topologyId;
73+
}
74+
75+
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
76+
return this.executorToSlot;
77+
}
78+
79+
/**
80+
* Return the executors covered by this assignments
81+
* @return
82+
*/
83+
public Set<ExecutorDetails> getExecutors() {
84+
return this.executorToSlot.keySet();
85+
}
86+
}

0 commit comments

Comments
 (0)