|
292 | 292 | (stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
|
293 | 293 | ))
|
294 | 294 |
|
295 |
| -(defmacro with-received-tuple [[^LinkedBlockingQueue receive-queue deserializer tuple-sym] & body] |
296 |
| - `(let [msg# (.take ~receive-queue) |
297 |
| - is-ser-msg?# (not (instance? Tuple msg#)) |
298 |
| - is-empty-msg?# (or (nil? msg#) (and is-ser-msg?# (empty? msg#)))] |
299 |
| - (when-not is-empty-msg?# ; skip empty messages (used during shutdown) |
300 |
| - (log-debug "Processing message") |
301 |
| - (let [~tuple-sym (if is-ser-msg?# |
302 |
| - (.deserialize ~deserializer msg#) |
303 |
| - msg#)] |
304 |
| - ~@body |
305 |
| - )) |
306 |
| - )) |
| 295 | +(defn mk-task-receiver [^LinkedBlockingQueue receive-queue ^KryoTupleDeserializer deserializer tuple-action-fn] |
| 296 | + (fn [] |
| 297 | + (let [msg (.take receive-queue) |
| 298 | + is-ser-msg? (not (instance? Tuple msg)) |
| 299 | + is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))] |
| 300 | + (when-not is-empty-msg? ; skip empty messages (used during shutdown) |
| 301 | + (log-debug "Processing message") |
| 302 | + (let [^Tuple tuple (if is-ser-msg? |
| 303 | + (.deserialize deserializer msg) |
| 304 | + msg)] |
| 305 | + (tuple-action-fn tuple) |
| 306 | + )) |
| 307 | + ))) |
307 | 308 |
|
308 | 309 | (defmethod mk-executors ISpout [^ISpout spout storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
|
309 | 310 | ^TopologyContext topology-context ^TopologyContext user-context
|
|
364 | 365 | )
|
365 | 366 | (reportError [this error]
|
366 | 367 | (report-error-fn error)
|
367 |
| - ))] |
| 368 | + )) |
| 369 | + tuple-action-fn (fn [^Tuple tuple] |
| 370 | + (let [id (.getValue tuple 0) |
| 371 | + [spout-id tuple-finished-info start-time-ms] (.remove pending id)] |
| 372 | + (when spout-id |
| 373 | + (let [time-delta (time-delta-ms start-time-ms)] |
| 374 | + (condp = (.getSourceStreamId tuple) |
| 375 | + ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id |
| 376 | + tuple-finished-info time-delta task-stats sampler)) |
| 377 | + ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id |
| 378 | + tuple-finished-info time-delta task-stats sampler)) |
| 379 | + ))) |
| 380 | + ;; TODO: on failure, emit tuple to failure stream |
| 381 | + ))] |
368 | 382 | (log-message "Opening spout " component-id ":" task-id)
|
369 | 383 | (.open spout storm-conf user-context (SpoutOutputCollector. output-collector))
|
370 | 384 | (log-message "Opened spout " component-id ":" task-id)
|
|
392 | 406 | ;; TODO: log that it's getting throttled
|
393 | 407 | (Time/sleep 100)))
|
394 | 408 | ))
|
395 |
| - (fn [] |
396 |
| - (with-received-tuple [receive-queue deserializer tuple] |
397 |
| - (let [id (.getValue tuple 0) |
398 |
| - [spout-id tuple-finished-info start-time-ms] (.remove pending id)] |
399 |
| - (when spout-id |
400 |
| - (let [time-delta (time-delta-ms start-time-ms)] |
401 |
| - (condp = (.getSourceStreamId tuple) |
402 |
| - ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id |
403 |
| - tuple-finished-info time-delta task-stats sampler)) |
404 |
| - ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id |
405 |
| - tuple-finished-info time-delta task-stats sampler)) |
406 |
| - ))) |
407 |
| - ;; TODO: on failure, emit tuple to failure stream |
408 |
| - ))) |
| 409 | + (mk-task-receiver receive-queue deserializer tuple-action-fn) |
409 | 410 | ]
|
410 | 411 | ))
|
411 | 412 |
|
|
478 | 479 | )))
|
479 | 480 | (reportError [this error]
|
480 | 481 | (report-error-fn error)
|
481 |
| - ))] |
| 482 | + )) |
| 483 | + tuple-action-fn (fn [^Tuple tuple] |
| 484 | + ;; synchronization needs to be done with a key provided by this bolt, otherwise: |
| 485 | + ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update |
| 486 | + ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization |
| 487 | + ;; buffer other tuples until fully synchronized, then process all of those tuples |
| 488 | + ;; then go into normal loop |
| 489 | + ;; spill to disk? |
| 490 | + ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task |
| 491 | + ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests |
| 492 | + ;; or just timeout the sync messages that are coming in until full sync is hit from that task |
| 493 | + ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates |
| 494 | + ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state |
| 495 | + ;; TODO: how to handle incremental updates as well as synchronizations at same time |
| 496 | + ;; TODO: need to version tuples somehow |
| 497 | + |
| 498 | + (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context)) |
| 499 | + (.put tuple-start-times tuple (System/currentTimeMillis)) |
| 500 | + |
| 501 | + (.execute bolt tuple))] |
482 | 502 | (log-message "Preparing bolt " component-id ":" task-id)
|
483 | 503 | (.prepare bolt
|
484 | 504 | storm-conf
|
485 | 505 | user-context
|
486 | 506 | (OutputCollector. output-collector))
|
487 | 507 | (log-message "Prepared bolt " component-id ":" task-id)
|
488 | 508 | ;; TODO: can get any SubscribedState objects out of the context now
|
489 |
| - [(fn [] |
490 |
| - ;; synchronization needs to be done with a key provided by this bolt, otherwise: |
491 |
| - ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update |
492 |
| - ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization |
493 |
| - ;; buffer other tuples until fully synchronized, then process all of those tuples |
494 |
| - ;; then go into normal loop |
495 |
| - ;; spill to disk? |
496 |
| - ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task |
497 |
| - ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests |
498 |
| - ;; or just timeout the sync messages that are coming in until full sync is hit from that task |
499 |
| - ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates |
500 |
| - (with-received-tuple [receive-queue deserializer tuple] |
501 |
| - ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state |
502 |
| - ;; TODO: how to handle incremental updates as well as synchronizations at same time |
503 |
| - ;; TODO: need to version tuples somehow |
504 |
| - (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context)) |
505 |
| - (.put tuple-start-times tuple (System/currentTimeMillis)) |
506 |
| - |
507 |
| - (.execute bolt tuple) |
508 |
| - ))] |
| 509 | + [(mk-task-receiver receive-queue deserializer tuple-action-fn)] |
509 | 510 | ))
|
510 | 511 |
|
511 | 512 | (defmethod close-component ISpout [spout]
|
|
0 commit comments