|
87 | 87 | (.put transfer-queue [task tuple]))
|
88 | 88 | ))))
|
89 | 89 |
|
| 90 | +(defn mk-halting-timer [] |
| 91 | + (mk-timer :kill-fn (fn [t] |
| 92 | + (log-error t "Error when processing event") |
| 93 | + (halt-process! 20 "Error when processing an event") |
| 94 | + ))) |
| 95 | + |
90 | 96 | (defn worker-data [conf mq-context storm-id supervisor-id port worker-id]
|
91 | 97 | (let [cluster-state (cluster/mk-distributed-cluster-state conf)
|
92 | 98 | storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
|
|
110 | 116 | :task-ids task-ids
|
111 | 117 | :storm-conf storm-conf
|
112 | 118 | :topology (read-supervisor-topology conf storm-id)
|
113 |
| - :timer (mk-timer :kill-fn (fn [t] |
114 |
| - (log-error t "Error when processing event") |
115 |
| - (halt-process! 20 "Error when processing an event") |
116 |
| - )) |
| 119 | + :heartbeat-timer (mk-halting-timer) |
| 120 | + :refresh-connections-timer (mk-halting-timer) |
| 121 | + :refresh-active-timer (mk-halting-timer) |
117 | 122 | :task->component (storm-task-info storm-cluster-state storm-id)
|
118 | 123 | :endpoint-socket-lock (mk-rw-lock)
|
119 | 124 | :node+port->socket (atom {})
|
|
134 | 139 | storm-id (:storm-id worker)]
|
135 | 140 | (fn this
|
136 | 141 | ([]
|
137 |
| - (this (fn [& ignored] (schedule (:timer worker) 0 this)))) |
| 142 | + (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) |
138 | 143 | ([callback]
|
139 | 144 | (let [assignment (.assignment-info storm-cluster-state storm-id callback)
|
140 | 145 | my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
|
|
170 | 175 |
|
171 | 176 | (defn refresh-storm-active
|
172 | 177 | ([worker]
|
173 |
| - (refresh-storm-active worker (fn [& ignored] (schedule (:timer worker) 0 (partial refresh-storm-active worker))))) |
| 178 | + (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))))) |
174 | 179 | ([worker callback]
|
175 | 180 | (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
|
176 | 181 | (reset!
|
|
248 | 253 | (doseq [t threads]
|
249 | 254 | (.interrupt t)
|
250 | 255 | (.join t))
|
251 |
| - (cancel-timer (:timer worker)) |
| 256 | + |
| 257 | + (cancel-timer (:heartbeat-timer worker)) |
| 258 | + (cancel-timer (:refresh-connections-timer worker)) |
| 259 | + (cancel-timer (:refresh-active-timer worker)) |
| 260 | + |
252 | 261 | (log-message "Disconnecting from storm cluster state context")
|
253 | 262 | (.disconnect (:storm-cluster-state worker))
|
254 | 263 | (.close (:cluster-state worker))
|
|
261 | 270 | DaemonCommon
|
262 | 271 | (waiting? [this]
|
263 | 272 | (and
|
264 |
| - (timer-waiting? (:timer worker)) |
| 273 | + (timer-waiting? (:heartbeat-timer worker)) |
| 274 | + (timer-waiting? (:refresh-connections-timer worker)) |
| 275 | + (timer-waiting? (:refresh-active-timer worker)) |
265 | 276 | (every? (memfn waiting?) tasks)))
|
266 | 277 | )]
|
267 |
| - (schedule-recurring (:timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections) |
268 |
| - (schedule-recurring (:timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) |
269 |
| - (schedule-recurring (:timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) |
| 278 | + (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections) |
| 279 | + (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) |
| 280 | + (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) |
270 | 281 |
|
271 | 282 | (log-message "Worker has topology config " (:storm-conf worker))
|
272 | 283 | (log-message "Worker " worker-id " for storm " storm-id " on " supervisor-id ":" port " has finished loading")
|
|
0 commit comments