Skip to content

Commit 7989c58

Browse files
author
Nathan Marz
committed
test topology for transactional topologies
1 parent d56be3f commit 7989c58

File tree

3 files changed

+48
-5
lines changed

3 files changed

+48
-5
lines changed

src/jvm/backtype/storm/testing/CountingBatchBolt.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import backtype.storm.task.TopologyContext;
44
import backtype.storm.topology.OutputFieldsDeclarer;
55
import backtype.storm.topology.base.BaseBatchBolt;
6-
import backtype.storm.transactional.TransactionAttempt;
76
import backtype.storm.coordination.BatchOutputCollector;
87
import backtype.storm.tuple.Fields;
98
import backtype.storm.tuple.Tuple;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.topology.BasicOutputCollector;
4+
import backtype.storm.topology.OutputFieldsDeclarer;
5+
import backtype.storm.topology.base.BaseBasicBolt;
6+
import backtype.storm.tuple.Fields;
7+
import backtype.storm.tuple.Tuple;
8+
9+
public class IdentityBolt extends BaseBasicBolt {
10+
Fields _fields;
11+
12+
public IdentityBolt(Fields fields) {
13+
_fields = fields;
14+
}
15+
16+
@Override
17+
public void execute(Tuple input, BasicOutputCollector collector) {
18+
collector.emit(input.getValues());
19+
}
20+
21+
@Override
22+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
23+
declarer.declare(_fields);
24+
}
25+
}

test/clj/backtype/storm/transactional_test.clj

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
TransactionalTopologyBuilder])
66
(:import [backtype.storm.transactional.state TransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
77
(:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout
8-
KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt])
8+
KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt
9+
IdentityBolt])
910
(:use [backtype.storm bootstrap testing])
1011
(:use [backtype.storm.daemon common])
1112
)
@@ -342,16 +343,34 @@
342343
(Fields. ["word" "amt"])
343344
2)
344345
2))
346+
347+
(-> builder
348+
(.setBolt "id1" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
349+
(.shuffleGrouping "spout"))
350+
351+
(-> builder
352+
(.setBolt "id2" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
353+
(.shuffleGrouping "spout"))
345354

346355
(-> builder
347-
(.setBolt "count" (KeyedSummingBatchBolt.) 2)
348-
(.fieldsGrouping "spout" (Fields. ["word"])))
356+
(.setBolt "sum" (KeyedSummingBatchBolt.) 2)
357+
(.fieldsGrouping "id1" (Fields. ["word"])))
358+
359+
(-> builder
360+
(.setBolt "count" (KeyedCountingCommitterBolt.) 2)
361+
(.fieldsGrouping "id2" (Fields. ["word"])))
362+
363+
(-> builder
364+
(.setBolt "count2" (KeyedCountingCommitterBolt.) 3)
365+
(.fieldsGrouping "sum" (Fields. ["key"]))
366+
(.fieldsGrouping "count" (Fields. ["key"])))
349367

350368
(bind builder (.buildTopologyBuilder builder))
351369

352370
(-> builder
353371
(.setBolt "controller" controller 1)
354-
(.directGrouping "count" Constants/COORDINATED_STREAM_ID))
372+
(.directGrouping "count2" Constants/COORDINATED_STREAM_ID)
373+
(.directGrouping "sum" Constants/COORDINATED_STREAM_ID))
355374

356375
(add-transactional-data data
357376
{0 [["dog" 3]

0 commit comments

Comments
 (0)