|
254 | 254 | supervisor-ids))
|
255 | 255 | )))
|
256 | 256 |
|
257 |
| -(defn get-node->host [storm-cluster-state callback] |
258 |
| - (->> (all-supervisor-info storm-cluster-state callback) |
259 |
| - (map-val :hostname))) |
260 |
| - |
261 | 257 | (defn- available-slots
|
262 | 258 | [nimbus callback topologies]
|
263 | 259 | (let [storm-cluster-state (:storm-cluster-state nimbus)
|
|
583 | 579 | set)]
|
584 | 580 | (set/difference new-slots old-slots)))
|
585 | 581 |
|
| 582 | + |
| 583 | +(defn basic-supervisor-details-map [storm-cluster-state] |
| 584 | + (let [infos (all-supervisor-info storm-cluster-state)] |
| 585 | + (->> infos |
| 586 | + (map (fn [[id info]] |
| 587 | + [id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil)])) |
| 588 | + (into {})))) |
| 589 | + |
586 | 590 | ;; get existing assignment (just the executor->node+port map) -> default to {}
|
587 | 591 | ;; filter out ones which have a executor timeout
|
588 | 592 | ;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
|
|
591 | 595 | (defnk mk-assignments [nimbus :scratch-topology-id nil]
|
592 | 596 | (let [conf (:conf nimbus)
|
593 | 597 | storm-cluster-state (:storm-cluster-state nimbus)
|
594 |
| - node->host (get-node->host storm-cluster-state nil) |
| 598 | + ^INimbus inimbus (:inimbus nimbus) |
595 | 599 | ;; read all the topologies
|
596 | 600 | topology-ids (.active-storms storm-cluster-state)
|
597 | 601 | topologies (into {} (for [tid topology-ids]
|
|
611 | 615 | existing-assignments
|
612 | 616 | topologies
|
613 | 617 | scratch-topology-id)
|
| 618 | + |
| 619 | + |
614 | 620 | now-secs (current-time-secs)
|
| 621 | + |
| 622 | + basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state) |
| 623 | + |
615 | 624 | ;; construct the final Assignments by adding start-times etc into it
|
616 | 625 | new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
|
617 | 626 | :let [existing-assignment (get existing-assignments topology-id)
|
618 |
| - all-node->host (merge (:node->host existing-assignment) node->host) |
619 |
| - reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port) |
620 |
| - start-times (merge (:executor->start-time-secs existing-assignment) |
| 627 | + all-nodes (->> executor->node+port vals (map first) set) |
| 628 | + node->host (->> all-nodes |
| 629 | + (mapcat (fn [node] |
| 630 | + (if-let [host (.getHostName inimbus basic-supervisor-details-map node)] |
| 631 | + [[node host]] |
| 632 | + ))) |
| 633 | + (into {})) |
| 634 | + all-node->host (merge (:node->host existing-assignment) node->host) |
| 635 | + reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port) |
| 636 | + start-times (merge (:executor->start-time-secs existing-assignment) |
621 | 637 | (into {}
|
622 | 638 | (for [id reassign-executors]
|
623 | 639 | [id now-secs]
|
624 | 640 | )))]]
|
625 | 641 | {topology-id (Assignment.
|
626 | 642 | (master-stormdist-root conf topology-id)
|
627 |
| - (select-keys all-node->host (map first (vals executor->node+port))) |
| 643 | + (select-keys all-node->host all-nodes) |
628 | 644 | executor->node+port
|
629 | 645 | start-times)}))]
|
630 | 646 |
|
|
644 | 660 | (newly-added-slots existing-assignment assignment))
|
645 | 661 | (apply concat)
|
646 | 662 | (map (fn [[id port]] (WorkerSlot. id port)))
|
647 |
| - (.assignSlots ^INimbus (:inimbus nimbus) topologies) |
| 663 | + (.assignSlots inimbus topologies) |
648 | 664 | )))
|
649 | 665 |
|
650 | 666 | (defn- start-storm [nimbus storm-name storm-id]
|
|
1085 | 1101 | )
|
1086 | 1102 | (getForcedScheduler [this]
|
1087 | 1103 | nil )
|
| 1104 | + (getHostName [this supervisors node-id] |
| 1105 | + (if-let [^SupervisorDetails supervisor (get supervisors node-id)] |
| 1106 | + (.getHost supervisor))) |
1088 | 1107 | ))
|
1089 | 1108 |
|
1090 | 1109 | (defn -main []
|
|
0 commit comments