Skip to content

Commit ea2e088

Browse files
author
Nathan Marz
committed
fix conflict
2 parents fbe2047 + ebe8246 commit ea2e088

File tree

9 files changed

+524
-124
lines changed

9 files changed

+524
-124
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

+104-54
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,12 @@
346346
all-executors)]
347347
(swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
348348

349+
(defn- update-all-heartbeats!
  "Update the heartbeats of every executor of every assigned topology.

  existing-assignments is a map of topology-id -> assignment and
  topology->executors is a map of topology-id -> executors. For each entry of
  existing-assignments, update-heartbeats! is called with the topology id, that
  topology's executors and its assignment. Called for side effects only;
  returns nil."
  [nimbus existing-assignments topology->executors]
  ;; The docstring must precede the parameter vector: placed after it, the
  ;; string is merely evaluated and discarded instead of becoming :doc metadata.
  (doseq [[tid assignment] existing-assignments
          :let [all-executors (topology->executors tid)]]
    (update-heartbeats! nimbus tid all-executors assignment)))
354+
349355
(defn- alive-executors
350356
[nimbus ^TopologyDetails topology-details all-executors existing-assignment]
351357
(let [conf (:conf nimbus)
@@ -403,6 +409,79 @@
403409
{executor component}))]
404410
executor->component))
405411

412+
(defn- compute-topology->executors
  "Compute a map of topology-id -> set of executors.

  storm-ids is a collection of topology ids; the executors of each one are
  obtained from compute-executors and collected into a set."
  [nimbus storm-ids]
  ;; Docstring moved in front of the parameter vector so it is attached to the
  ;; var as :doc metadata rather than evaluated and thrown away.
  (into {} (for [tid storm-ids]
             [tid (set (compute-executors nimbus tid))])))
416+
417+
(defn- compute-topology->alive-executors
  "Compute a map of topology-id -> alive executors.

  existing-assignments is a map of topology-id -> assignment, topologies is
  queried with .getById for each topology's details, and topology->executors
  maps topology-id -> all executors of that topology.

  When scratch-topology-id is non-nil and equal to a topology's id, all of that
  topology's executors are treated as alive; otherwise the alive set is whatever
  the alive-executors function reports for it."
  [nimbus existing-assignments topologies topology->executors scratch-topology-id]
  ;; Docstring moved in front of the parameter vector so it becomes real :doc
  ;; metadata. The local is named `alive` so it no longer shadows the
  ;; alive-executors function that is called while computing it.
  (into {} (for [[tid assignment] existing-assignments
                 :let [topology-details (.getById topologies tid)
                       all-executors (topology->executors tid)
                       alive (if (and scratch-topology-id (= scratch-topology-id tid))
                               all-executors
                               (set (alive-executors nimbus topology-details all-executors assignment)))]]
             {tid alive})))
426+
427+
(defn- compute-supervisor->dead-ports
  "Compute a map of supervisor-id -> set of dead ports.

  For every topology in existing-assignments (topology-id -> assignment), the
  dead executors are those in (topology->executors tid) that are missing from
  (topology->alive-executors tid). Each [supervisor-id port] pair that the
  assignment's :executor->node+port map gives to a dead executor is collected,
  and the ports are grouped into a set per supervisor id.

  Returns an empty map when there are no dead executors. The nimbus argument is
  not used."
  [nimbus existing-assignments topology->executors topology->alive-executors]
  (let [dead-node+ports (mapcat (fn [[tid assignment]]
                                  (let [dead? (set/difference (topology->executors tid)
                                                              (topology->alive-executors tid))]
                                    (for [[executor node+port] (:executor->node+port assignment)
                                          :when (contains? dead? executor)]
                                      node+port)))
                                existing-assignments)]
    ;; Fold the [sid port] pairs into {sid #{port ...}}; starting from {} gives
    ;; the empty-map result directly when nothing is dead.
    (reduce (fn [acc [sid port]]
              (merge-with set/union acc {sid #{port}}))
            {}
            dead-node+ports)))
441+
442+
(defn- compute-topology->scheduler-assignment
  "Convert assignment information in zk to SchedulerAssignment, so it can be
  used by the scheduler api.

  existing-assignments is a map of topology-id -> assignment and
  topology->alive-executors maps topology-id -> the executors considered alive.
  Returns a map of topology-id -> SchedulerAssignment whose executor->slot
  mapping (ExecutorDetails -> WorkerSlot) contains only the alive executors of
  the assignment's :executor->node+port map. The nimbus argument is not used."
  [nimbus existing-assignments topology->alive-executors]
  ;; Docstring moved in front of the parameter vector so it becomes real :doc
  ;; metadata instead of a discarded string expression.
  (into {} (for [[tid assignment] existing-assignments
                 :let [alive-executors (topology->alive-executors tid)
                       executor->node+port (:executor->node+port assignment)
                       executor->slot (into {} (for [[executor [node port]] executor->node+port
                                                     ;; filter out the dead executors
                                                     :when (contains? alive-executors executor)]
                                                 [(ExecutorDetails. (first executor)
                                                                    (second executor))
                                                  (WorkerSlot. node port)]))]]
             {tid (SchedulerAssignment. tid executor->slot)})))
455+
456+
(defn- read-all-supervisor-details
  "Return a map: {supervisor-id SupervisorDetails}.

  Supervisor infos are read through all-supervisor-info from the nimbus'
  :storm-cluster-state. all-slots maps supervisor-id -> set of ports and
  supervisor->dead-ports maps supervisor-id -> set of dead ports. The ports
  handed to each SupervisorDetails are the supervisor's all-slots minus its
  dead ports, each coerced with int."
  [nimbus all-slots supervisor->dead-ports]
  ;; Docstring moved in front of the parameter vector so it becomes real :doc
  ;; metadata; it also previously claimed the keys were topology ids, while the
  ;; map is keyed by supervisor id (sid).
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        supervisor-infos (all-supervisor-info storm-cluster-state)]
    (into {} (for [[sid supervisor-info] supervisor-infos
                   :let [hostname (:hostname supervisor-info)
                         scheduler-meta (:scheduler-meta supervisor-info)
                         dead-ports (supervisor->dead-ports sid)
                         ;; hide the dead-ports from the all-ports
                         ;; these dead-ports can be reused in next round of assignments
                         all-ports (->> (set/difference (all-slots sid) dead-ports)
                                        (map int))]]
               {sid (SupervisorDetails. sid hostname scheduler-meta all-ports)}))))
473+
474+
(defn- compute-topology->executor->node+port
  "Convert {topology-id -> SchedulerAssignment} to
  {topology-id -> {executor [node port]}}.

  Each executor key is the vector [start-task end-task] taken from the
  ExecutorDetails, and each value is [node-id port] taken from the WorkerSlot
  it is mapped to by the assignment's .getExecutorToSlot."
  [scheduler-assignments]
  ;; Docstring moved in front of the parameter vector so it becomes real :doc
  ;; metadata instead of a discarded string expression.
  (map-val (fn [^SchedulerAssignment assignment]
             (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] (.getExecutorToSlot assignment)]
                        [[(.getStartTask executor) (.getEndTask executor)]
                         [(.getNodeId slot) (.getPort slot)]])))
           scheduler-assignments))
484+
406485
;; NEW NOTES
407486
;; only assign to supervisors who are there and haven't timed out
408487
;; need to reassign workers with executors that have timed out (will this make it brittle?)
@@ -418,76 +497,47 @@
418497
(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
419498
(let [conf (:conf nimbus)
420499
storm-cluster-state (:storm-cluster-state nimbus)
421-
task-heartbeats-cache (:task-heartbeats-cache nimbus)
422-
assignments+dead-slots (into {} (for [[tid assignment] existing-assignments
423-
:let [storm-conf (read-storm-conf conf tid)
424-
topology-details (.getById topologies tid)
425-
all-executors (set (compute-executors nimbus tid))
426-
_ (update-heartbeats! nimbus tid all-executors assignment)
427-
alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
428-
all-executors
429-
(set (alive-executors nimbus topology-details all-executors assignment)))
430-
dead-executors (set/difference all-executors alive-executors)
431-
dead-slots (->> (:executor->node+port assignment)
432-
(filter #(contains? dead-executors (first %)))
433-
vals)
434-
executor->slot (into {} (for [[executor [node port]] (:executor->node+port assignment)]
435-
;; filter out the dead executors
436-
(if (contains? alive-executors executor)
437-
{(ExecutorDetails. (first executor)
438-
(second executor))
439-
(WorkerSlot. node port)}
440-
{})))]]
441-
{tid [(SchedulerAssignment. tid executor->slot) dead-slots]}))
442-
existing-scheduler-assignments (into {} (for [[tid [assignment _]] assignments+dead-slots]
443-
{tid assignment}))
444-
dead-slots (->> assignments+dead-slots
445-
(map (fn [[tid [_ dead-slots]]] dead-slots))
446-
(apply concat)
447-
(map (fn [[sid port]] {sid #{port}}))
448-
(apply (partial merge-with set/union)))
449-
dead-slots (if (nil? dead-slots) {} dead-slots)
500+
topology->executors (compute-topology->executors nimbus (keys existing-assignments))
501+
;; update the executors heartbeats first.
502+
_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
503+
topology->alive-executors (compute-topology->alive-executors nimbus
504+
existing-assignments
505+
topologies
506+
topology->executors
507+
scratch-topology-id)
508+
supervisor->dead-ports (compute-supervisor->dead-ports nimbus
509+
existing-assignments
510+
topology->executors
511+
topology->alive-executors)
512+
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
513+
existing-assignments
514+
topology->alive-executors)
450515
available-slots (->> topologies
451516
.getTopologies
452517
(available-slots nimbus nil)
453-
(map (fn [[nodeId port]] {nodeId #{port}}))
518+
(map (fn [[node-id port]] {node-id #{port}}))
454519
(apply merge-with set/union))
455520
assigned-slots (assigned-slots storm-cluster-state)
456521
all-slots (merge-with set/union available-slots assigned-slots)
457-
supervisor-infos (all-supervisor-info storm-cluster-state)
458-
supervisors (into {} (for [[sid supervisor-info] supervisor-infos
459-
:let [hostname (:hostname supervisor-info)
460-
scheduler-meta (:scheduler-meta supervisor-info)
461-
dead-ports (dead-slots sid)
462-
;; hide the dead-ports from the all-ports
463-
;; these dead-ports can be reused in next round of assignments
464-
all-ports (-> sid
465-
all-slots
466-
(set/difference dead-ports)
467-
((fn [ports] (map int ports))))
468-
supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
469-
{sid supervisor-details}))
470-
cluster (Cluster. supervisors existing-scheduler-assignments)
522+
523+
supervisors (read-all-supervisor-details nimbus all-slots supervisor->dead-ports)
524+
cluster (Cluster. supervisors topology->scheduler-assignment)
525+
471526
;; call scheduler.schedule to schedule all the topologies
472527
;; the new assignments for all the topologies are in the cluster object.
473528
_ (.schedule (:scheduler nimbus) topologies cluster)
474529
new-scheduler-assignments (.getAssignments cluster)
475530
;; add more information to convert SchedulerAssignment to Assignment
476-
new-topology->executor->node+port (into {} (for [[topology-id assignment] new-scheduler-assignments
477-
:let [executor->slot (.getExecutorToSlots ^SchedulerAssignment assignment)
478-
executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot]
479-
{[(.getStartTask executor) (.getEndTask executor)] [(.getNodeId slot) (.getPort slot)]}))]]
480-
{topology-id executor->node+port}))]
531+
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]
481532
;; print some useful information.
482533
(doseq [[topology-id executor->node+port] new-topology->executor->node+port
483534
:let [old-executor->node+port (-> topology-id
484535
existing-assignments
485536
:executor->node+port)
486-
reassignment (into {} (for [[executor node+port] executor->node+port]
487-
(if (and (contains? old-executor->node+port executor)
488-
(= node+port (old-executor->node+port executor)))
489-
{}
490-
{executor node+port})))]]
537+
reassignment (filter (fn [[executor node+port]]
538+
(and (contains? old-executor->node+port executor)
539+
(not (= node+port (old-executor->node+port executor)))))
540+
executor->node+port)]]
491541
(when-not (empty? reassignment)
492542
(let [new-slots-cnt (count (set (vals executor->node+port)))
493543
reassign-executors (keys reassignment)]
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,44 @@
11
(ns backtype.storm.scheduler.DefaultScheduler
2-
(:use [backtype.storm util])
2+
(:use [backtype.storm util config])
33
(:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler])
44
(:import [backtype.storm.scheduler IScheduler Topologies
5-
Cluster TopologyDetails WorkerSlot SchedulerAssignment])
5+
Cluster TopologyDetails WorkerSlot SchedulerAssignment
6+
EvenScheduler ExecutorDetails])
67
(:gen-class
78
:implements [backtype.storm.scheduler.IScheduler]))
89

9-
(defn- keeper-slots [existing-slots num-executors num-workers]
10+
(defn- bad-slots
  "Return the slots of existing-slots that should be given up, as a sequence
  of WorkerSlot.

  existing-slots maps [node port] -> executor list. The value of
  (integer-divided num-executors num-workers) is used as a lookup from an
  executor count to how many slots with that count may still be kept; a slot
  is kept while the entry for its executor count is positive, and the entry is
  decremented each time. Every slot that was not kept is returned.

  Returns an empty list when num-workers is 0."
  [existing-slots num-executors num-workers]
  (if (= 0 num-workers)
    '()
    (let [[_ kept-slots]
          (reduce (fn [[remaining kept] [node+port executor-list]]
                    (let [executor-count (count executor-list)]
                      (if (pos? (get remaining executor-count 0))
                        [(update-in remaining [executor-count] dec)
                         (conj kept node+port)]
                        [remaining kept])))
                  [(integer-divided num-executors num-workers) []]
                  existing-slots)
          rejected (apply dissoc existing-slots kept-slots)]
      (map (fn [[node port]]
             (WorkerSlot. node port))
           (keys rejected)))))
2126

2227

2328
(defn -schedule
  "IScheduler entry point.

  For every topology that the cluster reports as needing scheduling, works out
  how many slots the topology may use (the smaller of its TOPOLOGY-WORKERS
  setting and the number of available slots plus the slots it already holds
  with alive executors), frees the currently held slots that bad-slots
  rejects, and then calls EvenScheduler/schedule-topologies-evenly with
  topologies and cluster."
  [this ^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    (let [topology-id (.getId topology)
          topology-conf (.getConf topology)
          available-slots (map (fn [slot] [(.getNodeId slot) (.getPort slot)])
                               (.getAvailableSlots cluster))
          all-executors (set (map (fn [executor] [(.getStartTask executor) (.getEndTask executor)])
                                  (.getExecutors topology)))
          alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
          total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
                                  (+ (count available-slots) (count alive-assigned)))
          ;; named differently from the bad-slots function it is computed with
          slots-to-free (bad-slots alive-assigned (count all-executors) total-slots-to-use)]
      (.freeSlots cluster slots-to-free)
      (EvenScheduler/schedule-topologies-evenly topologies cluster))))

0 commit comments

Comments
 (0)