Skip to content

Commit 51d59ec

Browse files
author
Nathan Marz
committed
infrastructure functions for transactional topology testing
1 parent 7989c58 commit 51d59ec

File tree

2 files changed

+47
-14
lines changed

2 files changed

+47
-14
lines changed

src/clj/backtype/storm/util.clj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@
110110
amap
111111
)))
112112

113+
(defn separate [pred aseq]
114+
[(filter pred aseq) (filter (complement pred) aseq)])
115+
113116
(defn full-path [parent name]
114117
(let [toks (tokenize-path parent)]
115118
(toks->path (conj toks name))

test/clj/backtype/storm/transactional_test.clj

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252
[]
5353
))))
5454

55+
(defn normalize-tx-tuple [values]
56+
(-> values vec (update 0 (memfn getTransactionId))))
57+
5558
(defn verify-and-reset! [expected-map emitted-map-atom]
5659
(let [results @emitted-map-atom]
5760
(dorun
@@ -64,8 +67,7 @@
6467
(is (= expected-map
6568
(map-val
6669
(fn [tuples]
67-
(map (comp #(update % 0 (memfn getTransactionId))
68-
vec
70+
(map (comp normalize-tx-tuple
6971
#(take 2 %)
7072
:tuple)
7173
tuples))
@@ -393,20 +395,48 @@
393395
TOPOLOGY-DEBUG true}
394396
(:topology topo-info))
395397

398+
(bind ack-tx (fn [txid]
399+
(let [[to-ack not-to-ack] (separate
400+
#(-> %
401+
(.getValue 0)
402+
.getTransactionId
403+
(= txid))
404+
@tuples)]
405+
(reset! tuples not-to-ack)
406+
(doseq [t to-ack]
407+
(.ack @collector t)))))
408+
409+
;; only check default streams
410+
(bind verify! (fn [expected]
411+
(let [results (-> topo-info :capturer .getResults)]
412+
(doseq [[component tuples] expected
413+
:let [emitted (->> (read-tuples results
414+
component
415+
"default")
416+
(map normalize-tx-tuple))]]
417+
(is (ms= tuples emitted)))
418+
(.clear results)
419+
)))
420+
396421
(tracked-wait topo-info 2)
397422
(println "Controlled: " @tuples)
398423
(println "Captured:" (-> topo-info :capturer .getResults))
399-
;; make a complex topology, and then make a final bolt at the end that doesn't ack (and lets me manipulate its tuples from here)
400-
401-
;; let it go to completion
402-
;; make sure there are no commits
403-
;; ack tuples for txid = 2
404-
;; make sure there are no commits
405-
;; ack tuples for txid = 1, let it run
406-
;; make sure there are 2 commits + 2 more batch emits
407-
;; extract the code to add a capturing bolt and transform the topology
424+
;; check that batch tuples have been emitted for tx 1 and 2
425+
;; check the outputs of all the bolts
426+
;; ack the ack the first batch
427+
;; check that first batch commits
428+
;; ack the commit
429+
;; check that third batch is emitted
430+
;; ack the third batch
431+
;; check that the fourth batch is emitted
432+
;; ack the second batch
433+
;; check that the second batch is committed
434+
;; fail the commit
435+
;; check that second batch is emitted again
436+
;; ack the second batch
437+
;; commit the second batch...
438+
;; check that third batch is committed
408439

409-
410-
;; (println (read-tuples results "count"))
411-
440+
;; (println (read-tuples results "count"))
441+
(-> topo-info :capturer .getAndClearResults)
412442
))))

0 commit comments

Comments
 (0)