|
341 | 341 | all-executors)]
|
342 | 342 | (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
|
343 | 343 |
|
(defn- update-all-heartbeats!
  "Update the heartbeats for every executor of every topology in
  `existing-assignments`, refreshing nimbus's heartbeats cache via
  `update-heartbeats!`."
  [nimbus existing-assignments topology->executors]
  (doseq [[tid assignment] existing-assignments
          :let [all-executors (topology->executors tid)]]
    (update-heartbeats! nimbus tid all-executors assignment)))
344 | 350 | (defn- alive-executors
|
345 | 351 | [nimbus ^TopologyDetails topology-details all-executors existing-assignment]
|
346 | 352 | (let [conf (:conf nimbus)
|
|
398 | 404 | {executor component}))]
|
399 | 405 | executor->component))
|
400 | 406 |
|
(defn- compute-topology->executors
  "Compute a map of topology-id -> set of its executors for the given
  storm-ids."
  [nimbus storm-ids]
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))
(defn- compute-topology->alive-executors
  "Compute a map of topology-id -> set of alive executors. All executors
  of the scratch topology (if one is given) are treated as alive."
  [nimbus existing-assignments topologies topology->executors scratch-topology-id]
  (into {} (for [[tid assignment] existing-assignments
                 :let [topology-details (.getById topologies tid)
                       all-executors (topology->executors tid)
                       alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
                                         all-executors
                                         (set (alive-executors nimbus topology-details all-executors assignment)))]]
             {tid alive-executors})))
(defn- compute-supervisor->dead-ports
  "Compute a map of supervisor-id -> set of ports whose assigned
  executors are no longer alive. Returns {} when there are no dead
  slots."
  [nimbus existing-assignments topology->executors topology->alive-executors]
  (let [dead-slots (for [[tid assignment] existing-assignments
                         :let [all-executors (topology->executors tid)
                               alive-executors (topology->alive-executors tid)
                               dead-executors (set/difference all-executors alive-executors)]]
                     ;; the [node port] slots of executors that are assigned but dead
                     (->> (:executor->node+port assignment)
                          (filter #(contains? dead-executors (first %)))
                          vals))
        supervisor->dead-ports (->> dead-slots
                                    (apply concat)
                                    (map (fn [[sid port]] {sid #{port}}))
                                    (apply merge-with set/union))]
    ;; merge-with over zero maps yields nil
    (or supervisor->dead-ports {})))
(defn- compute-topology->scheduler-assignment
  "Convert the assignment information stored in zk into a map of
  topology-id -> SchedulerAssignment, so it can be used by the
  scheduler api. Slots of dead executors are filtered out."
  [nimbus existing-assignments topology->alive-executors]
  (into {} (for [[tid assignment] existing-assignments
                 :let [alive-executors (topology->alive-executors tid)
                       executor->node+port (:executor->node+port assignment)
                       ;; keep only the slots of executors that are still alive
                       executor->slot (into {} (for [[executor [node port]] executor->node+port
                                                     :when (contains? alive-executors executor)]
                                                 {(ExecutorDetails. (first executor)
                                                                    (second executor))
                                                  (WorkerSlot. node port)}))]]
             {tid (SchedulerAssignment. tid executor->slot)})))
(defn- read-all-supervisor-details
  "Return a map of {supervisor-id SupervisorDetails}. The ports of each
  SupervisorDetails exclude the supervisor's dead ports, so those ports
  can be reused in the next round of assignments."
  [nimbus all-slots supervisor->dead-ports]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        supervisor-infos (all-supervisor-info storm-cluster-state)]
    (into {} (for [[sid supervisor-info] supervisor-infos
                   :let [hostname (:hostname supervisor-info)
                         scheduler-meta (:scheduler-meta supervisor-info)
                         dead-ports (supervisor->dead-ports sid)
                         ;; hide the dead-ports from the all-ports;
                         ;; these dead-ports can be reused in next round of assignments
                         all-ports (->> (set/difference (all-slots sid) dead-ports)
                                        (map int))
                         supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
               {sid supervisor-details}))))
(defn- compute-topology->executor->node+port
  "Convert a map of {topology-id -> SchedulerAssignment} into
  {topology-id -> {[start-task end-task] [node port]}}."
  [scheduler-assignments]
  (map-val (fn [^SchedulerAssignment assignment]
             (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] (.getExecutorToSlots assignment)]
                        {[(.getStartTask executor) (.getEndTask executor)]
                         [(.getNodeId slot) (.getPort slot)]})))
           scheduler-assignments))
401 | 480 | ;; NEW NOTES
|
402 | 481 | ;; only assign to supervisors who are there and haven't timed out
|
403 | 482 | ;; need to reassign workers with executors that have timed out (will this make it brittle?)
|
|
413 | 492 | (defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
|
414 | 493 | (let [conf (:conf nimbus)
|
415 | 494 | storm-cluster-state (:storm-cluster-state nimbus)
|
416 |
| - task-heartbeats-cache (:task-heartbeats-cache nimbus) |
417 |
| - assignments+dead-slots (into {} (for [[tid assignment] existing-assignments |
418 |
| - :let [storm-conf (read-storm-conf conf tid) |
419 |
| - topology-details (.getById topologies tid) |
420 |
| - all-executors (set (compute-executors nimbus tid)) |
421 |
| - _ (update-heartbeats! nimbus tid all-executors assignment) |
422 |
| - alive-executors (if (and scratch-topology-id (= scratch-topology-id tid)) |
423 |
| - all-executors |
424 |
| - (set (alive-executors nimbus topology-details all-executors assignment))) |
425 |
| - dead-executors (set/difference all-executors alive-executors) |
426 |
| - dead-slots (->> (:executor->node+port assignment) |
427 |
| - (filter #(contains? dead-executors (first %))) |
428 |
| - vals) |
429 |
| - executor->slot (into {} (for [[executor [node port]] (:executor->node+port assignment)] |
430 |
| - ;; filter out the dead executors |
431 |
| - (if (contains? alive-executors executor) |
432 |
| - {(ExecutorDetails. (first executor) |
433 |
| - (second executor)) |
434 |
| - (WorkerSlot. node port)} |
435 |
| - {})))]] |
436 |
| - {tid [(SchedulerAssignment. tid executor->slot) dead-slots]})) |
437 |
| - existing-scheduler-assignments (into {} (for [[tid [assignment _]] assignments+dead-slots] |
438 |
| - {tid assignment})) |
439 |
| - dead-slots (->> assignments+dead-slots |
440 |
| - (map (fn [[tid [_ dead-slots]]] dead-slots)) |
441 |
| - (apply concat) |
442 |
| - (map (fn [[sid port]] {sid #{port}})) |
443 |
| - (apply (partial merge-with set/union))) |
444 |
| - dead-slots (if (nil? dead-slots) {} dead-slots) |
| 495 | + topology->executors (compute-topology->executors nimbus (keys existing-assignments)) |
| 496 | + ;; update the executors heartbeats first. |
| 497 | + _ (update-all-heartbeats! nimbus existing-assignments topology->executors) |
| 498 | + topology->alive-executors (compute-topology->alive-executors nimbus |
| 499 | + existing-assignments |
| 500 | + topologies |
| 501 | + topology->executors |
| 502 | + scratch-topology-id) |
| 503 | + supervisor->dead-ports (compute-supervisor->dead-ports nimbus |
| 504 | + existing-assignments |
| 505 | + topology->executors |
| 506 | + topology->alive-executors) |
| 507 | + topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus |
| 508 | + existing-assignments |
| 509 | + topology->alive-executors) |
445 | 510 | available-slots (->> topologies
|
446 | 511 | .getTopologies
|
447 | 512 | (available-slots nimbus nil)
|
448 |
| - (map (fn [[nodeId port]] {nodeId #{port}})) |
| 513 | + (map (fn [[node-id port]] {node-id #{port}})) |
449 | 514 | (apply merge-with set/union))
|
450 | 515 | assigned-slots (assigned-slots storm-cluster-state)
|
451 | 516 | all-slots (merge-with set/union available-slots assigned-slots)
|
452 |
| - supervisor-infos (all-supervisor-info storm-cluster-state) |
453 |
| - supervisors (into {} (for [[sid supervisor-info] supervisor-infos |
454 |
| - :let [hostname (:hostname supervisor-info) |
455 |
| - scheduler-meta (:scheduler-meta supervisor-info) |
456 |
| - dead-ports (dead-slots sid) |
457 |
| - ;; hide the dead-ports from the all-ports |
458 |
| - ;; these dead-ports can be reused in next round of assignments |
459 |
| - all-ports (-> sid |
460 |
| - all-slots |
461 |
| - (set/difference dead-ports) |
462 |
| - ((fn [ports] (map int ports)))) |
463 |
| - supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]] |
464 |
| - {sid supervisor-details})) |
465 |
| - cluster (Cluster. supervisors existing-scheduler-assignments) |
| 517 | + supervisors (read-all-supervisor-details nimbus all-slots supervisor->dead-ports) |
| 518 | + cluster (Cluster. supervisors topology->scheduler-assignment) |
466 | 519 | scheduler (if (conf STORM-SCHEDULER)
|
467 | 520 | (do
|
468 | 521 | (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
|
|
475 | 528 | _ (.schedule scheduler topologies cluster)
|
476 | 529 | new-scheduler-assignments (.getAssignments cluster)
|
477 | 530 | ;; add more information to convert SchedulerAssignment to Assignment
|
478 |
| - new-topology->executor->node+port (into {} (for [[topology-id assignment] new-scheduler-assignments |
479 |
| - :let [executor->slot (.getExecutorToSlots ^SchedulerAssignment assignment) |
480 |
| - executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot] |
481 |
| - {[(.getStartTask executor) (.getEndTask executor)] [(.getNodeId slot) (.getPort slot)]}))]] |
482 |
| - {topology-id executor->node+port}))] |
| 531 | + new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)] |
483 | 532 | ;; print some useful information.
|
484 | 533 | (doseq [[topology-id executor->node+port] new-topology->executor->node+port
|
485 | 534 | :let [old-executor->node+port (-> topology-id
|
486 | 535 | existing-assignments
|
487 | 536 | :executor->node+port)
|
488 |
| - reassignment (into {} (for [[executor node+port] executor->node+port] |
489 |
| - (if (and (contains? old-executor->node+port executor) |
490 |
| - (= node+port (old-executor->node+port executor))) |
491 |
| - {} |
492 |
| - {executor node+port})))]] |
| 537 | + reassignment (filter (fn [[executor node+port]] |
| 538 | + (and (contains? old-executor->node+port executor) |
| 539 | + (not (= node+port (old-executor->node+port executor))))) |
| 540 | + executor->node+port)]] |
493 | 541 | (when-not (empty? reassignment)
|
494 | 542 | (let [new-slots-cnt (count (set (vals executor->node+port)))
|
495 | 543 | reassign-executors (keys reassignment)]
|
|
0 commit comments