Skip to content

Commit e129980

Browse files
author
Nathan Marz
committed
finished most difficult transactional test
1 parent ea8b4b9 commit e129980

File tree

1 file changed

+80
-38
lines changed

1 file changed

+80
-38
lines changed

test/clj/backtype/storm/transactional_test.clj

Lines changed: 80 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
;; what about testing that the coordination is done properly?
2020
;; can check that it receives all the prior tuples before finishbatch is called in the full topology
21-
;; should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case)
21+
;;
2222

2323

2424
;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)
@@ -284,16 +284,6 @@
284284
(.close state)
285285
))))
286286

287-
288-
289-
;; * Test that commit isn't considered successful until the entire tree has been completed (including tuples emitted from commit method)
290-
;; - test the full topology (this is a test of acking/anchoring)
291-
;; * Test that batch isn't considered processed until the entire tuple tree has been completed
292-
;; - test the full topology (this is a test of acking/anchoring)
293-
;; * Test that it picks up where it left off when restarting the topology
294-
;; - run topology and restart it
295-
296-
297287
(defn mk-transactional-source []
298288
(HashMap.))
299289

@@ -332,7 +322,7 @@
332322
(RegisteredGlobalState/clearState id#)
333323
))
334324

335-
;; cluster-map topology :mock-sources {} :storm-conf {}
325+
336326
(deftest test-transactional-topology
337327
(with-tracked-cluster [cluster]
338328
(with-controller-bolt [controller collector tuples]
@@ -378,8 +368,7 @@
378368
{0 [["dog" 3]
379369
["cat" 4]
380370
["apple" 1]
381-
["dog" 3]
382-
["banana" 0]]
371+
["dog" 3]]
383372
1 [["cat" 1]
384373
["mango" 4]]
385374
2 [["happy" 11]
@@ -405,6 +394,17 @@
405394
(doseq [t to-ack]
406395
(.ack @collector t)))))
407396

397+
(bind fail-tx! (fn [txid]
398+
(let [[to-fail not-to-fail] (separate
399+
#(-> %
400+
(.getValue 0)
401+
.getTransactionId
402+
(= txid))
403+
@tuples)]
404+
(reset! tuples not-to-fail)
405+
(doseq [t to-fail]
406+
(.fail @collector t)))))
407+
408408
;; only check default streams
409409
(bind verify! (fn [expected]
410410
(let [results (-> topo-info :capturer .getResults)]
@@ -418,8 +418,6 @@
418418
)))
419419

420420
(tracked-wait topo-info 2)
421-
(println "Controlled: " @tuples)
422-
(println "Captured:" (-> topo-info :capturer .getResults))
423421
(verify! {"sum" [[1 "dog" 3]
424422
[1 "cat" 5]
425423
[1 "mango" 6]
@@ -440,31 +438,75 @@
440438
[1 "cat" 2]
441439
[1 "mango" 2]
442440
[1 "happy" 2]]})
441+
442+
(add-transactional-data data
443+
{0 [["a" 1]
444+
["b" 2]
445+
["c" 3]]
446+
1 [["d" 4]
447+
["c" 1]]
448+
2 [["a" 2]
449+
["e" 7]
450+
["c" 11]]
451+
3 [["a" 2]]})
452+
443453
(ack-tx! 1)
454+
(tracked-wait topo-info 1)
455+
(verify! {"sum" [[3 "a" 5]
456+
[3 "b" 2]
457+
[3 "d" 4]
458+
[3 "c" 1]
459+
[3 "e" 7]]
460+
"count" []
461+
"count2" []})
462+
(ack-tx! 3)
463+
(ack-tx! 2)
464+
(tracked-wait topo-info 1)
465+
(verify! {"sum" []
466+
"count" [[2 "apple" 1]
467+
[2 "dog" 1]
468+
[2 "zebra" 1]]
469+
"count2" [[2 "apple" 2]
470+
[2 "dog" 2]
471+
[2 "zebra" 2]]})
472+
473+
(fail-tx! 2)
474+
(tracked-wait topo-info 1)
475+
476+
(verify! {"sum" [[2 "apple" 1]
477+
[2 "dog" 3]
478+
[2 "zebra" 1]]
479+
"count" []
480+
"count2" []})
481+
(ack-tx! 2)
482+
(tracked-wait topo-info 1)
444483

445-
;; ack the commit
446-
;; check that third batch is emitted
447-
;; ack the third batch
448-
;; check that the fourth batch is emitted
449-
;; ack the second batch
450-
;; check that the second batch is committed
451-
;; fail the commit
452-
;; check that second batch is emitted again
453-
;; ack the second batch
454-
;; commit the second batch...
455-
;; check that third batch is committed
484+
(verify! {"sum" []
485+
"count" [[2 "apple" 1]
486+
[2 "dog" 1]
487+
[2 "zebra" 1]]
488+
"count2" [[2 "apple" 2]
489+
[2 "dog" 2]
490+
[2 "zebra" 2]]})
456491

457-
;; (println (read-tuples results "count"))
492+
(ack-tx! 2)
493+
494+
(tracked-wait topo-info 2)
495+
(verify! {"sum" [[4 "c" 14]]
496+
"count" [[3 "a" 3]
497+
[3 "b" 1]
498+
[3 "d" 1]
499+
[3 "c" 1]
500+
[3 "e" 1]]
501+
"count2" [[3 "a" 2]
502+
[3 "b" 2]
503+
[3 "d" 2]
504+
[3 "c" 2]
505+
[3 "e" 2]]})
506+
458507
(-> topo-info :capturer .getAndClearResults)
459508
))))
460509

461-
{0 [["dog" 3]
462-
["cat" 4]
463-
["apple" 1]
464-
["dog" 3]
465-
["banana" 0]]
466-
1 [["cat" 1]
467-
["mango" 4]]
468-
2 [["happy" 11]
469-
["mango" 2]
470-
["zebra" 1]]}
510+
;; TODO: should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case)
511+
;; TODO: ;; * Test that it picks up where it left off when restarting the topology
512+
;; - run topology and restart it

0 commit comments

Comments
 (0)