Skip to content

Commit a8df52d

Browse files
author
Nathan Marz
committed
Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
1 parent 7bfe363 commit a8df52d

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
2929
* Added MultiScheme interface (thanks sritchie)
3030
* Added MockTridentTuple for testing (thanks emblem)
31+
* Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
3132
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
3233
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
3334
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.

src/jvm/backtype/storm/scheduler/Cluster.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public class Cluster {
2323
* a map from hostname to supervisor id.
2424
*/
2525
private Map<String, List<String>> hostToId;
26+
27+
private Set<String> whiteListedHosts = new HashSet<String>();
2628

2729
public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
2830
this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
@@ -39,7 +41,11 @@ public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, Scheduler
3941
this.hostToId.get(host).add(nodeId);
4042
}
4143
}
42-
44+
45+
public void setWhitelistedHosts(Set<String> hosts) {
46+
whiteListedHosts = hosts;
47+
}
48+
4349
/**
4450
* Gets all the topologies which needs scheduling.
4551
*
@@ -161,6 +167,7 @@ public List<Integer> getAvailablePorts(SupervisorDetails supervisor) {
161167
* @return
162168
*/
163169
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
170+
if(whiteListedHosts!=null && !whiteListedHosts.isEmpty() && !whiteListedHosts.contains(supervisor.host)) return new ArrayList();
164171
List<Integer> ports = this.getAvailablePorts(supervisor);
165172
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
166173

0 commit comments

Comments
 (0)