Skip to content

Commit a0bc262

Browse files
committed
Merge pull request nathanmarz#706 from d2r/d2r-nimbus-hb-check-timeout-on-update
Do the worker HB timeout check when HB's are updated
2 parents 3985de7 + edbb17c commit a0bc262

File tree

1 file changed

+11
-9
lines changed
  • storm-core/src/clj/backtype/storm/daemon

1 file changed

+11
-9
lines changed

storm-core/src/clj/backtype/storm/daemon/nimbus.clj

+11-9
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@
326326
;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
327327
;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
328328
;; tracked through heartbeat-cache
329-
(defn- update-executor-cache [curr hb]
329+
(defn- update-executor-cache [curr hb timeout]
330330
(let [reported-time (:time-secs hb)
331331
{last-nimbus-time :nimbus-time
332332
last-reported-time :executor-reported-time} curr
@@ -338,15 +338,18 @@
338338
(current-time-secs)
339339
last-nimbus-time
340340
)]
341-
{:nimbus-time nimbus-time
341+
{:is-timed-out (and
342+
nimbus-time
343+
(>= (time-delta nimbus-time) timeout))
344+
:nimbus-time nimbus-time
342345
:executor-reported-time reported-time}))
343346

344-
(defn update-heartbeat-cache [cache executor-beats all-executors]
347+
(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
345348
(let [cache (select-keys cache all-executors)]
346349
(into {}
347350
(for [executor all-executors :let [curr (cache executor)]]
348351
[executor
349-
(update-executor-cache curr (get executor-beats executor))]
352+
(update-executor-cache curr (get executor-beats executor) timeout)]
350353
))))
351354

352355
(defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
@@ -355,7 +358,8 @@
355358
executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))
356359
cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
357360
executor-beats
358-
all-executors)]
361+
all-executors
362+
((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))]
359363
(swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
360364

361365
(defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
@@ -380,14 +384,12 @@
380384
(->> all-executors
381385
(filter (fn [executor]
382386
(let [start-time (get executor-start-times executor)
383-
nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
387+
is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
384388
(if (and start-time
385389
(or
386390
(< (time-delta start-time)
387391
(conf NIMBUS-TASK-LAUNCH-SECS))
388-
(not nimbus-time)
389-
(< (time-delta nimbus-time)
390-
(conf NIMBUS-TASK-TIMEOUT-SECS))
392+
(not is-timed-out)
391393
))
392394
true
393395
(do

0 commit comments

Comments
 (0)