|
163 | 163 | :conf (:conf worker)
|
164 | 164 | :storm-active-atom (:storm-active-atom worker)
|
165 | 165 | :batch-transfer-queue batch-transfer->worker
|
166 |
| - :transfer-fn (fn [task tuple] (disruptor/publish batch-transfer->worker [task tuple])) |
| 166 | + :transfer-fn (fn [task tuple] |
| 167 | + (disruptor/publish batch-transfer->worker [task tuple])) |
167 | 168 | :suicide-fn (:suicide-fn worker)
|
168 | 169 | :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
|
169 | 170 | :type executor-type
|
|
227 | 228 | report-error-and-die (:report-error-and-die executor-data)
|
228 | 229 | component-id (:component-id executor-data)
|
229 | 230 |
|
230 |
| - |
| 231 | + ;; starting the batch-transfer->worker ensures that anything publishing to that queue |
| 232 | + ;; doesn't block (because it's a single threaded queue and the caching/consumer started |
| 233 | + ;; trick isn't thread-safe) |
| 234 | + system-threads [(start-batch-transfer->worker-handler! worker executor-data)] |
231 | 235 | handlers (with-error-reaction report-error-and-die
|
232 | 236 | (mk-threads executor-data task-datas))
|
233 |
| - threads (concat handlers |
234 |
| - [(start-batch-transfer->worker-handler! worker executor-data) |
235 |
| - ])] |
236 |
| - ;;technically this is called twice for bolts, but that's ok |
237 |
| - (disruptor/consumer-started! (:receive-queue executor-data)) |
238 |
| - |
| 237 | + threads (concat handlers system-threads)] |
239 | 238 | (setup-ticks! worker executor-data)
|
240 | 239 |
|
241 | 240 | (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
|
|
251 | 250 | [this]
|
252 | 251 | (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
|
253 | 252 | (disruptor/halt-with-interrupt! (:receive-queue executor-data))
|
254 |
| - (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data)) |
255 | 253 | (doseq [t threads]
|
256 | 254 | (.interrupt t)
|
257 | 255 | (.join t))
|
| 256 | + ;; must do this after the threads are killed, this ensures that the interrupt message |
| 257 | + ;; goes through properly |
| 258 | + (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data)) |
258 | 259 |
|
259 | 260 | (doseq [user-context (map :user-context (vals task-datas))]
|
260 | 261 | (doseq [hook (.getHooks user-context)]
|
|
403 | 404 | ))
|
404 | 405 | )))
|
405 | 406 | (log-message "Opened spout " component-id ":" (keys task-datas))
|
406 |
| - ;; TODO: should redesign this to only use one thread |
407 | 407 | [(async-loop
|
408 | 408 | (fn []
|
409 |
| - ;; This design requires that spouts be non-blocking |
410 |
| - (disruptor/consume-batch receive-queue event-handler) |
411 |
| - (if (or (not max-spout-pending) |
412 |
| - (< (.size pending) max-spout-pending)) |
413 |
| - (if-let [active? (wait-fn)] |
414 |
| - (do |
415 |
| - (when-not @last-active |
416 |
| - (reset! last-active true) |
417 |
| - (log-message "Activating spout " component-id ":" (keys task-datas)) |
418 |
| - (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
| 409 | + (disruptor/consumer-started! (:receive-queue executor-data)) |
| 410 | + (fn [] |
| 411 | + ;; This design requires that spouts be non-blocking |
| 412 | + (disruptor/consume-batch receive-queue event-handler) |
| 413 | + (if (or (not max-spout-pending) |
| 414 | + (< (.size pending) max-spout-pending)) |
| 415 | + (if-let [active? (wait-fn)] |
| 416 | + (do |
| 417 | + (when-not @last-active |
| 418 | + (reset! last-active true) |
| 419 | + (log-message "Activating spout " component-id ":" (keys task-datas)) |
| 420 | + (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
419 | 421 |
|
420 |
| - (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) |
421 |
| - (do |
422 |
| - (when @last-active |
423 |
| - (reset! last-active false) |
424 |
| - (log-message "Deactivating spout " component-id ":" (keys task-datas)) |
425 |
| - (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
426 |
| - ;; TODO: log that it's getting throttled |
427 |
| - (Time/sleep 100)))) |
428 |
| - 0) |
| 422 | + (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) |
| 423 | + (do |
| 424 | + (when @last-active |
| 425 | + (reset! last-active false) |
| 426 | + (log-message "Deactivating spout " component-id ":" (keys task-datas)) |
| 427 | + (fast-list-iter [^ISpout spout spouts] (.activate spout))) |
| 428 | + ;; TODO: log that it's getting throttled |
| 429 | + (Time/sleep 100)))) |
| 430 | + 0)) |
429 | 431 | :kill-fn (:report-error-and-die executor-data)
|
430 |
| - ) |
431 |
| - ;; TODO: need to start the consumer |
432 |
| - ] |
| 432 | + :factory? true |
| 433 | + )] |
433 | 434 | ))
|
434 | 435 |
|
435 | 436 | (defn- tuple-time-delta! [^TupleImpl tuple]
|
|
0 commit comments