Skip to content

Commit f66ad09

Browse files
author
Nathan Marz
committed
fix bug where committer spouts (including opaque spouts) could cause processing to freeze due to commit being interpreted as new batch of processing
1 parent 09f86bd commit f66ad09

File tree

9 files changed

+201
-12
lines changed

9 files changed

+201
-12
lines changed

src/clj/backtype/storm/util.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,8 @@
488488
(defn collectify [obj]
489489
(if (or (sequential? obj) (instance? Collection obj)) obj [obj]))
490490

491-
(defn to-json [^Map m]
492-
(JSONValue/toJSONString m))
491+
(defn to-json [obj]
492+
(JSONValue/toJSONString obj))
493493

494494
(defn from-json [^String str]
495495
(if str

src/clj/storm/trident/testing.clj

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
(ns storm.trident.testing
2-
(:import [storm.trident.testing FeederBatchSpout MemoryMapState MemoryMapState$Factory])
2+
(:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
33
(:import [backtype.storm LocalDRPC])
44
(:import [backtype.storm.tuple Fields])
55
(:import [backtype.storm.generated KillOptions])
@@ -14,9 +14,15 @@
1414
(let [res (.execute drpc function-name args)]
1515
(from-json res)))
1616

17+
(defn exec-drpc-tuples
  "Executes a DRPC request whose argument is a list of tuples,
   JSON-encoding the tuples before submission."
  [^LocalDRPC drpc function-name tuples]
  (exec-drpc drpc function-name (-> tuples to-json)))
19+
1720
(defn feeder-spout [fields]
1821
(FeederBatchSpout. fields))
1922

23+
(defn feeder-committer-spout
  "Creates a feedable batch spout that also participates in the commit
   phase, emitting tuples with the given output fields."
  [fields]
  (FeederCommitterBatchSpout. fields))
25+
2026
(defn feed [feeder tuples]
2127
(.feed feeder tuples))
2228

@@ -46,3 +52,11 @@
4652
(import 'storm.trident.TridentTopology)
4753
(import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN])
4854
)
55+
56+
(defn drpc-tuples-input
  "Attaches a DRPC stream to the topology whose JSON \"args\" string is
   expanded into individual tuples carrying the given output fields."
  [topology function-name drpc outfields]
  (-> topology
      (.newDRPCStream function-name drpc)
      (.each (fields "args") (TuplifyArgs.) outfields)))
61+
62+

src/jvm/storm/trident/spout/TridentSpoutExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public void execute(BatchInfo info, Tuple input) {
4848
// there won't be a BatchInfo for the success stream
4949
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
5050
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
51-
_collector.setBatch(info.batchId);
5251
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
5352
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
5453
_activeBatches.remove(attempt.getTransactionId());

src/jvm/storm/trident/testing/FeederBatchSpout.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import storm.trident.topology.TransactionAttempt;
1616
import storm.trident.topology.TridentTopologyBuilder;
1717

18-
public class FeederBatchSpout implements ITridentSpout {
18+
public class FeederBatchSpout implements ITridentSpout, IFeeder {
1919

2020
String _id;
2121
String _semaphoreId;
2222
Fields _outFields;
23+
boolean _waitToEmit = true;
2324

2425

2526
public FeederBatchSpout(List<String> fields) {
@@ -28,6 +29,10 @@ public FeederBatchSpout(List<String> fields) {
2829
_semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
2930
}
3031

32+
public void setWaitToEmit(boolean trueIfWait) {
33+
_waitToEmit = trueIfWait;
34+
}
35+
3136
public void feed(Object tuples) {
3237
Semaphore sem = new Semaphore(0);
3338
((List)RegisteredGlobalState.getState(_semaphoreId)).add(sem);
@@ -93,6 +98,7 @@ public void success(long txid) {
9398

9499
@Override
95100
public boolean isReady(long txid) {
101+
if(!_waitToEmit) return true;
96102
List allBatches = (List) RegisteredGlobalState.getState(_id);
97103
if(allBatches.size() > _masterEmitted) {
98104
_masterEmitted++;
src/jvm/storm/trident/testing/FeederCommitterBatchSpout.java (new file) — Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package storm.trident.testing;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ICommitterTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;

/**
 * A feedable batch spout that participates in the commit phase. It wraps a
 * {@link FeederBatchSpout} and exposes it through the committer spout
 * interface, so tests can exercise the commit-stream code path with
 * manually fed tuples.
 */
public class FeederCommitterBatchSpout implements ICommitterTridentSpout, IFeeder {

    FeederBatchSpout _spout;

    public FeederCommitterBatchSpout(List<String> fields) {
        _spout = new FeederBatchSpout(fields);
    }

    /**
     * Delegates to the wrapped spout; passing false makes the coordinator
     * report ready even with no fed data, producing many empty batches.
     */
    public void setWaitToEmit(boolean trueIfWait) {
        _spout.setWaitToEmit(trueIfWait);
    }

    /**
     * Adapts a plain {@link ITridentSpout.Emitter} to the committer emitter
     * contract. The commit hook is intentionally a no-op — this spout only
     * needs to trigger the commit stream, not do commit-time work.
     */
    static class CommitterEmitter implements ICommitterTridentSpout.Emitter {
        ITridentSpout.Emitter _emitter;

        public CommitterEmitter(ITridentSpout.Emitter delegate) {
            _emitter = delegate;
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            // intentionally empty: no commit-time work for this test spout
        }

        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            _emitter.emitBatch(tx, coordinatorMeta, collector);
        }

        @Override
        public void success(TransactionAttempt tx) {
            _emitter.success(tx);
        }

        @Override
        public void close() {
            _emitter.close();
        }
    }

    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new CommitterEmitter(_spout.getEmitter(txStateId, conf, context));
    }

    @Override
    public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return _spout.getCoordinator(txStateId, conf, context);
    }

    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }

    @Override
    public Map getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    @Override
    public void feed(Object tuples) {
        _spout.feed(tuples);
    }
}
src/jvm/storm/trident/testing/IFeeder.java (new file) — Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package storm.trident.testing;

/**
 * Common feeding contract for test spouts: accepts a batch of tuples
 * (a list of tuple value-lists) to be emitted by the implementing spout.
 */
public interface IFeeder {
    void feed(Object tuples);
}
src/jvm/storm/trident/testing/TuplifyArgs.java (new file) — Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package storm.trident.testing;

import java.util.List;
import org.json.simple.JSONValue;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Trident function that parses its single input field as a JSON array of
 * tuples (a list of value-lists) and emits each one as a separate tuple.
 * Used to expand DRPC "args" into a stream of tuples in tests.
 */
public class TuplifyArgs extends BaseFunction {

    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        // Field 0 holds a JSON-encoded list of tuples, e.g. [["a",1],["b",2]]
        String args = input.getString(0);
        List<List<Object>> parsed = (List) JSONValue.parse(args);
        for (List<Object> values : parsed) {
            collector.emit(values);
        }
    }
}

src/jvm/storm/trident/topology/TridentTopologyBuilder.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ public static String spoutIdFromCoordinatorId(String coordId) {
8383
return coordId.substring(SPOUT_COORD_PREFIX.length());
8484
}
8585

86-
Map<GlobalStreamId, String> fleshOutStreamBatchIds() {
86+
Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
8787
Map<GlobalStreamId, String> ret = new HashMap<GlobalStreamId, String>(_batchIds);
8888
Set<String> allBatches = new HashSet(_batchIds.values());
8989
for(String b: allBatches) {
9090
ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
91-
ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
91+
if(includeCommitStream) {
92+
ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
93+
}
9294
// DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
9395
// and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
9496
}
@@ -111,7 +113,8 @@ Map<GlobalStreamId, String> fleshOutStreamBatchIds() {
111113

112114
public StormTopology buildTopology() {
113115
TopologyBuilder builder = new TopologyBuilder();
114-
Map<GlobalStreamId, String> batchIds = fleshOutStreamBatchIds();
116+
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
117+
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
115118

116119
Map<String, List<String>> batchesToCommitIds = new HashMap<String, List<String>>();
117120
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<String, List<ITridentSpout>>();
@@ -152,7 +155,7 @@ public StormTopology buildTopology() {
152155
c.commitStateId,
153156
c.streamName,
154157
((ITridentSpout) c.spout)),
155-
batchIds,
158+
batchIdsForSpouts,
156159
specs),
157160
c.parallelism);
158161
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
@@ -186,7 +189,7 @@ public StormTopology buildTopology() {
186189
Map<String, CoordSpec> specs = new HashMap();
187190

188191
for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
189-
String batch = batchIds.get(s);
192+
String batch = batchIdsForBolts.get(s);
190193
if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
191194
CoordSpec spec = specs.get(batch);
192195
CoordType ct;
@@ -202,7 +205,7 @@ public StormTopology buildTopology() {
202205
specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
203206
}
204207

205-
BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIds, specs), c.parallelism);
208+
BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
206209
for(Map conf: c.componentConfs) {
207210
d.addConfigurations(conf);
208211
}

test/clj/storm/trident/integration_test.clj

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,46 @@
3838
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
3939
)))))
4040

41+
;; this test reproduces a bug where committer spouts freeze processing when
;; there's at least one repartitioning after the spout
(deftest test-word-count-committer-spout
  (t/with-local-cluster [cluster]
    (with-drpc [drpc]
      (letlocals
        (bind topo (TridentTopology.))
        (bind feeder (feeder-committer-spout ["sentence"]))
        (.setWaitToEmit feeder false) ;;this causes lots of empty batches
        ;; word-count state: split sentences, group by word, count into memory map state
        (bind word-counts
          (-> topo
              (.newStream "tester" feeder)
              (.parallelismHint 2)
              (.each (fields "sentence") (Split.) (fields "word"))
              (.groupBy (fields "word"))
              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
              (.parallelismHint 6)
              ))
        ;; DRPC query stream: sum the counts of all words in the request args
        (-> topo
            (.newDRPCStream "words" drpc)
            (.each (fields "args") (Split.) (fields "word"))
            (.groupBy (fields "word"))
            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
            (.aggregate (fields "count") (Sum.) (fields "sum"))
            (.project (fields "sum")))
        (with-topology [cluster topo]
          (feed feeder [["hello the man said"] ["the"]])
          (is (= [[2]] (exec-drpc drpc "words" "the")))
          (is (= [[1]] (exec-drpc drpc "words" "hello")))
          (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
          (feed feeder [["the man on the moon"] ["where are you"]])
          (is (= [[4]] (exec-drpc drpc "words" "the")))
          (is (= [[2]] (exec-drpc drpc "words" "man")))
          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
          (feed feeder [["the the"]])
          (is (= [[6]] (exec-drpc drpc "words" "the")))
          (feed feeder [["the"]])
          (is (= [[7]] (exec-drpc drpc "words" "the")))
          )))))
80+
4181

4282
(deftest test-count-agg
4383
(t/with-local-cluster [cluster]
@@ -76,4 +116,26 @@
76116
(with-topology [cluster topo]
77117
(is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
78118
(is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
79-
)))))
119+
)))))
120+
121+
122+
;; (deftest test-split-merge
123+
;; (t/with-local-cluster [cluster]
124+
;; (with-drpc [drpc]
125+
;; (letlocals
126+
;; (bind topo (TridentTopology.))
127+
;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
128+
;; (bind s1
129+
;; (-> drpc-stream
130+
;; (.each (fields "args") (Split.) (fields "word"))
131+
;; (.project (fields "word"))))
132+
;; (bind s2
133+
;; (-> drpc-stream
134+
;; (.each (fields "args") (StringLength.) (fields "len"))
135+
;; (.project (fields "len"))))
136+
;;
137+
;; (.merge topo [s1 s2])
138+
;; (with-topology [cluster topo]
139+
;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
140+
;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
141+
;; )))))

0 commit comments

Comments
 (0)