|
326 | 326 | ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
|
327 | 327 | ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
|
328 | 328 | ;; tracked through heartbeats-cache
|
329 |
| -(defn- update-executor-cache [curr hb] |
| 329 | +(defn- update-executor-cache [curr hb timeout] |
330 | 330 | (let [reported-time (:time-secs hb)
|
331 | 331 | {last-nimbus-time :nimbus-time
|
332 | 332 | last-reported-time :executor-reported-time} curr
|
|
338 | 338 | (current-time-secs)
|
339 | 339 | last-nimbus-time
|
340 | 340 | )]
|
341 |
| - {:nimbus-time nimbus-time |
| 341 | + {:is-timed-out (and |
| 342 | + nimbus-time |
| 343 | + (>= (time-delta nimbus-time) timeout)) |
| 344 | + :nimbus-time nimbus-time |
342 | 345 | :executor-reported-time reported-time}))
|
343 | 346 |
|
344 |
| -(defn update-heartbeat-cache [cache executor-beats all-executors] |
| 347 | +(defn update-heartbeat-cache [cache executor-beats all-executors timeout] |
345 | 348 | (let [cache (select-keys cache all-executors)]
|
346 | 349 | (into {}
|
347 | 350 | (for [executor all-executors :let [curr (cache executor)]]
|
348 | 351 | [executor
|
349 |
| - (update-executor-cache curr (get executor-beats executor))] |
| 352 | + (update-executor-cache curr (get executor-beats executor) timeout)] |
350 | 353 | ))))
|
351 | 354 |
|
352 | 355 | (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
|
|
355 | 358 | executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))
|
356 | 359 | cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
|
357 | 360 | executor-beats
|
358 |
| - all-executors)] |
| 361 | + all-executors |
| 362 | + ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))] |
359 | 363 | (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
|
360 | 364 |
|
361 | 365 | (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
|
|
380 | 384 | (->> all-executors
|
381 | 385 | (filter (fn [executor]
|
382 | 386 | (let [start-time (get executor-start-times executor)
|
383 |
| - nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)] |
| 387 | + is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)] |
384 | 388 | (if (and start-time
|
385 | 389 | (or
|
386 | 390 | (< (time-delta start-time)
|
387 | 391 | (conf NIMBUS-TASK-LAUNCH-SECS))
|
388 |
| - (not nimbus-time) |
389 |
| - (< (time-delta nimbus-time) |
390 |
| - (conf NIMBUS-TASK-TIMEOUT-SECS)) |
| 392 | + (not is-timed-out) |
391 | 393 | ))
|
392 | 394 | true
|
393 | 395 | (do
|
|
0 commit comments