Skip to content

Commit debb8da

Browse files
author
Nathan Marz
committed
fix disruptor cache / consumer started problem for single threaded claim strategy
1 parent 2892dd8 commit debb8da

File tree

5 files changed

+45
-36
lines changed

5 files changed

+45
-36
lines changed

src/clj/backtype/storm/daemon/executor.clj

+33-32
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@
163163
:conf (:conf worker)
164164
:storm-active-atom (:storm-active-atom worker)
165165
:batch-transfer-queue batch-transfer->worker
166-
:transfer-fn (fn [task tuple] (disruptor/publish batch-transfer->worker [task tuple]))
166+
:transfer-fn (fn [task tuple]
167+
(disruptor/publish batch-transfer->worker [task tuple]))
167168
:suicide-fn (:suicide-fn worker)
168169
:storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
169170
:type executor-type
@@ -227,15 +228,13 @@
227228
report-error-and-die (:report-error-and-die executor-data)
228229
component-id (:component-id executor-data)
229230

230-
231+
;; starting the batch-transfer->worker ensures that anything publishing to that queue
232+
;; doesn't block (because it's a single threaded queue and the caching/consumer started
233+
;; trick isn't thread-safe)
234+
system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
231235
handlers (with-error-reaction report-error-and-die
232236
(mk-threads executor-data task-datas))
233-
threads (concat handlers
234-
[(start-batch-transfer->worker-handler! worker executor-data)
235-
])]
236-
;;technically this is called twice for bolts, but that's ok
237-
(disruptor/consumer-started! (:receive-queue executor-data))
238-
237+
threads (concat handlers system-threads)]
239238
(setup-ticks! worker executor-data)
240239

241240
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
@@ -251,10 +250,12 @@
251250
[this]
252251
(log-message "Shutting down executor " component-id ":" (pr-str executor-id))
253252
(disruptor/halt-with-interrupt! (:receive-queue executor-data))
254-
(disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
255253
(doseq [t threads]
256254
(.interrupt t)
257255
(.join t))
256+
;; must do this after the threads are killed, this ensures that the interrupt message
257+
;; goes through properly
258+
(disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
258259

259260
(doseq [user-context (map :user-context (vals task-datas))]
260261
(doseq [hook (.getHooks user-context)]
@@ -403,33 +404,33 @@
403404
))
404405
)))
405406
(log-message "Opened spout " component-id ":" (keys task-datas))
406-
;; TODO: should redesign this to only use one thread
407407
[(async-loop
408408
(fn []
409-
;; This design requires that spouts be non-blocking
410-
(disruptor/consume-batch receive-queue event-handler)
411-
(if (or (not max-spout-pending)
412-
(< (.size pending) max-spout-pending))
413-
(if-let [active? (wait-fn)]
414-
(do
415-
(when-not @last-active
416-
(reset! last-active true)
417-
(log-message "Activating spout " component-id ":" (keys task-datas))
418-
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
409+
(disruptor/consumer-started! (:receive-queue executor-data))
410+
(fn []
411+
;; This design requires that spouts be non-blocking
412+
(disruptor/consume-batch receive-queue event-handler)
413+
(if (or (not max-spout-pending)
414+
(< (.size pending) max-spout-pending))
415+
(if-let [active? (wait-fn)]
416+
(do
417+
(when-not @last-active
418+
(reset! last-active true)
419+
(log-message "Activating spout " component-id ":" (keys task-datas))
420+
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
419421

420-
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
421-
(do
422-
(when @last-active
423-
(reset! last-active false)
424-
(log-message "Deactivating spout " component-id ":" (keys task-datas))
425-
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
426-
;; TODO: log that it's getting throttled
427-
(Time/sleep 100))))
428-
0)
422+
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
423+
(do
424+
(when @last-active
425+
(reset! last-active false)
426+
(log-message "Deactivating spout " component-id ":" (keys task-datas))
427+
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
428+
;; TODO: log that it's getting throttled
429+
(Time/sleep 100))))
430+
0))
429431
:kill-fn (:report-error-and-die executor-data)
430-
)
431-
;; TODO: need to start the consumer
432-
]
432+
:factory? true
433+
)]
433434
))
434435

435436
(defn- tuple-time-delta! [^TupleImpl tuple]

src/clj/backtype/storm/daemon/task.clj

+2
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@
150150
storm-conf (:storm-conf executor-data)]
151151
(doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
152152
(.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
153+
;; when this is called, the threads for the executor haven't been started yet,
154+
;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
153155
(send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
154156
task-data
155157
))

src/clj/backtype/storm/disruptor.clj

+2-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
(fn []
5656
(consume-batch-when-available queue handler)
5757
0 )
58-
:kill-fn kill-fn)]
58+
:kill-fn kill-fn
59+
)]
5960
(consumer-started! queue)
6061
ret
6162
))

src/jvm/backtype/storm/tuple/TupleImpl.java

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package backtype.storm.tuple;
22

33
import backtype.storm.task.GeneralTopologyContext;
4-
import backtype.storm.task.TopologyContext;
54
import java.util.List;
65

76
public class TupleImpl extends Tuple {

src/jvm/backtype/storm/utils/DisruptorQueue.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.lmax.disruptor.RingBuffer;
88
import com.lmax.disruptor.Sequence;
99
import com.lmax.disruptor.SequenceBarrier;
10+
import com.lmax.disruptor.SingleThreadedClaimStrategy;
1011
import com.lmax.disruptor.WaitStrategy;
1112
import java.util.concurrent.ConcurrentLinkedQueue;
1213

@@ -32,6 +33,9 @@ public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
3233
_consumer = new Sequence();
3334
_barrier = _buffer.newBarrier();
3435
_buffer.setGatingSequences(_consumer);
36+
if(claim instanceof SingleThreadedClaimStrategy) {
37+
consumerStartedFlag = true;
38+
}
3539
}
3640

3741
public void consumeBatch(EventHandler<Object> handler) {
@@ -95,8 +99,10 @@ public void publish(Object obj) {
9599
}
96100

97101
public void consumerStarted() {
98-
consumerStartedFlag = true;
99-
flushCache();
102+
if(!consumerStartedFlag) {
103+
consumerStartedFlag = true;
104+
flushCache();
105+
}
100106
}
101107

102108
private void flushCache() {

0 commit comments

Comments
 (0)