Skip to content

Commit 80c7973

Browse files
author
Nathan Marz
committed
expanded transactional tests to test tasks that receive no tuples
1 parent e129980 commit 80c7973

File tree

3 files changed

+94
-17
lines changed

3 files changed

+94
-17
lines changed

src/jvm/backtype/storm/testing/CountingBatchBolt.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import java.util.Map;
1111

1212
public class CountingBatchBolt extends BaseBatchBolt {
13-
public static final String BATCH_STREAM = "batch";
14-
1513
BatchOutputCollector _collector;
1614
Object _id;
1715
int _count = 0;
@@ -29,12 +27,12 @@ public void execute(Tuple tuple) {
2927

3028
@Override
3129
public void finishBatch() {
32-
_collector.emit(BATCH_STREAM, new Values(_id, _count));
30+
_collector.emit(new Values(_id, _count));
3331
}
3432

3533
@Override
3634
public void declareOutputFields(OutputFieldsDeclarer declarer) {
37-
declarer.declareStream(BATCH_STREAM, new Fields("tx", "count"));
35+
declarer.declare(new Fields("tx", "count"));
3836
}
3937

4038
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.task.TopologyContext;
4+
import backtype.storm.topology.OutputFieldsDeclarer;
5+
import backtype.storm.coordination.BatchOutputCollector;
6+
import backtype.storm.topology.base.BaseCommitterBolt;
7+
import backtype.storm.transactional.TransactionAttempt;
8+
import backtype.storm.tuple.Fields;
9+
import backtype.storm.tuple.Tuple;
10+
import backtype.storm.tuple.Values;
11+
import java.util.Map;
12+
13+
public class CountingCommitBolt extends BaseCommitterBolt {
14+
TransactionAttempt _id;
15+
int _count = 0;
16+
17+
@Override
18+
public void prepare(Map conf, TopologyContext context, TransactionAttempt id) {
19+
_id = id;
20+
}
21+
22+
@Override
23+
public void execute(Tuple tuple) {
24+
_count++;
25+
}
26+
27+
@Override
28+
public void commit(BatchOutputCollector collector) {
29+
collector.emit(new Values(_id, _count));
30+
}
31+
32+
@Override
33+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
34+
declarer.declare(new Fields("tx", "count"));
35+
}
36+
37+
}

test/clj/backtype/storm/transactional_test.clj

+55-13
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
(:import [backtype.storm.transactional.state TransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
77
(:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout
88
KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt
9-
IdentityBolt])
9+
IdentityBolt CountingCommitBolt])
1010
(:use [backtype.storm bootstrap testing])
1111
(:use [backtype.storm.daemon common])
1212
)
@@ -210,17 +210,17 @@
210210

211211
(verify-bolt-and-reset! {:ack [[attempt1-1] [attempt1-1] [attempt1-2]
212212
[attempt2-1] [attempt1-1]]
213-
"batch" [[attempt1-1 3]]}
213+
"default" [[attempt1-1 3]]}
214214
capture-atom)
215215

216216
(.execute bolt (test-tuple [attempt1-2]))
217217
(finish! bolt attempt2-1)
218218
(verify-bolt-and-reset! {:ack [[attempt1-2]]
219-
"batch" [[attempt2-1 1]]}
219+
"default" [[attempt2-1 1]]}
220220
capture-atom)
221221

222222
(finish! bolt attempt1-2)
223-
(verify-bolt-and-reset! {"batch" [[attempt1-2 2]]}
223+
(verify-bolt-and-reset! {"default" [[attempt1-2 2]]}
224224
capture-atom)
225225
))
226226

@@ -322,7 +322,6 @@
322322
(RegisteredGlobalState/clearState id#)
323323
))
324324

325-
326325
(deftest test-transactional-topology
327326
(with-tracked-cluster [cluster]
328327
(with-controller-bolt [controller collector tuples]
@@ -343,6 +342,14 @@
343342
(-> builder
344343
(.setBolt "id2" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
345344
(.shuffleGrouping "spout"))
345+
346+
(-> builder
347+
(.setBolt "global" (CountingBatchBolt.) 1)
348+
(.globalGrouping "spout"))
349+
350+
(-> builder
351+
(.setBolt "gcommit" (CountingCommitBolt.) 1)
352+
(.globalGrouping "spout"))
346353

347354
(-> builder
348355
(.setBolt "sum" (KeyedSummingBatchBolt.) 2)
@@ -426,7 +433,10 @@
426433
[2 "dog" 3]
427434
[2 "zebra" 1]]
428435
"count" []
429-
"count2" []})
436+
"count2" []
437+
"global" [[1 6]
438+
[2 3]]
439+
"gcommit" []})
430440
(ack-tx! 1)
431441
(tracked-wait topo-info 1)
432442
(verify! {"sum" []
@@ -437,7 +447,9 @@
437447
"count2" [[1 "dog" 2]
438448
[1 "cat" 2]
439449
[1 "mango" 2]
440-
[1 "happy" 2]]})
450+
[1 "happy" 2]]
451+
"global" []
452+
"gcommit" [[1 6]]})
441453

442454
(add-transactional-data data
443455
{0 [["a" 1]
@@ -458,7 +470,9 @@
458470
[3 "c" 1]
459471
[3 "e" 7]]
460472
"count" []
461-
"count2" []})
473+
"count2" []
474+
"global" [[3 7]]
475+
"gcommit" []})
462476
(ack-tx! 3)
463477
(ack-tx! 2)
464478
(tracked-wait topo-info 1)
@@ -468,7 +482,9 @@
468482
[2 "zebra" 1]]
469483
"count2" [[2 "apple" 2]
470484
[2 "dog" 2]
471-
[2 "zebra" 2]]})
485+
[2 "zebra" 2]]
486+
"global" []
487+
"gcommit" [[2 3]]})
472488

473489
(fail-tx! 2)
474490
(tracked-wait topo-info 1)
@@ -477,7 +493,9 @@
477493
[2 "dog" 3]
478494
[2 "zebra" 1]]
479495
"count" []
480-
"count2" []})
496+
"count2" []
497+
"global" [[2 3]]
498+
"gcommit" []})
481499
(ack-tx! 2)
482500
(tracked-wait topo-info 1)
483501

@@ -487,7 +505,9 @@
487505
[2 "zebra" 1]]
488506
"count2" [[2 "apple" 2]
489507
[2 "dog" 2]
490-
[2 "zebra" 2]]})
508+
[2 "zebra" 2]]
509+
"global" []
510+
"gcommit" [[2 3]]})
491511

492512
(ack-tx! 2)
493513

@@ -502,11 +522,33 @@
502522
[3 "b" 2]
503523
[3 "d" 2]
504524
[3 "c" 2]
505-
[3 "e" 2]]})
525+
[3 "e" 2]]
526+
"global" [[4 2]]
527+
"gcommit" [[3 7]]})
528+
529+
(ack-tx! 4)
530+
(ack-tx! 3)
531+
(tracked-wait topo-info 2)
532+
(verify! {"sum" []
533+
"count" [[4 "c" 2]]
534+
"count2" [[4 "c" 2]]
535+
"global" [[5 0]]
536+
"gcommit" [[4 2]]})
537+
538+
(ack-tx! 5)
539+
(ack-tx! 4)
540+
(tracked-wait topo-info 2)
541+
(verify! {"sum" []
542+
"count" []
543+
"count2" []
544+
"global" [[6 0]]
545+
"gcommit" [[5 0]]})
506546

507547
(-> topo-info :capturer .getAndClearResults)
508548
))))
509549

510-
;; TODO: should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case)
550+
551+
552+
511553
;; TODO: ;; * Test that it picks up where it left off when restarting the topology
512554
;; - run topology and restart it

0 commit comments

Comments
 (0)