Skip to content

Commit 57fb6ce

Browse files
author
Nathan Marz
committed
compute node->host through inimbus so it works smoother in environments where supervisors don't exist until topologies exist
1 parent a44a08e commit 57fb6ce

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,6 @@
254254
supervisor-ids))
255255
)))
256256

257-
(defn get-node->host [storm-cluster-state callback]
258-
(->> (all-supervisor-info storm-cluster-state callback)
259-
(map-val :hostname)))
260-
261257
(defn- available-slots
262258
[nimbus callback topologies]
263259
(let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -583,6 +579,14 @@
583579
set)]
584580
(set/difference new-slots old-slots)))
585581

582+
583+
(defn basic-supervisor-details-map [storm-cluster-state]
584+
(let [infos (all-supervisor-info storm-cluster-state)]
585+
(->> infos
586+
(map (fn [[id info]]
587+
[id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil)]))
588+
(into {}))))
589+
586590
;; get existing assignment (just the executor->node+port map) -> default to {}
587591
;; filter out ones which have a executor timeout
588592
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
@@ -591,7 +595,7 @@
591595
(defnk mk-assignments [nimbus :scratch-topology-id nil]
592596
(let [conf (:conf nimbus)
593597
storm-cluster-state (:storm-cluster-state nimbus)
594-
node->host (get-node->host storm-cluster-state nil)
598+
^INimbus inimbus (:inimbus nimbus)
595599
;; read all the topologies
596600
topology-ids (.active-storms storm-cluster-state)
597601
topologies (into {} (for [tid topology-ids]
@@ -611,20 +615,32 @@
611615
existing-assignments
612616
topologies
613617
scratch-topology-id)
618+
619+
614620
now-secs (current-time-secs)
621+
622+
basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
623+
615624
;; construct the final Assignments by adding start-times etc into it
616625
new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
617626
:let [existing-assignment (get existing-assignments topology-id)
618-
all-node->host (merge (:node->host existing-assignment) node->host)
619-
reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
620-
start-times (merge (:executor->start-time-secs existing-assignment)
627+
all-nodes (->> executor->node+port vals (map first) set)
628+
node->host (->> all-nodes
629+
(mapcat (fn [node]
630+
(if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
631+
[[node host]]
632+
)))
633+
(into {}))
634+
all-node->host (merge (:node->host existing-assignment) node->host)
635+
reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
636+
start-times (merge (:executor->start-time-secs existing-assignment)
621637
(into {}
622638
(for [id reassign-executors]
623639
[id now-secs]
624640
)))]]
625641
{topology-id (Assignment.
626642
(master-stormdist-root conf topology-id)
627-
(select-keys all-node->host (map first (vals executor->node+port)))
643+
(select-keys all-node->host all-nodes)
628644
executor->node+port
629645
start-times)}))]
630646

@@ -644,7 +660,7 @@
644660
(newly-added-slots existing-assignment assignment))
645661
(apply concat)
646662
(map (fn [[id port]] (WorkerSlot. id port)))
647-
(.assignSlots ^INimbus (:inimbus nimbus) topologies)
663+
(.assignSlots inimbus topologies)
648664
)))
649665

650666
(defn- start-storm [nimbus storm-name storm-id]
@@ -1085,6 +1101,9 @@
10851101
)
10861102
(getForcedScheduler [this]
10871103
nil )
1104+
(getHostName [this supervisors node-id]
1105+
(if-let [^SupervisorDetails supervisor (get supervisors node-id)]
1106+
(.getHost supervisor)))
10881107
))
10891108

10901109
(defn -main []

src/jvm/backtype/storm/scheduler/INimbus.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,8 @@ public interface INimbus {
1818
// this should be called after the assignment is changed in ZK
1919
void assignSlots(Topologies topologies, Collection<WorkerSlot> newSlots);
2020

21+
// map from node id to supervisor details
22+
String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);
23+
2124
IScheduler getForcedScheduler();
2225
}

0 commit comments

Comments
 (0)