Skip to content

Commit 52f79ad

Browse files
author
Nathan Marz
committed
add getAssignableSlots method to Cluster and improve blacklisting logic
1 parent 29997fd commit 52f79ad

File tree

1 file changed

+36
-8
lines changed

1 file changed

+36
-8
lines changed

src/jvm/backtype/storm/scheduler/Cluster.java

+36-8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public void setBlacklistedHosts(Set<String> hosts) {
4848
blackListedHosts = hosts;
4949
}
5050

51+
public Set<String> getBlacklistedHosts() {
52+
return blackListedHosts;
53+
}
54+
5155
public void blacklistHost(String host) {
5256
// this is so it plays well with setting blackListedHosts to an immutable list
5357
if(blackListedHosts==null) blackListedHosts = new HashSet<String>();
@@ -147,9 +151,9 @@ public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors
147151
* @param cluster
148152
* @return
149153
*/
150-
public List<Integer> getUsedPorts(SupervisorDetails supervisor) {
154+
public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
151155
Map<String, SchedulerAssignment> assignments = this.getAssignments();
152-
List<Integer> usedPorts = new ArrayList<Integer>();
156+
Set<Integer> usedPorts = new HashSet<Integer>();
153157

154158
for (SchedulerAssignment assignment : assignments.values()) {
155159
for (WorkerSlot slot : assignment.getExecutorToSlot().values()) {
@@ -168,15 +172,20 @@ public List<Integer> getUsedPorts(SupervisorDetails supervisor) {
168172
* @param cluster
169173
* @return
170174
*/
171-
public List<Integer> getAvailablePorts(SupervisorDetails supervisor) {
172-
List<Integer> usedPorts = this.getUsedPorts(supervisor);
175+
public Set<Integer> getAvailablePorts(SupervisorDetails supervisor) {
176+
Set<Integer> usedPorts = this.getUsedPorts(supervisor);
173177

174-
List<Integer> ret = new ArrayList<Integer>();
175-
ret.addAll(supervisor.allPorts);
178+
Set<Integer> ret = new HashSet();
179+
ret.addAll(getAssignablePorts(supervisor));
176180
ret.removeAll(usedPorts);
177181

178182
return ret;
179183
}
184+
185+
public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {
186+
if(isBlackListed(supervisor.id)) return new HashSet();
187+
return supervisor.allPorts;
188+
}
180189

181190
/**
182191
* Return all the available slots on this supervisor.
@@ -185,8 +194,7 @@ public List<Integer> getAvailablePorts(SupervisorDetails supervisor) {
185194
* @return
186195
*/
187196
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
188-
if(isBlackListed(supervisor.id)) return new ArrayList();
189-
List<Integer> ports = this.getAvailablePorts(supervisor);
197+
Set<Integer> ports = this.getAvailablePorts(supervisor);
190198
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
191199

192200
for (Integer port : ports) {
@@ -196,6 +204,17 @@ public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
196204
return slots;
197205
}
198206

207+
public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
208+
Set<Integer> ports = this.getAssignablePorts(supervisor);
209+
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
210+
211+
for (Integer port : ports) {
212+
slots.add(new WorkerSlot(supervisor.getId(), port));
213+
}
214+
215+
return slots;
216+
}
217+
199218
/**
200219
* get the unassigned executors of the topology.
201220
*/
@@ -271,6 +290,15 @@ public List<WorkerSlot> getAvailableSlots() {
271290

272291
return slots;
273292
}
293+
294+
public List<WorkerSlot> getAssignableSlots() {
295+
List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
296+
for (SupervisorDetails supervisor : this.supervisors.values()) {
297+
slots.addAll(this.getAssignableSlots(supervisor));
298+
}
299+
300+
return slots;
301+
}
274302

275303
/**
276304
* Free the specified slot.

0 commit comments

Comments
 (0)