Skip to content

Commit 2575d71

Browse files
committed
split compute-new-topology->executor->node+port into small functions
1 parent 1bc4665 commit 2575d71

File tree

3 files changed

+126
-57
lines changed

3 files changed

+126
-57
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

+102-54
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,12 @@
341341
all-executors)]
342342
(swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
343343

344+
(defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
345+
"update all the heartbeats for all the topologies's executors"
346+
(doseq [[tid assignment] existing-assignments
347+
:let [all-executors (topology->executors tid)]]
348+
(update-heartbeats! nimbus tid all-executors assignment)))
349+
344350
(defn- alive-executors
345351
[nimbus ^TopologyDetails topology-details all-executors existing-assignment]
346352
(let [conf (:conf nimbus)
@@ -398,6 +404,79 @@
398404
{executor component}))]
399405
executor->component))
400406

407+
(defn- compute-topology->executors [nimbus storm-ids]
408+
"compute a topology-id -> executors map"
409+
(into {} (for [tid storm-ids]
410+
{tid (set (compute-executors nimbus tid))})))
411+
412+
(defn- compute-topology->alive-executors [nimbus existing-assignments topologies topology->executors scratch-topology-id]
413+
"compute a topology-id -> alive executors map"
414+
(into {} (for [[tid assignment] existing-assignments
415+
:let [topology-details (.getById topologies tid)
416+
all-executors (topology->executors tid)
417+
alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
418+
all-executors
419+
(set (alive-executors nimbus topology-details all-executors assignment)))]]
420+
{tid alive-executors})))
421+
422+
(defn- compute-supervisor->dead-ports [nimbus existing-assignments topology->executors topology->alive-executors]
423+
(let [dead-slots (into [] (for [[tid assignment] existing-assignments
424+
:let [all-executors (topology->executors tid)
425+
alive-executors (topology->alive-executors tid)
426+
dead-executors (set/difference all-executors alive-executors)
427+
dead-slots (->> (:executor->node+port assignment)
428+
(filter #(contains? dead-executors (first %)))
429+
vals)]]
430+
dead-slots))
431+
supervisor->dead-ports (->> dead-slots
432+
(apply concat)
433+
(map (fn [[sid port]] {sid #{port}}))
434+
(apply (partial merge-with set/union)))]
435+
(or supervisor->dead-ports {})))
436+
437+
(defn- compute-topology->scheduler-assignment [nimbus existing-assignments topology->alive-executors]
438+
"convert assignment information in zk to SchedulerAssignment, so it can be used by scheduler api."
439+
(into {} (for [[tid assignment] existing-assignments
440+
:let [alive-executors (topology->alive-executors tid)
441+
executor->node+port (:executor->node+port assignment)
442+
executor->slot (into {} (for [[executor [node port]] executor->node+port]
443+
;; filter out the dead executors
444+
(if (contains? alive-executors executor)
445+
{(ExecutorDetails. (first executor)
446+
(second executor))
447+
(WorkerSlot. node port)}
448+
{})))]]
449+
{tid (SchedulerAssignment. tid executor->slot)})))
450+
451+
(defn- read-all-supervisor-details [nimbus all-slots supervisor->dead-ports]
452+
"return a map: {topology-id SupervisorDetails}"
453+
(let [storm-cluster-state (:storm-cluster-state nimbus)
454+
supervisor-infos (all-supervisor-info storm-cluster-state)
455+
all-supervisor-details (into {} (for [[sid supervisor-info] supervisor-infos
456+
:let [hostname (:hostname supervisor-info)
457+
scheduler-meta (:scheduler-meta supervisor-info)
458+
dead-ports (supervisor->dead-ports sid)
459+
;; hide the dead-ports from the all-ports
460+
;; these dead-ports can be reused in next round of assignments
461+
all-ports (-> sid
462+
all-slots
463+
(set/difference dead-ports)
464+
((fn [ports] (map int ports))))
465+
supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
466+
{sid supervisor-details}))]
467+
all-supervisor-details))
468+
469+
(defn- compute-topology->executor->node+port [scheduler-assignments]
470+
"convert {topology-id -> SchedulerAssignment} to
471+
{topology-id -> {executor [node port]}}"
472+
(map-val (fn [^SchedulerAssignment assignment]
473+
(->> assignment
474+
.getExecutorToSlots
475+
(#(into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] %]
476+
{[(.getStartTask executor) (.getEndTask executor)]
477+
[(.getNodeId slot) (.getPort slot)]})))))
478+
scheduler-assignments))
479+
401480
;; NEW NOTES
402481
;; only assign to supervisors who are there and haven't timed out
403482
;; need to reassign workers with executors that have timed out (will this make it brittle?)
@@ -413,56 +492,30 @@
413492
(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
414493
(let [conf (:conf nimbus)
415494
storm-cluster-state (:storm-cluster-state nimbus)
416-
task-heartbeats-cache (:task-heartbeats-cache nimbus)
417-
assignments+dead-slots (into {} (for [[tid assignment] existing-assignments
418-
:let [storm-conf (read-storm-conf conf tid)
419-
topology-details (.getById topologies tid)
420-
all-executors (set (compute-executors nimbus tid))
421-
_ (update-heartbeats! nimbus tid all-executors assignment)
422-
alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
423-
all-executors
424-
(set (alive-executors nimbus topology-details all-executors assignment)))
425-
dead-executors (set/difference all-executors alive-executors)
426-
dead-slots (->> (:executor->node+port assignment)
427-
(filter #(contains? dead-executors (first %)))
428-
vals)
429-
executor->slot (into {} (for [[executor [node port]] (:executor->node+port assignment)]
430-
;; filter out the dead executors
431-
(if (contains? alive-executors executor)
432-
{(ExecutorDetails. (first executor)
433-
(second executor))
434-
(WorkerSlot. node port)}
435-
{})))]]
436-
{tid [(SchedulerAssignment. tid executor->slot) dead-slots]}))
437-
existing-scheduler-assignments (into {} (for [[tid [assignment _]] assignments+dead-slots]
438-
{tid assignment}))
439-
dead-slots (->> assignments+dead-slots
440-
(map (fn [[tid [_ dead-slots]]] dead-slots))
441-
(apply concat)
442-
(map (fn [[sid port]] {sid #{port}}))
443-
(apply (partial merge-with set/union)))
444-
dead-slots (if (nil? dead-slots) {} dead-slots)
495+
topology->executors (compute-topology->executors nimbus (keys existing-assignments))
496+
;; update the executors heartbeats first.
497+
_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
498+
topology->alive-executors (compute-topology->alive-executors nimbus
499+
existing-assignments
500+
topologies
501+
topology->executors
502+
scratch-topology-id)
503+
supervisor->dead-ports (compute-supervisor->dead-ports nimbus
504+
existing-assignments
505+
topology->executors
506+
topology->alive-executors)
507+
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
508+
existing-assignments
509+
topology->alive-executors)
445510
available-slots (->> topologies
446511
.getTopologies
447512
(available-slots nimbus nil)
448-
(map (fn [[nodeId port]] {nodeId #{port}}))
513+
(map (fn [[node-id port]] {node-id #{port}}))
449514
(apply merge-with set/union))
450515
assigned-slots (assigned-slots storm-cluster-state)
451516
all-slots (merge-with set/union available-slots assigned-slots)
452-
supervisor-infos (all-supervisor-info storm-cluster-state)
453-
supervisors (into {} (for [[sid supervisor-info] supervisor-infos
454-
:let [hostname (:hostname supervisor-info)
455-
scheduler-meta (:scheduler-meta supervisor-info)
456-
dead-ports (dead-slots sid)
457-
;; hide the dead-ports from the all-ports
458-
;; these dead-ports can be reused in next round of assignments
459-
all-ports (-> sid
460-
all-slots
461-
(set/difference dead-ports)
462-
((fn [ports] (map int ports))))
463-
supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
464-
{sid supervisor-details}))
465-
cluster (Cluster. supervisors existing-scheduler-assignments)
517+
supervisors (read-all-supervisor-details nimbus all-slots supervisor->dead-ports)
518+
cluster (Cluster. supervisors topology->scheduler-assignment)
466519
scheduler (if (conf STORM-SCHEDULER)
467520
(do
468521
(log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
@@ -475,21 +528,16 @@
475528
_ (.schedule scheduler topologies cluster)
476529
new-scheduler-assignments (.getAssignments cluster)
477530
;; add more information to convert SchedulerAssignment to Assignment
478-
new-topology->executor->node+port (into {} (for [[topology-id assignment] new-scheduler-assignments
479-
:let [executor->slot (.getExecutorToSlots ^SchedulerAssignment assignment)
480-
executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot]
481-
{[(.getStartTask executor) (.getEndTask executor)] [(.getNodeId slot) (.getPort slot)]}))]]
482-
{topology-id executor->node+port}))]
531+
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]
483532
;; print some useful information.
484533
(doseq [[topology-id executor->node+port] new-topology->executor->node+port
485534
:let [old-executor->node+port (-> topology-id
486535
existing-assignments
487536
:executor->node+port)
488-
reassignment (into {} (for [[executor node+port] executor->node+port]
489-
(if (and (contains? old-executor->node+port executor)
490-
(= node+port (old-executor->node+port executor)))
491-
{}
492-
{executor node+port})))]]
537+
reassignment (filter (fn [[executor node+port]]
538+
(and (contains? old-executor->node+port executor)
539+
(not (= node+port (old-executor->node+port executor)))))
540+
executor->node+port)]]
493541
(when-not (empty? reassignment)
494542
(let [new-slots-cnt (count (set (vals executor->node+port)))
495543
reassign-executors (keys reassignment)]

src/jvm/backtype/storm/scheduler/Cluster.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,15 @@ public int getAssignedNumWorkers(TopologyDetails topology) {
215215

216216
/**
217217
* Assign the slot to the executors for this topology.
218+
*
219+
* @throws RuntimeException if the specified slot is already occupied.
218220
*/
219221
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
222+
if (this.isSlotOccupied(slot)) {
223+
new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
224+
}
225+
220226
SchedulerAssignment assignment = this.getAssignmentById(topologyId);
221-
222227
if (assignment == null) {
223228
assignment = new SchedulerAssignment(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
224229
this.assignments.put(topologyId, assignment);
@@ -252,14 +257,30 @@ public void freeSlot(WorkerSlot slot) {
252257
if (supervisor != null) {
253258
// remove the slot from the existing assignments
254259
for (SchedulerAssignment assignment : this.assignments.values()) {
255-
if (assignment.occupiedSlot(slot)) {
260+
if (assignment.isSlotOccupied(slot)) {
256261
assignment.removeSlot(slot);
257262
break;
258263
}
259264
}
260265
}
261266
}
262267

268+
/**
269+
* Checks the specified slot is occupied.
270+
*
271+
* @param slot the slot be to checked.
272+
* @return
273+
*/
274+
public boolean isSlotOccupied(WorkerSlot slot) {
275+
for (SchedulerAssignment assignment : this.assignments.values()) {
276+
if (assignment.isSlotOccupied(slot)) {
277+
return true;
278+
}
279+
}
280+
281+
return false;
282+
}
283+
263284
/**
264285
* get the current assignment for the topology.
265286
*/

src/jvm/backtype/storm/scheduler/SchedulerAssignment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void removeSlot(WorkerSlot slot) {
5252
* @param slot
5353
* @return
5454
*/
55-
public boolean occupiedSlot(WorkerSlot slot) {
55+
public boolean isSlotOccupied(WorkerSlot slot) {
5656
Collection<WorkerSlot> slots = this.executorToSlots.values();
5757
for (WorkerSlot slot1 : slots) {
5858
if (slot1.equals(slot)) {

0 commit comments

Comments
 (0)