Skip to content

Commit ecd26e5

Browse files
author
Nathan Marz
committed
Implemented pluggable spout wait strategies
The wait strategy is invoked when all tasks in an executor emit nothing in nextTuple or when a spout hits the MAX_SPOUT_PENDING limit. The default strategy is to sleep for 1 ms.
1 parent bab56fd commit ecd26e5

File tree

11 files changed

+144
-22
lines changed

11 files changed

+144
-22
lines changed

conf/defaults.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,7 @@ topology.transfer.buffer.size: 1024 # batched
9494
topology.tick.tuple.freq.secs: null
9595
topology.worker.shared.thread.pool.size: 4
9696
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
97+
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
98+
topology.sleep.spout.wait.strategy.time.ms: 1
9799

98100
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

src/clj/backtype/storm/bootstrap.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
RotatingMap RotatingMap$ExpiredCallback
1111
BufferFileInputStream
1212
RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
13-
MutableObject]))
13+
MutableObject MutableLong]))
1414
(import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
1515
(import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
1616
(import (quote [backtype.storm.tuple Tuple TupleImpl Fields MessageId]))

src/clj/backtype/storm/daemon/executor.clj

+39-19
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(:use [backtype.storm bootstrap])
44
(:import [backtype.storm.hooks ITaskHook])
55
(:import [backtype.storm.tuple Tuple])
6+
(:import [backtype.storm.spout ISpoutWaitStrategy])
67
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
78
EmitInfo BoltFailInfo BoltAckInfo])
89
(:require [backtype.storm [tuple :as tuple]])
@@ -128,6 +129,8 @@
128129
TOPOLOGY-MAX-TASK-PARALLELISM
129130
TOPOLOGY-TRANSACTIONAL-ID
130131
TOPOLOGY-TICK-TUPLE-FREQ-SECS
132+
TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
133+
TOPOLOGY-SPOUT-WAIT-STRATEGY
131134
)
132135
spec-conf (-> general-context
133136
(.getComponentCommon component-id)
@@ -319,9 +322,16 @@
319322
(let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
320323
(if p (* p num-tasks))))
321324

325+
(defn init-spout-wait-strategy
  "Instantiates the class configured under TOPOLOGY-SPOUT-WAIT-STRATEGY in
  storm-conf, calls .prepare on the new object with the same storm-conf, and
  returns the prepared object."
  [storm-conf]
  (doto (new-instance (get storm-conf TOPOLOGY-SPOUT-WAIT-STRATEGY))
    (.prepare storm-conf)))
330+
322331
(defmethod mk-threads :spout [executor-data task-datas]
323332
(let [wait-fn (fn [] @(:storm-active-atom executor-data))
324333
storm-conf (:storm-conf executor-data)
334+
^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
325335
last-active (atom false)
326336
component-id (:component-id executor-data)
327337
max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
@@ -360,12 +370,15 @@
360370
))))
361371
receive-queue (:receive-queue executor-data)
362372
event-handler (mk-task-receiver executor-data tuple-action-fn)
363-
has-ackers? (has-ackers? storm-conf)]
373+
has-ackers? (has-ackers? storm-conf)
374+
emitted-count (MutableLong. 0)
375+
empty-emit-streak (MutableLong. 0)]
364376
(log-message "Opening spout " component-id ":" (keys task-datas))
365377
(doseq [[task-id task-data] task-datas
366378
:let [^ISpout spout-obj (:object task-data)
367379
tasks-fn (:tasks-fn task-data)
368380
send-spout-msg (fn [out-stream-id values message-id out-task-id]
381+
(.increment emitted-count)
369382
(let [out-tasks (if out-task-id
370383
(tasks-fn out-task-id out-stream-id values)
371384
(tasks-fn out-stream-id values))
@@ -396,7 +409,7 @@
396409
{:stream out-stream-id :values values}
397410
(if (sampler) 0))))
398411
(or out-tasks [])
399-
))]]
412+
))]]
400413
(.open spout-obj
401414
storm-conf
402415
(:user-context task-data)
@@ -420,24 +433,31 @@
420433
(fn []
421434
;; This design requires that spouts be non-blocking
422435
(disruptor/consume-batch receive-queue event-handler)
423-
(if (or (not max-spout-pending)
424-
(< (.size pending) max-spout-pending))
425-
(if-let [active? (wait-fn)]
426-
(do
427-
(when-not @last-active
428-
(reset! last-active true)
429-
(log-message "Activating spout " component-id ":" (keys task-datas))
430-
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
436+
(let [active? (wait-fn)
437+
curr-count (.get emitted-count)]
438+
(if (or (not max-spout-pending)
439+
(< (.size pending) max-spout-pending))
440+
(if active?
441+
(do
442+
(when-not @last-active
443+
(reset! last-active true)
444+
(log-message "Activating spout " component-id ":" (keys task-datas))
445+
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
431446

432-
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
433-
(do
434-
(when @last-active
435-
(reset! last-active false)
436-
(log-message "Deactivating spout " component-id ":" (keys task-datas))
437-
(fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
438-
;; TODO: log that it's getting throttled
439-
(Time/sleep 100))))
440-
0))
447+
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
448+
(do
449+
(when @last-active
450+
(reset! last-active false)
451+
(log-message "Deactivating spout " component-id ":" (keys task-datas))
452+
(fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
453+
;; TODO: log that it's getting throttled
454+
(Time/sleep 100))))
455+
(if (and (= curr-count (.get emitted-count)) active?)
456+
(do (.increment empty-emit-streak)
457+
(.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
458+
(.set empty-emit-streak 0)
459+
))
460+
0 ))
441461
:kill-fn (:report-error-and-die executor-data)
442462
:factory? true
443463
)]

src/clj/backtype/storm/daemon/nimbus.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747

4848
(conf STORM-SCHEDULER)
4949
(do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
50-
(-> (conf STORM-SCHEDULER) (Class/forName) .newInstance))
50+
(-> (conf STORM-SCHEDULER) new-instance))
5151

5252
:else
5353
(do (log-message "Using default scheduler")

src/clj/backtype/storm/disruptor.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
(defn- mk-wait-strategy [spec]
2626
(if (keyword? spec)
2727
((WAIT-STRATEGY spec))
28-
(-> (str spec) Class/forName .newInstance)
28+
(-> (str spec) new-instance)
2929
))
3030

3131
;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.

src/clj/backtype/storm/util.clj

+5
Original file line numberDiff line numberDiff line change
@@ -808,3 +808,8 @@
808808
^List curr (get-with-default ret key (ArrayList.))]
809809
(.add curr e)))
810810
ret ))
811+
812+
(defn new-instance
  "Creates an object through the no-argument .newInstance call of a class.
  klass may be a class name given as a string (resolved with Class/forName)
  or any other value, which is used as-is as the class."
  [klass]
  (.newInstance
    (if (string? klass)
      (Class/forName klass)
      klass)))

src/jvm/backtype/storm/Config.java

+14
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,20 @@ public class Config extends HashMap<String, Object> {
411411
*/
412412
public static String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
413413

414+
/**
415+
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
416+
* triggered in one of two conditions:
417+
*
418+
* 1. nextTuple emits no tuples
419+
* 2. The spout has hit maxSpoutPending and can't emit any more tuples
420+
*/
421+
public static String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
422+
423+
/**
424+
* The number of milliseconds the SleepSpoutWaitStrategy should sleep for.
425+
*/
426+
public static String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
427+
414428
/**
415429
* The maximum amount of time a component gives a source of state to synchronize before it requests
416430
* synchronization again.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package backtype.storm.spout;
2+
3+
import java.util.Map;
4+
5+
/**
 * The strategy a spout uses while it is waiting. Waiting is triggered in one
 * of two conditions:
 *
 * <ol>
 *   <li>nextTuple emits no tuples</li>
 *   <li>The spout has hit maxSpoutPending and can't emit any more tuples</li>
 * </ol>
 *
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    /**
     * Called with the topology configuration after the strategy has been
     * instantiated and before it is used.
     *
     * @param conf the storm configuration map
     */
    void prepare(Map conf);

    /**
     * Called each time an active spout executor finishes a pass in which
     * nothing was emitted.
     *
     * @param streak the number of consecutive passes, including this one, in
     *               which nothing was emitted; the executor resets the count
     *               to zero after a pass that does not trigger waiting
     */
    void emptyEmit(long streak);
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package backtype.storm.spout;
2+
3+
import java.util.Map;
4+
5+
public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
6+
@Override
7+
public void emptyEmit(long streak) {
8+
}
9+
10+
@Override
11+
public void prepare(Map conf) {
12+
throw new UnsupportedOperationException("Not supported yet.");
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package backtype.storm.spout;
2+
3+
import backtype.storm.Config;
4+
import java.util.Map;
5+
6+
7+
public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {
8+
9+
long sleepMillis;
10+
11+
@Override
12+
public void prepare(Map conf) {
13+
sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
14+
}
15+
16+
@Override
17+
public void emptyEmit(long streak) {
18+
try {
19+
Thread.sleep(sleepMillis);
20+
} catch (InterruptedException e) {
21+
throw new RuntimeException(e);
22+
}
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package backtype.storm.utils;
2+
3+
/**
 * A mutable holder for a primitive {@code long}. The class performs no
 * synchronization of its own.
 */
public class MutableLong {
    long val;

    /**
     * @param val the initial value
     */
    public MutableLong(long val) {
        this.val = val;
    }

    /**
     * @return the current value
     */
    public long get() {
        return val;
    }

    /**
     * Overwrites the current value.
     *
     * @param val the new value
     */
    public void set(long val) {
        this.val = val;
    }

    /**
     * Adds one to the current value.
     *
     * @return the value after the addition
     */
    public long increment() {
        return increment(1);
    }

    /**
     * Adds {@code amt} to the current value.
     *
     * @param amt the amount to add; may be negative
     * @return the value after the addition
     */
    public long increment(long amt) {
        return val += amt;
    }
}

0 commit comments

Comments
 (0)