Skip to content

Commit 921c43e

Browse files
author
Nathan Marz
committed
use separate timers for the worker's system threads so that the heartbeat thread doesn't get blocked
1 parent 25fcbf9 commit 921c43e

File tree

1 file changed

+22
-11
lines changed

1 file changed

+22
-11
lines changed

src/clj/backtype/storm/daemon/worker.clj

+22-11
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@
8787
(.put transfer-queue [task tuple]))
8888
))))
8989

90+
(defn mk-halting-timer []
91+
(mk-timer :kill-fn (fn [t]
92+
(log-error t "Error when processing event")
93+
(halt-process! 20 "Error when processing an event")
94+
)))
95+
9096
(defn worker-data [conf mq-context storm-id supervisor-id port worker-id]
9197
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
9298
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
@@ -110,10 +116,9 @@
110116
:task-ids task-ids
111117
:storm-conf storm-conf
112118
:topology (read-supervisor-topology conf storm-id)
113-
:timer (mk-timer :kill-fn (fn [t]
114-
(log-error t "Error when processing event")
115-
(halt-process! 20 "Error when processing an event")
116-
))
119+
:heartbeat-timer (mk-halting-timer)
120+
:refresh-connections-timer (mk-halting-timer)
121+
:refresh-active-timer (mk-halting-timer)
117122
:task->component (storm-task-info storm-cluster-state storm-id)
118123
:endpoint-socket-lock (mk-rw-lock)
119124
:node+port->socket (atom {})
@@ -134,7 +139,7 @@
134139
storm-id (:storm-id worker)]
135140
(fn this
136141
([]
137-
(this (fn [& ignored] (schedule (:timer worker) 0 this))))
142+
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
138143
([callback]
139144
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
140145
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
@@ -170,7 +175,7 @@
170175

171176
(defn refresh-storm-active
172177
([worker]
173-
(refresh-storm-active worker (fn [& ignored] (schedule (:timer worker) 0 (partial refresh-storm-active worker)))))
178+
(refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
174179
([worker callback]
175180
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
176181
(reset!
@@ -248,7 +253,11 @@
248253
(doseq [t threads]
249254
(.interrupt t)
250255
(.join t))
251-
(cancel-timer (:timer worker))
256+
257+
(cancel-timer (:heartbeat-timer worker))
258+
(cancel-timer (:refresh-connections-timer worker))
259+
(cancel-timer (:refresh-active-timer worker))
260+
252261
(log-message "Disconnecting from storm cluster state context")
253262
(.disconnect (:storm-cluster-state worker))
254263
(.close (:cluster-state worker))
@@ -261,12 +270,14 @@
261270
DaemonCommon
262271
(waiting? [this]
263272
(and
264-
(timer-waiting? (:timer worker))
273+
(timer-waiting? (:heartbeat-timer worker))
274+
(timer-waiting? (:refresh-connections-timer worker))
275+
(timer-waiting? (:refresh-active-timer worker))
265276
(every? (memfn waiting?) tasks)))
266277
)]
267-
(schedule-recurring (:timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
268-
(schedule-recurring (:timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
269-
(schedule-recurring (:timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
278+
(schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
279+
(schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
280+
(schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
270281

271282
(log-message "Worker has topology config " (:storm-conf worker))
272283
(log-message "Worker " worker-id " for storm " storm-id " on " supervisor-id ":" port " has finished loading")

0 commit comments

Comments
 (0)