|
3 | 3 | (:use [backtype.storm bootstrap])
|
4 | 4 | (:import [backtype.storm.hooks ITaskHook])
|
5 | 5 | (:import [backtype.storm.tuple Tuple])
|
| 6 | + (:import [backtype.storm.spout ISpoutWaitStrategy]) |
6 | 7 | (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
|
7 | 8 | EmitInfo BoltFailInfo BoltAckInfo])
|
8 | 9 | (:require [backtype.storm [tuple :as tuple]])
|
|
128 | 129 | TOPOLOGY-MAX-TASK-PARALLELISM
|
129 | 130 | TOPOLOGY-TRANSACTIONAL-ID
|
130 | 131 | TOPOLOGY-TICK-TUPLE-FREQ-SECS
|
| 132 | + TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS |
| 133 | + TOPOLOGY-SPOUT-WAIT-STRATEGY |
131 | 134 | )
|
132 | 135 | spec-conf (-> general-context
|
133 | 136 | (.getComponentCommon component-id)
|
|
319 | 322 | (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
|
320 | 323 | (if p (* p num-tasks))))
|
321 | 324 |
|
| 325 | +(defn init-spout-wait-strategy [storm-conf] |
| 326 | + (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) new-instance)] |
| 327 | + (.prepare ret storm-conf) |
| 328 | + ret |
| 329 | + )) |
| 330 | + |
322 | 331 | (defmethod mk-threads :spout [executor-data task-datas]
|
323 | 332 | (let [wait-fn (fn [] @(:storm-active-atom executor-data))
|
324 | 333 | storm-conf (:storm-conf executor-data)
|
| 334 | + ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf) |
325 | 335 | last-active (atom false)
|
326 | 336 | component-id (:component-id executor-data)
|
327 | 337 | max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
|
|
360 | 370 | ))))
|
361 | 371 | receive-queue (:receive-queue executor-data)
|
362 | 372 | event-handler (mk-task-receiver executor-data tuple-action-fn)
|
363 |
| - has-ackers? (has-ackers? storm-conf)] |
| 373 | + has-ackers? (has-ackers? storm-conf) |
| 374 | + emitted-count (MutableLong. 0) |
| 375 | + empty-emit-streak (MutableLong. 0)] |
364 | 376 | (log-message "Opening spout " component-id ":" (keys task-datas))
|
365 | 377 | (doseq [[task-id task-data] task-datas
|
366 | 378 | :let [^ISpout spout-obj (:object task-data)
|
367 | 379 | tasks-fn (:tasks-fn task-data)
|
368 | 380 | send-spout-msg (fn [out-stream-id values message-id out-task-id]
|
| 381 | + (.increment emitted-count) |
369 | 382 | (let [out-tasks (if out-task-id
|
370 | 383 | (tasks-fn out-task-id out-stream-id values)
|
371 | 384 | (tasks-fn out-stream-id values))
|
|
396 | 409 | {:stream out-stream-id :values values}
|
397 | 410 | (if (sampler) 0))))
|
398 | 411 | (or out-tasks [])
|
399 |
| - ))]] |
| 412 | + ))]] |
400 | 413 | (.open spout-obj
|
401 | 414 | storm-conf
|
402 | 415 | (:user-context task-data)
|
|
420 | 433 | (fn []
|
421 | 434 | ;; This design requires that spouts be non-blocking
|
422 | 435 | (disruptor/consume-batch receive-queue event-handler)
|
423 |
| - (if (or (not max-spout-pending) |
424 |
| - (< (.size pending) max-spout-pending)) |
425 |
| - (if-let [active? (wait-fn)] |
426 |
| - (do |
427 |
| - (when-not @last-active |
428 |
| - (reset! last-active true) |
429 |
| - (log-message "Activating spout " component-id ":" (keys task-datas)) |
430 |
| - (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
| 436 | + (let [active? (wait-fn) |
| 437 | + curr-count (.get emitted-count)] |
| 438 | + (if (or (not max-spout-pending) |
| 439 | + (< (.size pending) max-spout-pending)) |
| 440 | + (if active? |
| 441 | + (do |
| 442 | + (when-not @last-active |
| 443 | + (reset! last-active true) |
| 444 | + (log-message "Activating spout " component-id ":" (keys task-datas)) |
| 445 | + (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
431 | 446 |
|
432 |
| - (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) |
433 |
| - (do |
434 |
| - (when @last-active |
435 |
| - (reset! last-active false) |
436 |
| - (log-message "Deactivating spout " component-id ":" (keys task-datas)) |
437 |
| - (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) |
438 |
| - ;; TODO: log that it's getting throttled |
439 |
| - (Time/sleep 100)))) |
440 |
| - 0)) |
| 447 | + (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) |
| 448 | + (do |
| 449 | + (when @last-active |
| 450 | + (reset! last-active false) |
| 451 | + (log-message "Deactivating spout " component-id ":" (keys task-datas)) |
| 452 | + (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) |
| 453 | + ;; TODO: log that it's getting throttled |
| 454 | + (Time/sleep 100)))) |
| 455 | + (if (and (= curr-count (.get emitted-count)) active?) |
| 456 | + (do (.increment empty-emit-streak) |
| 457 | + (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) |
| 458 | + (.set empty-emit-streak 0) |
| 459 | + )) |
| 460 | + 0 )) |
441 | 461 | :kill-fn (:report-error-and-die executor-data)
|
442 | 462 | :factory? true
|
443 | 463 | )]
|
|
0 commit comments