|
52 | 52 | []
|
53 | 53 | ))))
|
54 | 54 |
|
| 55 | +(defn normalize-tx-tuple [values] |
| 56 | + (-> values vec (update 0 (memfn getTransactionId)))) |
| 57 | + |
55 | 58 | (defn verify-and-reset! [expected-map emitted-map-atom]
|
56 | 59 | (let [results @emitted-map-atom]
|
57 | 60 | (dorun
|
|
64 | 67 | (is (= expected-map
|
65 | 68 | (map-val
|
66 | 69 | (fn [tuples]
|
67 |
| - (map (comp #(update % 0 (memfn getTransactionId)) |
68 |
| - vec |
| 70 | + (map (comp normalize-tx-tuple |
69 | 71 | #(take 2 %)
|
70 | 72 | :tuple)
|
71 | 73 | tuples))
|
|
393 | 395 | TOPOLOGY-DEBUG true}
|
394 | 396 | (:topology topo-info))
|
395 | 397 |
|
| 398 | + (bind ack-tx (fn [txid] |
| 399 | + (let [[to-ack not-to-ack] (separate |
| 400 | + #(-> % |
| 401 | + (.getValue 0) |
| 402 | + .getTransactionId |
| 403 | + (= txid)) |
| 404 | + @tuples)] |
| 405 | + (reset! tuples not-to-ack) |
| 406 | + (doseq [t to-ack] |
| 407 | + (.ack @collector t))))) |
| 408 | + |
| 409 | + ;; only check default streams |
| 410 | + (bind verify! (fn [expected] |
| 411 | + (let [results (-> topo-info :capturer .getResults)] |
| 412 | + (doseq [[component tuples] expected |
| 413 | + :let [emitted (->> (read-tuples results |
| 414 | + component |
| 415 | + "default") |
| 416 | + (map normalize-tx-tuple))]] |
| 417 | + (is (ms= tuples emitted))) |
| 418 | + (.clear results) |
| 419 | + ))) |
| 420 | + |
396 | 421 | (tracked-wait topo-info 2)
|
397 | 422 | (println "Controlled: " @tuples)
|
398 | 423 | (println "Captured:" (-> topo-info :capturer .getResults))
|
399 |
| - ;; make a complex topology, and then make a final bolt at the end that doesn't ack (and lets me manipulate its tuples from here) |
400 |
| - |
401 |
| - ;; let it go to completion |
402 |
| - ;; make sure there are no commits |
403 |
| - ;; ack tuples for txid = 2 |
404 |
| - ;; make sure there are no commits |
405 |
| - ;; ack tuples for txid = 1, let it run |
406 |
| - ;; make sure there are 2 commits + 2 more batch emits |
407 |
| - ;; extract the code to add a capturing bolt and transform the topology |
| 424 | + ;; check that batch tuples have been emitted for tx 1 and 2 |
| 425 | + ;; check the outputs of all the bolts |
| 426 | + ;; ack the ack the first batch |
| 427 | + ;; check that first batch commits |
| 428 | + ;; ack the commit |
| 429 | + ;; check that third batch is emitted |
| 430 | + ;; ack the third batch |
| 431 | + ;; check that the fourth batch is emitted |
| 432 | + ;; ack the second batch |
| 433 | + ;; check that the second batch is committed |
| 434 | + ;; fail the commit |
| 435 | + ;; check that second batch is emitted again |
| 436 | + ;; ack the second batch |
| 437 | + ;; commit the second batch... |
| 438 | + ;; check that third batch is committed |
408 | 439 |
|
409 |
| - |
410 |
| -;; (println (read-tuples results "count")) |
411 |
| - |
| 440 | + ;; (println (read-tuples results "count")) |
| 441 | + (-> topo-info :capturer .getAndClearResults) |
412 | 442 | ))))
|
0 commit comments