|
346 | 346 | all-executors)]
|
347 | 347 | (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
|
348 | 348 |
|
(defn- update-all-heartbeats!
  "Update the heartbeats for every executor of every topology present in
   existing-assignments.  Delegates the per-topology work to update-heartbeats!.
   (In the original, the docstring was placed after the arg vector, making it a
   discarded string expression rather than actual :doc metadata.)"
  [nimbus existing-assignments topology->executors]
  (doseq [[tid assignment] existing-assignments
          :let [all-executors (topology->executors tid)]]
    (update-heartbeats! nimbus tid all-executors assignment)))
| 354 | + |
349 | 355 | (defn- alive-executors
|
350 | 356 | [nimbus ^TopologyDetails topology-details all-executors existing-assignment]
|
351 | 357 | (let [conf (:conf nimbus)
|
|
403 | 409 | {executor component}))]
|
404 | 410 | executor->component))
|
405 | 411 |
|
(defn- compute-topology->executors
  "Compute a map of topology-id -> set of executors for the given storm-ids.
   (Docstring moved before the arg vector; in the original position it was a
   no-op string expression, not :doc metadata.)"
  [nimbus storm-ids]
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))
| 416 | + |
(defn- compute-topology->alive-executors
  "Compute a map of topology-id -> set of executors considered alive.
   For the topology currently being rebalanced (scratch-topology-id) every
   executor is treated as alive, so its assignment can be recomputed from
   scratch.  (Docstring moved before the arg vector — in the original position
   it was a discarded string expression, not :doc metadata.)"
  [nimbus existing-assignments topologies topology->executors scratch-topology-id]
  (into {} (for [[tid assignment] existing-assignments
                 :let [topology-details (.getById topologies tid)
                       all-executors (topology->executors tid)
                       ;; the local shadows the alive-executors fn only AFTER
                       ;; this init expression, so the call below hits the var
                       alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
                                         all-executors
                                         (set (alive-executors nimbus topology-details all-executors assignment)))]]
             {tid alive-executors})))
| 426 | + |
(defn- compute-supervisor->dead-ports
  "Return a map of supervisor-id -> set of ports that currently host dead
   executors, derived from the existing assignments.  Returns {} when no
   dead executors are found."
  [nimbus existing-assignments topology->executors topology->alive-executors]
  (let [dead-node+ports (mapcat (fn [[tid assignment]]
                                  (let [all-executors (topology->executors tid)
                                        alive-executors (topology->alive-executors tid)
                                        dead-executors (set/difference all-executors alive-executors)]
                                    ;; keep only the node+port of executors that died
                                    (for [[executor node+port] (:executor->node+port assignment)
                                          :when (contains? dead-executors executor)]
                                      node+port)))
                                existing-assignments)]
    ;; accumulate ports per supervisor-id into sets
    (reduce (fn [acc [sid port]]
              (update-in acc [sid] (fnil conj #{}) port))
            {}
            dead-node+ports)))
| 441 | + |
(defn- compute-topology->scheduler-assignment
  "Convert the assignment information stored in zk into a map of
   topology-id -> SchedulerAssignment, so it can be used by the scheduler api.
   Executors that are no longer alive are filtered out of each assignment.
   (Docstring moved before the arg vector — in the original position it was a
   discarded string expression, not :doc metadata.)"
  [nimbus existing-assignments topology->alive-executors]
  (into {} (for [[tid assignment] existing-assignments
                 :let [alive-executors (topology->alive-executors tid)
                       executor->node+port (:executor->node+port assignment)
                       ;; :when drops dead executors directly instead of
                       ;; merging empty maps into the result
                       executor->slot (into {} (for [[executor [node port]] executor->node+port
                                                     :when (contains? alive-executors executor)]
                                                 {(ExecutorDetails. (first executor)
                                                                    (second executor))
                                                  (WorkerSlot. node port)}))]]
             {tid (SchedulerAssignment. tid executor->slot)})))
| 455 | + |
(defn- read-all-supervisor-details
  "Return a map of supervisor-id -> SupervisorDetails for every supervisor
   registered in zookeeper.  Ports listed in supervisor->dead-ports are hidden
   from each supervisor's port list so they can be reused in the next round of
   assignments.  (The original docstring wrongly said the keys were
   topology-ids, and was also placed after the arg vector, making it a
   discarded string expression rather than :doc metadata.)"
  [nimbus all-slots supervisor->dead-ports]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        supervisor-infos (all-supervisor-info storm-cluster-state)]
    (into {} (for [[sid supervisor-info] supervisor-infos
                   :let [hostname (:hostname supervisor-info)
                         scheduler-meta (:scheduler-meta supervisor-info)
                         dead-ports (supervisor->dead-ports sid)
                         ;; hide the dead-ports from the all-ports;
                         ;; they can be reused in the next round of assignments
                         all-ports (map int (set/difference (all-slots sid) dead-ports))]]
               {sid (SupervisorDetails. sid hostname scheduler-meta all-ports)}))))
| 473 | + |
(defn- compute-topology->executor->node+port
  "Convert a map of {topology-id -> SchedulerAssignment} into a map of
   {topology-id -> {[start-task end-task] [node-id port]}}.
   (Docstring moved before the arg vector — in the original position it was a
   discarded string expression, not :doc metadata.)"
  [scheduler-assignments]
  (map-val (fn [^SchedulerAssignment assignment]
             ;; direct `for` over the slot map instead of threading through an
             ;; immediately-invoked anonymous fn
             (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] (.getExecutorToSlot assignment)]
                        {[(.getStartTask executor) (.getEndTask executor)]
                         [(.getNodeId slot) (.getPort slot)]})))
           scheduler-assignments))
| 484 | + |
406 | 485 | ;; NEW NOTES
|
407 | 486 | ;; only assign to supervisors who are there and haven't timed out
|
408 | 487 | ;; need to reassign workers with executors that have timed out (will this make it brittle?)
|
|
418 | 497 | (defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
|
419 | 498 | (let [conf (:conf nimbus)
|
420 | 499 | storm-cluster-state (:storm-cluster-state nimbus)
|
421 |
| - task-heartbeats-cache (:task-heartbeats-cache nimbus) |
422 |
| - assignments+dead-slots (into {} (for [[tid assignment] existing-assignments |
423 |
| - :let [storm-conf (read-storm-conf conf tid) |
424 |
| - topology-details (.getById topologies tid) |
425 |
| - all-executors (set (compute-executors nimbus tid)) |
426 |
| - _ (update-heartbeats! nimbus tid all-executors assignment) |
427 |
| - alive-executors (if (and scratch-topology-id (= scratch-topology-id tid)) |
428 |
| - all-executors |
429 |
| - (set (alive-executors nimbus topology-details all-executors assignment))) |
430 |
| - dead-executors (set/difference all-executors alive-executors) |
431 |
| - dead-slots (->> (:executor->node+port assignment) |
432 |
| - (filter #(contains? dead-executors (first %))) |
433 |
| - vals) |
434 |
| - executor->slot (into {} (for [[executor [node port]] (:executor->node+port assignment)] |
435 |
| - ;; filter out the dead executors |
436 |
| - (if (contains? alive-executors executor) |
437 |
| - {(ExecutorDetails. (first executor) |
438 |
| - (second executor)) |
439 |
| - (WorkerSlot. node port)} |
440 |
| - {})))]] |
441 |
| - {tid [(SchedulerAssignment. tid executor->slot) dead-slots]})) |
442 |
| - existing-scheduler-assignments (into {} (for [[tid [assignment _]] assignments+dead-slots] |
443 |
| - {tid assignment})) |
444 |
| - dead-slots (->> assignments+dead-slots |
445 |
| - (map (fn [[tid [_ dead-slots]]] dead-slots)) |
446 |
| - (apply concat) |
447 |
| - (map (fn [[sid port]] {sid #{port}})) |
448 |
| - (apply (partial merge-with set/union))) |
449 |
| - dead-slots (if (nil? dead-slots) {} dead-slots) |
| 500 | + topology->executors (compute-topology->executors nimbus (keys existing-assignments)) |
| 501 | + ;; update the executors heartbeats first. |
| 502 | + _ (update-all-heartbeats! nimbus existing-assignments topology->executors) |
| 503 | + topology->alive-executors (compute-topology->alive-executors nimbus |
| 504 | + existing-assignments |
| 505 | + topologies |
| 506 | + topology->executors |
| 507 | + scratch-topology-id) |
| 508 | + supervisor->dead-ports (compute-supervisor->dead-ports nimbus |
| 509 | + existing-assignments |
| 510 | + topology->executors |
| 511 | + topology->alive-executors) |
| 512 | + topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus |
| 513 | + existing-assignments |
| 514 | + topology->alive-executors) |
450 | 515 | available-slots (->> topologies
|
451 | 516 | .getTopologies
|
452 | 517 | (available-slots nimbus nil)
|
453 |
| - (map (fn [[nodeId port]] {nodeId #{port}})) |
| 518 | + (map (fn [[node-id port]] {node-id #{port}})) |
454 | 519 | (apply merge-with set/union))
|
455 | 520 | assigned-slots (assigned-slots storm-cluster-state)
|
456 | 521 | all-slots (merge-with set/union available-slots assigned-slots)
|
457 |
| - supervisor-infos (all-supervisor-info storm-cluster-state) |
458 |
| - supervisors (into {} (for [[sid supervisor-info] supervisor-infos |
459 |
| - :let [hostname (:hostname supervisor-info) |
460 |
| - scheduler-meta (:scheduler-meta supervisor-info) |
461 |
| - dead-ports (dead-slots sid) |
462 |
| - ;; hide the dead-ports from the all-ports |
463 |
| - ;; these dead-ports can be reused in next round of assignments |
464 |
| - all-ports (-> sid |
465 |
| - all-slots |
466 |
| - (set/difference dead-ports) |
467 |
| - ((fn [ports] (map int ports)))) |
468 |
| - supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]] |
469 |
| - {sid supervisor-details})) |
470 |
| - cluster (Cluster. supervisors existing-scheduler-assignments) |
| 522 | + |
| 523 | + supervisors (read-all-supervisor-details nimbus all-slots supervisor->dead-ports) |
| 524 | + cluster (Cluster. supervisors topology->scheduler-assignment) |
| 525 | + |
471 | 526 | ;; call scheduler.schedule to schedule all the topologies
|
472 | 527 | ;; the new assignments for all the topologies are in the cluster object.
|
473 | 528 | _ (.schedule (:scheduler nimbus) topologies cluster)
|
474 | 529 | new-scheduler-assignments (.getAssignments cluster)
|
475 | 530 | ;; add more information to convert SchedulerAssignment to Assignment
|
476 |
| - new-topology->executor->node+port (into {} (for [[topology-id assignment] new-scheduler-assignments |
477 |
| - :let [executor->slot (.getExecutorToSlots ^SchedulerAssignment assignment) |
478 |
| - executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot] |
479 |
| - {[(.getStartTask executor) (.getEndTask executor)] [(.getNodeId slot) (.getPort slot)]}))]] |
480 |
| - {topology-id executor->node+port}))] |
| 531 | + new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)] |
481 | 532 | ;; print some useful information.
|
482 | 533 | (doseq [[topology-id executor->node+port] new-topology->executor->node+port
|
483 | 534 | :let [old-executor->node+port (-> topology-id
|
484 | 535 | existing-assignments
|
485 | 536 | :executor->node+port)
|
486 |
| - reassignment (into {} (for [[executor node+port] executor->node+port] |
487 |
| - (if (and (contains? old-executor->node+port executor) |
488 |
| - (= node+port (old-executor->node+port executor))) |
489 |
| - {} |
490 |
| - {executor node+port})))]] |
| 537 | + reassignment (filter (fn [[executor node+port]] |
| 538 | + (and (contains? old-executor->node+port executor) |
| 539 | + (not (= node+port (old-executor->node+port executor))))) |
| 540 | + executor->node+port)]] |
491 | 541 | (when-not (empty? reassignment)
|
492 | 542 | (let [new-slots-cnt (count (set (vals executor->node+port)))
|
493 | 543 | reassign-executors (keys reassignment)]
|
|
0 commit comments