  1 |   1 | (ns backtype.storm.daemon.task
  2 |   2 |   (:use [backtype.storm.daemon common])
  3 |   3 |   (:use [backtype.storm bootstrap])
  4 |     | -  (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
    |   4 | +  (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap LinkedBlockingQueue])
  5 |   5 |   (:import [backtype.storm.hooks ITaskHook])
    |   6 | +  (:import [backtype.storm.tuple Tuple])
  6 |   7 |   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
  7 |   8 |             EmitInfo BoltFailInfo BoltAckInfo])
  8 |   9 |   (:require [backtype.storm [tuple :as tuple]]))
156 | 157 |                      (.getThisTaskId topology-context)
157 | 158 |                      stream))))
158 | 159 | 
159 |     | -(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn]
    | 160 | +(defn mk-task [conf storm-conf topology-context user-context storm-id cluster-state storm-active-atom transfer-fn suicide-fn
    | 161 | +               receive-queue]
160 | 162 |   (let [task-id (.getThisTaskId topology-context)
    | 163 | +        worker-port (.getThisWorkerPort topology-context)
161 | 164 |         component-id (.getThisComponentId topology-context)
162 | 165 |         storm-conf (component-conf storm-conf topology-context component-id)
163 | 166 |         _ (log-message "Loading task " component-id ":" task-id)
196 | 199 | 
197 | 200 |         stream->component->grouper (outbound-components topology-context user-context)
198 | 201 |         component->tasks (reverse-map task-info)
199 |     | -        ;; important it binds to virtual port before function returns
200 |     | -        puller (msg/bind mq-context storm-id task-id)
201 |     | -
    | 202 | +
202 | 203 |         ;; TODO: consider DRYing things up and moving stats
203 | 204 |         task-readable-name (get-readable-name topology-context)
204 | 205 | 
239 | 240 |         _ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"])
240 | 241 |         executor-threads (dofor
241 | 242 |                           [exec (with-error-reaction report-error-and-die
242 |     | -                                  (mk-executors task-object storm-conf puller tasks-fn
    | 243 | +                                  (mk-executors task-object storm-conf receive-queue tasks-fn
243 | 244 |                                                 transfer-fn
244 | 245 |                                                 storm-active-atom topology-context
245 | 246 |                                                 user-context task-stats report-error))]
254 | 255 |       [this]
255 | 256 |       (log-message "Shutting down task " storm-id ":" task-id)
256 | 257 |       (reset! active false)
257 |     | -      ;; empty messages are skip messages (this unblocks the socket)
258 |     | -      (msg/send-local-task-empty mq-context storm-id task-id)
    | 258 | +      ;; put an empty message into receive-queue
    | 259 | +      ;; empty messages are skip messages (this unblocks the receive-queue.take thread)
    | 260 | +      (.put receive-queue (byte-array []))
259 | 261 |       (doseq [t all-threads]
260 | 262 |         (.interrupt t)
261 | 263 |         (.join t))
262 | 264 |       (doseq [hook (.getHooks user-context)]
263 | 265 |         (.cleanup hook))
264 | 266 |       (.remove-task-heartbeat! storm-cluster-state storm-id task-id)
265 | 267 |       (.disconnect storm-cluster-state)
266 |     | -      (.close puller)
267 | 268 |       (close-component task-object)
268 | 269 |       (log-message "Shut down task " storm-id ":" task-id))
269 | 270 |     DaemonCommon
290 | 291 |     (stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
291 | 292 |     ))
292 | 293 | 
293 |     | -(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom
    | 294 | +(defn mk-task-receiver [^LinkedBlockingQueue receive-queue ^KryoTupleDeserializer deserializer tuple-action-fn]
    | 295 | +  (fn []
    | 296 | +    (let [msg (.take receive-queue)
    | 297 | +          is-tuple? (instance? Tuple msg)]
    | 298 | +      (when (or is-tuple? (not (empty? msg))) ; skip empty messages (used during shutdown)
    | 299 | +        (log-debug "Processing message " msg)
    | 300 | +        (let [^Tuple tuple (if is-tuple? msg (.deserialize deserializer msg))]
    | 301 | +          (tuple-action-fn tuple)
    | 302 | +          ))
    | 303 | +      )))
    | 304 | +
    | 305 | +(defmethod mk-executors ISpout [^ISpout spout storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
294 | 306 |                                 ^TopologyContext topology-context ^TopologyContext user-context
295 | 307 |                                 task-stats report-error-fn]
296 | 308 |   (let [wait-fn (fn [] @storm-active-atom)
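The helper added in the hunk above, together with the shutdown change earlier in this diff, amounts to a simple pattern: the task thread blocks on LinkedBlockingQueue.take, and an empty byte array acts as a "skip" sentinel that shutdown puts onto the queue to unblock the waiting thread. Below is a minimal, self-contained Clojure sketch of that pattern, not code from this patch; the names (receive-queue-sketch, mk-receiver, action-fn) are illustrative, and strings stand in for already-deserialized Tuples.

(ns receive-queue-sketch        ; hypothetical namespace, not part of this change
  (:import [java.util.concurrent LinkedBlockingQueue]))

(defn mk-receiver
  "Returns a thunk that takes one message off the queue and handles it,
   skipping the empty byte-array sentinel used to unblock shutdown."
  [^LinkedBlockingQueue receive-queue action-fn]
  (fn []
    (let [msg       (.take receive-queue)      ; blocks until a message arrives
          is-local? (string? msg)]             ; strings stand in for already-deserialized tuples
      (when (or is-local? (not (empty? msg)))  ; empty byte array = skip message
        (action-fn msg)))))

(defn demo []
  (let [q        (LinkedBlockingQueue.)
        active   (atom true)
        receiver (mk-receiver q #(println "handled" %))
        t        (Thread. (fn [] (while @active (receiver))))]
    (.start t)
    (.put q "tuple-1")          ; normal message, passed to action-fn
    (Thread/sleep 100)          ; sketch-level simplification: let the consumer drain
    (reset! active false)
    (.put q (byte-array []))    ; sentinel unblocks the blocked .take so the loop can exit
    (.join t)))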
349 | 361 |             )
350 | 362 |           (reportError [this error]
351 | 363 |             (report-error-fn error)
352 |     | -            ))]
    | 364 | +            ))
    | 365 | +        tuple-action-fn (fn [^Tuple tuple]
    | 366 | +                          (let [id (.getValue tuple 0)
    | 367 | +                                [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
    | 368 | +                            (when spout-id
    | 369 | +                              (let [time-delta (time-delta-ms start-time-ms)]
    | 370 | +                                (condp = (.getSourceStreamId tuple)
    | 371 | +                                  ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
    | 372 | +                                                                                         tuple-finished-info time-delta task-stats sampler))
    | 373 | +                                  ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
    | 374 | +                                                                                           tuple-finished-info time-delta task-stats sampler))
    | 375 | +                                  )))
    | 376 | +                            ;; TODO: on failure, emit tuple to failure stream
    | 377 | +                            ))]
353 | 378 |     (log-message "Opening spout " component-id ":" task-id)
354 | 379 |     (.open spout storm-conf user-context (SpoutOutputCollector. output-collector))
355 | 380 |     (log-message "Opened spout " component-id ":" task-id)
    | 381 | +    ;; TODO: should redesign this to only use one thread
356 | 382 |     [(fn []
357 | 383 |        ;; This design requires that spouts be non-blocking
358 | 384 |        (loop []
377 | 403 |              ;; TODO: log that it's getting throttled
378 | 404 |              (Time/sleep 100)))
379 | 405 |          ))
380 |     | -     (fn []
381 |     | -       (let [^bytes ser-msg (msg/recv puller)]
382 |     | -         ;; skip empty messages (used during shutdown)
383 |     | -         (when-not (empty? ser-msg)
384 |     | -           (let [tuple (.deserialize deserializer ser-msg)
385 |     | -                 id (.getValue tuple 0)
386 |     | -                 [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
387 |     | -             (when spout-id
388 |     | -               (let [time-delta (time-delta-ms start-time-ms)]
389 |     | -                 (condp = (.getSourceStreamId tuple)
390 |     | -                   ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
391 |     | -                                                                          tuple-finished-info time-delta task-stats sampler))
392 |     | -                   ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
393 |     | -                                                                            tuple-finished-info time-delta task-stats sampler))
394 |     | -                   ))))
395 |     | -           ;; TODO: on failure, emit tuple to failure stream
396 |     | -           )))
    | 406 | +     (mk-task-receiver receive-queue deserializer tuple-action-fn)
397 | 407 |      ]
398 | 408 |     ))
399 | 409 | 
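One more detail of the spout path above: tuple-action-fn runs on the receive thread, so it never calls ack-spout-msg or fail-spout-msg directly; it adds a closure to event-queue (presumably the ConcurrentLinkedQueue imported at the top of this file), and the spout's own loop drains and runs those closures so all ISpout callbacks stay on one thread. A rough sketch of that hand-off, with illustrative names (enqueue-ack!, drain-events!) that are not from the patch:

(import '[java.util.concurrent ConcurrentLinkedQueue])

(def event-queue (ConcurrentLinkedQueue.))

(defn enqueue-ack!
  "Called from the receive thread: defer the ack instead of touching the spout here."
  [msg-id]
  (.add event-queue #(println "acking" msg-id)))   ; stand-in for ack-spout-msg

(defn drain-events!
  "Called from the spout's own loop (e.g. between nextTuple calls):
   run every deferred thunk on this thread."
  []
  (loop []
    (when-let [event (.poll event-queue)]
      (event)
      (recur))))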
405 | 415 |       ;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
406 | 416 |       (.put pending key (bit-xor curr id))))
407 | 417 | 
408 |     | -(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom
    | 418 | +(defmethod mk-executors IBolt [^IBolt bolt storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
409 | 419 |                                ^TopologyContext topology-context ^TopologyContext user-context
410 | 420 |                                task-stats report-error-fn]
411 | 421 |   (let [deserializer (KryoTupleDeserializer. storm-conf topology-context)
466 | 476 |                 )))
467 | 477 |           (reportError [this error]
468 | 478 |             (report-error-fn error)
469 |     | -            ))]
    | 479 | +            ))
    | 480 | +        tuple-action-fn (fn [^Tuple tuple]
    | 481 | +                          ;; synchronization needs to be done with a key provided by this bolt, otherwise:
    | 482 | +                          ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
    | 483 | +                          ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
    | 484 | +                          ;; buffer other tuples until fully synchronized, then process all of those tuples
    | 485 | +                          ;; then go into normal loop
    | 486 | +                          ;; spill to disk?
    | 487 | +                          ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
    | 488 | +                          ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
    | 489 | +                          ;; or just timeout the sync messages that are coming in until full sync is hit from that task
    | 490 | +                          ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
    | 491 | +                          ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
    | 492 | +                          ;; TODO: how to handle incremental updates as well as synchronizations at same time
    | 493 | +                          ;; TODO: need to version tuples somehow
    | 494 | +
    | 495 | +                          (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
    | 496 | +                          (.put tuple-start-times tuple (System/currentTimeMillis))
    | 497 | +
    | 498 | +                          (.execute bolt tuple))]
470 | 499 |     (log-message "Preparing bolt " component-id ":" task-id)
471 | 500 |     (.prepare bolt
472 | 501 |               storm-conf
473 | 502 |               user-context
474 | 503 |               (OutputCollector. output-collector))
475 | 504 |     (log-message "Prepared bolt " component-id ":" task-id)
476 | 505 |     ;; TODO: can get any SubscribedState objects out of the context now
477 |     | -    [(fn []
478 |     | -       ;; synchronization needs to be done with a key provided by this bolt, otherwise:
479 |     | -       ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
480 |     | -       ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
481 |     | -       ;; buffer other tuples until fully synchronized, then process all of those tuples
482 |     | -       ;; then go into normal loop
483 |     | -       ;; spill to disk?
484 |     | -       ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
485 |     | -       ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
486 |     | -       ;; or just timeout the sync messages that are coming in until full sync is hit from that task
487 |     | -       ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
488 |     | -       (let [^bytes ser (msg/recv puller)]
489 |     | -         (when-not (empty? ser) ; skip empty messages (used during shutdown)
490 |     | -           (log-debug "Processing message")
491 |     | -           (let [tuple (.deserialize deserializer ser)]
492 |     | -             ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
493 |     | -             ;; TODO: how to handle incremental updates as well as synchronizations at same time
494 |     | -             ;; TODO: need to version tuples somehow
495 |     | -             (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
496 |     | -             (.put tuple-start-times tuple (System/currentTimeMillis))
497 |     | -
498 |     | -             (.execute bolt tuple)
499 |     | -             ))))]
    | 506 | +    [(mk-task-receiver receive-queue deserializer tuple-action-fn)]
500 | 507 |     ))
501 | 508 | 
502 | 509 | (defmethod close-component ISpout [spout]