Skip to content

Commit b4b4b16

Browse files
author
Nathan Marz
committed
added activate/deactivate lifecycle methods on spout
1 parent 541c59d commit b4b4b16

File tree

9 files changed

+78
-18
lines changed

9 files changed

+78
-18
lines changed

src/clj/backtype/storm/daemon/task.clj

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@
274274
^TopologyContext topology-context ^TopologyContext user-context
275275
task-stats report-error-fn]
276276
(let [wait-fn (fn [] @storm-active-atom)
277+
last-active (atom false)
277278
task-id (.getThisTaskId topology-context)
278279
component-id (.getThisComponentId topology-context)
279280
max-spout-pending (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)
@@ -341,10 +342,20 @@
341342
))
342343
(if (or (not max-spout-pending)
343344
(< (.size pending) max-spout-pending))
344-
(if (wait-fn)
345-
(.nextTuple spout)
346-
(Time/sleep 100))
347-
;; TODO: log that it's getting throttled
345+
(if-let [active? (wait-fn)]
346+
(do
347+
(when-not @last-active
348+
(reset! last-active true)
349+
(log-message "Activating spout " component-id ":" task-id)
350+
(.activate spout))
351+
(.nextTuple spout))
352+
(do
353+
(when @last-active
354+
(reset! last-active false)
355+
(log-message "Deactivating spout " component-id ":" task-id)
356+
(.deactivate spout))
357+
;; TODO: log that it's getting throttled
358+
(Time/sleep 100)))
348359
))
349360
(fn []
350361
(let [^bytes ser-msg (msg/recv puller)]

src/jvm/backtype/storm/clojure/ClojureSpout.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,23 @@ public Map<String, Object> getComponentConfiguration() {
114114
} catch (Exception e) {
115115
throw new RuntimeException(e);
116116
}
117-
}
117+
}
118+
119+
@Override
120+
public void activate() {
121+
try {
122+
_spout.activate();
123+
} catch(AbstractMethodError ame) {
124+
125+
}
126+
}
127+
128+
@Override
129+
public void deactivate() {
130+
try {
131+
_spout.deactivate();
132+
} catch(AbstractMethodError ame) {
133+
134+
}
135+
}
118136
}

src/jvm/backtype/storm/drpc/DRPCSpout.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import backtype.storm.generated.DistributedRPCInvocations;
77
import backtype.storm.spout.SpoutOutputCollector;
88
import backtype.storm.task.TopologyContext;
9-
import backtype.storm.topology.IRichSpout;
109
import backtype.storm.topology.OutputFieldsDeclarer;
10+
import backtype.storm.topology.base.BaseRichSpout;
1111
import backtype.storm.tuple.Fields;
1212
import backtype.storm.tuple.Values;
1313
import backtype.storm.utils.ServiceRegistry;
@@ -20,7 +20,7 @@
2020
import org.apache.thrift7.TException;
2121
import org.json.simple.JSONValue;
2222

23-
public class DRPCSpout implements IRichSpout {
23+
public class DRPCSpout extends BaseRichSpout {
2424
public static Logger LOG = Logger.getLogger(DRPCSpout.class);
2525

2626
SpoutOutputCollector _collector;
@@ -146,9 +146,4 @@ public void fail(Object msgId) {
146146
public void declareOutputFields(OutputFieldsDeclarer declarer) {
147147
declarer.declare(new Fields("args", "return-info"));
148148
}
149-
150-
@Override
151-
public Map<String, Object> getComponentConfiguration() {
152-
return null;
153-
}
154149
}

src/jvm/backtype/storm/spout/ISpout.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ public interface ISpout extends Serializable {
4747
* killed when running Storm in local mode.</p>
4848
*/
4949
void close();
50+
51+
/**
52+
* Called when a spout has been activated out of a deactivated mode.
53+
* nextTuple will be called on this spout soon. A spout can become activated
54+
* after having been deactivated when the topology is manipulated using the
55+
* `storm` client.
56+
*/
57+
void activate();
58+
59+
/**
60+
* Called when a spout has been deactivated. nextTuple will not be called while
61+
* a spout is deactivated. The spout may or may not be reactivated in the future.
62+
*/
63+
void deactivate();
5064

5165
/**
5266
* When this method is called, Storm is requesting that the Spout emit tuples to the

src/jvm/backtype/storm/spout/ShellSpout.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.log4j.Logger;
1111
import org.json.simple.JSONObject;
1212

13-
import static backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING;
1413

1514
public class ShellSpout implements ISpout {
1615
public static Logger LOG = Logger.getLogger(ShellSpout.class);
@@ -105,4 +104,12 @@ private void querySubprocess(Object query) {
105104
throw new RuntimeException(e);
106105
}
107106
}
107+
108+
@Override
109+
public void activate() {
110+
}
111+
112+
@Override
113+
public void deactivate() {
114+
}
108115
}

src/jvm/backtype/storm/testing/FeederSpout.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.Map;
55
import backtype.storm.spout.SpoutOutputCollector;
66
import backtype.storm.task.TopologyContext;
7-
import backtype.storm.topology.IRichSpout;
7+
import backtype.storm.topology.base.BaseRichSpout;
88
import backtype.storm.tuple.Fields;
99
import backtype.storm.utils.InprocMessaging;
1010
import backtype.storm.utils.Utils;
@@ -13,7 +13,7 @@
1313
import java.util.UUID;
1414

1515

16-
public class FeederSpout implements IRichSpout {
16+
public class FeederSpout extends BaseRichSpout {
1717
private int _id;
1818
private Fields _outFields;
1919
private SpoutOutputCollector _collector;

src/jvm/backtype/storm/testing/FixedTupleSpout.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,12 @@ public void fail(Object msgId) {
131131
failed.put(_id, curr+1);
132132
}
133133
}
134+
135+
@Override
136+
public void activate() {
137+
}
138+
139+
@Override
140+
public void deactivate() {
141+
}
134142
}

src/jvm/backtype/storm/topology/base/BaseRichSpout.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
1515
public void close() {
1616
}
1717

18+
@Override
19+
public void activate() {
20+
}
21+
22+
@Override
23+
public void deactivate() {
24+
}
25+
1826
@Override
1927
public void ack(Object msgId) {
2028
}

src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import backtype.storm.spout.SpoutOutputCollector;
55
import backtype.storm.task.TopologyContext;
66
import backtype.storm.topology.FailedException;
7-
import backtype.storm.topology.IRichSpout;
87
import backtype.storm.topology.OutputFieldsDeclarer;
8+
import backtype.storm.topology.base.BaseRichSpout;
99
import backtype.storm.transactional.state.RotatingTransactionalState;
1010
import backtype.storm.transactional.state.TransactionalState;
1111
import backtype.storm.tuple.Fields;
@@ -16,8 +16,7 @@
1616
import java.util.TreeMap;
1717
import org.apache.log4j.Logger;
1818

19-
// TODO: Need to change this to replay EVERYTHING after a failure and ignore acks/fails for unknown attempts
20-
public class TransactionalSpoutCoordinator implements IRichSpout {
19+
public class TransactionalSpoutCoordinator extends BaseRichSpout {
2120
public static final Logger LOG = Logger.getLogger(TransactionalSpoutCoordinator.class);
2221

2322
public static final BigInteger INIT_TXID = BigInteger.ONE;

0 commit comments

Comments
 (0)