@@ -19,17 +19,17 @@ public class Cluster {
19
19
/**
20
20
* key: topologyId, value: topology's current assignments.
21
21
*/
22
- private Map <String , SchedulerAssignment > assignments ;
22
+ private Map <String , SchedulerAssignmentImpl > assignments ;
23
23
24
24
/**
25
25
* a map from hostname to supervisor id.
26
26
*/
27
27
private Map <String , List <String >> hostToId ;
28
28
29
- public Cluster (Map <String , SupervisorDetails > supervisors , Map <String , SchedulerAssignment > assignments ){
29
+ public Cluster (Map <String , SupervisorDetails > supervisors , Map <String , SchedulerAssignmentImpl > assignments ){
30
30
this .supervisors = new HashMap <String , SupervisorDetails >(supervisors .size ());
31
31
this .supervisors .putAll (supervisors );
32
- this .assignments = new HashMap <String , SchedulerAssignment >(assignments .size ());
32
+ this .assignments = new HashMap <String , SchedulerAssignmentImpl >(assignments .size ());
33
33
this .assignments .putAll (assignments );
34
34
this .hostToId = new HashMap <String , List <String >>();
35
35
for (String nodeId : supervisors .keySet ()) {
@@ -223,9 +223,9 @@ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetail
223
223
throw new RuntimeException ("slot: [" + slot .getNodeId () + ", " + slot .getPort () + "] is already occupied." );
224
224
}
225
225
226
- SchedulerAssignment assignment = this .getAssignmentById (topologyId );
226
+ SchedulerAssignmentImpl assignment = ( SchedulerAssignmentImpl ) this .getAssignmentById (topologyId );
227
227
if (assignment == null ) {
228
- assignment = new SchedulerAssignment (topologyId , new HashMap <ExecutorDetails , WorkerSlot >());
228
+ assignment = new SchedulerAssignmentImpl (topologyId , new HashMap <ExecutorDetails , WorkerSlot >());
229
229
this .assignments .put (topologyId , assignment );
230
230
} else {
231
231
for (ExecutorDetails executor : executors ) {
@@ -262,7 +262,7 @@ public void freeSlot(WorkerSlot slot) {
262
262
263
263
if (supervisor != null ) {
264
264
// remove the slot from the existing assignments
265
- for (SchedulerAssignment assignment : this .assignments .values ()) {
265
+ for (SchedulerAssignmentImpl assignment : this .assignments .values ()) {
266
266
if (assignment .isSlotOccupied (slot )) {
267
267
assignment .unassignBySlot (slot );
268
268
break ;
@@ -339,21 +339,17 @@ public List<SupervisorDetails> getSupervisorsByHost(String host) {
339
339
return ret ;
340
340
}
341
341
342
- /**
343
- * Set assignment for the specified topology.
344
- *
345
- * @param topologyId the id of the topology the assignment is for.
346
- * @param assignment the assignment to be assigned.
347
- */
348
- public void setAssignmentById (String topologyId , SchedulerAssignment assignment ) {
349
- this .assignments .put (topologyId , assignment );
350
- }
351
-
352
342
/**
353
343
* Get all the assignments.
354
344
*/
355
345
public Map <String , SchedulerAssignment > getAssignments () {
356
- return this .assignments ;
346
+ Map <String , SchedulerAssignment > ret = new HashMap <String , SchedulerAssignment >(this .assignments .size ());
347
+
348
+ for (String topologyId : this .assignments .keySet ()) {
349
+ ret .put (topologyId , this .assignments .get (topologyId ));
350
+ }
351
+
352
+ return ret ;
357
353
}
358
354
359
355
/**
0 commit comments