|
18 | 18 |
|
19 | 19 | ;; what about testing that the coordination is done properly?
|
20 | 20 | ;; can check that it receives all the prior tuples before finishbatch is called in the full topology
|
21 |
| -;; should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case) |
| 21 | +;; |
22 | 22 |
|
23 | 23 |
|
24 | 24 | ;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)
|
|
284 | 284 | (.close state)
|
285 | 285 | ))))
|
286 | 286 |
|
287 |
| - |
288 |
| - |
289 |
| -;; * Test that commit isn't considered successful until the entire tree has been completed (including tuples emitted from commit method) |
290 |
| -;; - test the full topology (this is a test of acking/anchoring) |
291 |
| -;; * Test that batch isn't considered processed until the entire tuple tree has been completed |
292 |
| -;; - test the full topology (this is a test of acking/anchoring) |
293 |
| -;; * Test that it picks up where it left off when restarting the topology |
294 |
| -;; - run topology and restart it |
295 |
| - |
296 |
| - |
297 | 287 | (defn mk-transactional-source []
|
298 | 288 | (HashMap.))
|
299 | 289 |
|
|
332 | 322 | (RegisteredGlobalState/clearState id#)
|
333 | 323 | ))
|
334 | 324 |
|
335 |
| -;; cluster-map topology :mock-sources {} :storm-conf {} |
| 325 | + |
336 | 326 | (deftest test-transactional-topology
|
337 | 327 | (with-tracked-cluster [cluster]
|
338 | 328 | (with-controller-bolt [controller collector tuples]
|
|
378 | 368 | {0 [["dog" 3]
|
379 | 369 | ["cat" 4]
|
380 | 370 | ["apple" 1]
|
381 |
| - ["dog" 3] |
382 |
| - ["banana" 0]] |
| 371 | + ["dog" 3]] |
383 | 372 | 1 [["cat" 1]
|
384 | 373 | ["mango" 4]]
|
385 | 374 | 2 [["happy" 11]
|
|
405 | 394 | (doseq [t to-ack]
|
406 | 395 | (.ack @collector t)))))
|
407 | 396 |
|
| 397 | + (bind fail-tx! (fn [txid] |
| 398 | + (let [[to-fail not-to-fail] (separate |
| 399 | + #(-> % |
| 400 | + (.getValue 0) |
| 401 | + .getTransactionId |
| 402 | + (= txid)) |
| 403 | + @tuples)] |
| 404 | + (reset! tuples not-to-fail) |
| 405 | + (doseq [t to-fail] |
| 406 | + (.fail @collector t))))) |
| 407 | + |
408 | 408 | ;; only check default streams
|
409 | 409 | (bind verify! (fn [expected]
|
410 | 410 | (let [results (-> topo-info :capturer .getResults)]
|
|
418 | 418 | )))
|
419 | 419 |
|
420 | 420 | (tracked-wait topo-info 2)
|
421 |
| - (println "Controlled: " @tuples) |
422 |
| - (println "Captured:" (-> topo-info :capturer .getResults)) |
423 | 421 | (verify! {"sum" [[1 "dog" 3]
|
424 | 422 | [1 "cat" 5]
|
425 | 423 | [1 "mango" 6]
|
|
440 | 438 | [1 "cat" 2]
|
441 | 439 | [1 "mango" 2]
|
442 | 440 | [1 "happy" 2]]})
|
| 441 | + |
| 442 | + (add-transactional-data data |
| 443 | + {0 [["a" 1] |
| 444 | + ["b" 2] |
| 445 | + ["c" 3]] |
| 446 | + 1 [["d" 4] |
| 447 | + ["c" 1]] |
| 448 | + 2 [["a" 2] |
| 449 | + ["e" 7] |
| 450 | + ["c" 11]] |
| 451 | + 3 [["a" 2]]}) |
| 452 | + |
443 | 453 | (ack-tx! 1)
|
| 454 | + (tracked-wait topo-info 1) |
| 455 | + (verify! {"sum" [[3 "a" 5] |
| 456 | + [3 "b" 2] |
| 457 | + [3 "d" 4] |
| 458 | + [3 "c" 1] |
| 459 | + [3 "e" 7]] |
| 460 | + "count" [] |
| 461 | + "count2" []}) |
| 462 | + (ack-tx! 3) |
| 463 | + (ack-tx! 2) |
| 464 | + (tracked-wait topo-info 1) |
| 465 | + (verify! {"sum" [] |
| 466 | + "count" [[2 "apple" 1] |
| 467 | + [2 "dog" 1] |
| 468 | + [2 "zebra" 1]] |
| 469 | + "count2" [[2 "apple" 2] |
| 470 | + [2 "dog" 2] |
| 471 | + [2 "zebra" 2]]}) |
| 472 | + |
| 473 | + (fail-tx! 2) |
| 474 | + (tracked-wait topo-info 1) |
| 475 | + |
| 476 | + (verify! {"sum" [[2 "apple" 1] |
| 477 | + [2 "dog" 3] |
| 478 | + [2 "zebra" 1]] |
| 479 | + "count" [] |
| 480 | + "count2" []}) |
| 481 | + (ack-tx! 2) |
| 482 | + (tracked-wait topo-info 1) |
444 | 483 |
|
445 |
| - ;; ack the commit |
446 |
| - ;; check that third batch is emitted |
447 |
| - ;; ack the third batch |
448 |
| - ;; check that the fourth batch is emitted |
449 |
| - ;; ack the second batch |
450 |
| - ;; check that the second batch is committed |
451 |
| - ;; fail the commit |
452 |
| - ;; check that second batch is emitted again |
453 |
| - ;; ack the second batch |
454 |
| - ;; commit the second batch... |
455 |
| - ;; check that third batch is committed |
| 484 | + (verify! {"sum" [] |
| 485 | + "count" [[2 "apple" 1] |
| 486 | + [2 "dog" 1] |
| 487 | + [2 "zebra" 1]] |
| 488 | + "count2" [[2 "apple" 2] |
| 489 | + [2 "dog" 2] |
| 490 | + [2 "zebra" 2]]}) |
456 | 491 |
|
457 |
| - ;; (println (read-tuples results "count")) |
| 492 | + (ack-tx! 2) |
| 493 | + |
| 494 | + (tracked-wait topo-info 2) |
| 495 | + (verify! {"sum" [[4 "c" 14]] |
| 496 | + "count" [[3 "a" 3] |
| 497 | + [3 "b" 1] |
| 498 | + [3 "d" 1] |
| 499 | + [3 "c" 1] |
| 500 | + [3 "e" 1]] |
| 501 | + "count2" [[3 "a" 2] |
| 502 | + [3 "b" 2] |
| 503 | + [3 "d" 2] |
| 504 | + [3 "c" 2] |
| 505 | + [3 "e" 2]]}) |
| 506 | + |
458 | 507 | (-> topo-info :capturer .getAndClearResults)
|
459 | 508 | ))))
|
460 | 509 |
|
461 |
| -{0 [["dog" 3] |
462 |
| - ["cat" 4] |
463 |
| - ["apple" 1] |
464 |
| - ["dog" 3] |
465 |
| - ["banana" 0]] |
466 |
| - 1 [["cat" 1] |
467 |
| - ["mango" 4]] |
468 |
| - 2 [["happy" 11] |
469 |
| - ["mango" 2] |
470 |
| - ["zebra" 1]]} |
| 510 | +;; TODO: should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case) |
| 511 | +;; TODO: ;; * Test that it picks up where it left off when restarting the topology |
| 512 | +;; - run topology and restart it |
0 commit comments