|
6 | 6 | (:import [backtype.storm.transactional.state TransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
|
7 | 7 | (:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout
|
8 | 8 | KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt
|
9 |
| - IdentityBolt]) |
| 9 | + IdentityBolt CountingCommitBolt]) |
10 | 10 | (:use [backtype.storm bootstrap testing])
|
11 | 11 | (:use [backtype.storm.daemon common])
|
12 | 12 | )
|
|
210 | 210 |
|
211 | 211 | (verify-bolt-and-reset! {:ack [[attempt1-1] [attempt1-1] [attempt1-2]
|
212 | 212 | [attempt2-1] [attempt1-1]]
|
213 |
| - "batch" [[attempt1-1 3]]} |
| 213 | + "default" [[attempt1-1 3]]} |
214 | 214 | capture-atom)
|
215 | 215 |
|
216 | 216 | (.execute bolt (test-tuple [attempt1-2]))
|
217 | 217 | (finish! bolt attempt2-1)
|
218 | 218 | (verify-bolt-and-reset! {:ack [[attempt1-2]]
|
219 |
| - "batch" [[attempt2-1 1]]} |
| 219 | + "default" [[attempt2-1 1]]} |
220 | 220 | capture-atom)
|
221 | 221 |
|
222 | 222 | (finish! bolt attempt1-2)
|
223 |
| - (verify-bolt-and-reset! {"batch" [[attempt1-2 2]]} |
| 223 | + (verify-bolt-and-reset! {"default" [[attempt1-2 2]]} |
224 | 224 | capture-atom)
|
225 | 225 | ))
|
226 | 226 |
|
|
322 | 322 | (RegisteredGlobalState/clearState id#)
|
323 | 323 | ))
|
324 | 324 |
|
325 |
| - |
326 | 325 | (deftest test-transactional-topology
|
327 | 326 | (with-tracked-cluster [cluster]
|
328 | 327 | (with-controller-bolt [controller collector tuples]
|
|
343 | 342 | (-> builder
|
344 | 343 | (.setBolt "id2" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
|
345 | 344 | (.shuffleGrouping "spout"))
|
| 345 | + |
| 346 | + (-> builder |
| 347 | + (.setBolt "global" (CountingBatchBolt.) 1) |
| 348 | + (.globalGrouping "spout")) |
| 349 | + |
| 350 | + (-> builder |
| 351 | + (.setBolt "gcommit" (CountingCommitBolt.) 1) |
| 352 | + (.globalGrouping "spout")) |
346 | 353 |
|
347 | 354 | (-> builder
|
348 | 355 | (.setBolt "sum" (KeyedSummingBatchBolt.) 2)
|
|
426 | 433 | [2 "dog" 3]
|
427 | 434 | [2 "zebra" 1]]
|
428 | 435 | "count" []
|
429 |
| - "count2" []}) |
| 436 | + "count2" [] |
| 437 | + "global" [[1 6] |
| 438 | + [2 3]] |
| 439 | + "gcommit" []}) |
430 | 440 | (ack-tx! 1)
|
431 | 441 | (tracked-wait topo-info 1)
|
432 | 442 | (verify! {"sum" []
|
|
437 | 447 | "count2" [[1 "dog" 2]
|
438 | 448 | [1 "cat" 2]
|
439 | 449 | [1 "mango" 2]
|
440 |
| - [1 "happy" 2]]}) |
| 450 | + [1 "happy" 2]] |
| 451 | + "global" [] |
| 452 | + "gcommit" [[1 6]]}) |
441 | 453 |
|
442 | 454 | (add-transactional-data data
|
443 | 455 | {0 [["a" 1]
|
|
458 | 470 | [3 "c" 1]
|
459 | 471 | [3 "e" 7]]
|
460 | 472 | "count" []
|
461 |
| - "count2" []}) |
| 473 | + "count2" [] |
| 474 | + "global" [[3 7]] |
| 475 | + "gcommit" []}) |
462 | 476 | (ack-tx! 3)
|
463 | 477 | (ack-tx! 2)
|
464 | 478 | (tracked-wait topo-info 1)
|
|
468 | 482 | [2 "zebra" 1]]
|
469 | 483 | "count2" [[2 "apple" 2]
|
470 | 484 | [2 "dog" 2]
|
471 |
| - [2 "zebra" 2]]}) |
| 485 | + [2 "zebra" 2]] |
| 486 | + "global" [] |
| 487 | + "gcommit" [[2 3]]}) |
472 | 488 |
|
473 | 489 | (fail-tx! 2)
|
474 | 490 | (tracked-wait topo-info 1)
|
|
477 | 493 | [2 "dog" 3]
|
478 | 494 | [2 "zebra" 1]]
|
479 | 495 | "count" []
|
480 |
| - "count2" []}) |
| 496 | + "count2" [] |
| 497 | + "global" [[2 3]] |
| 498 | + "gcommit" []}) |
481 | 499 | (ack-tx! 2)
|
482 | 500 | (tracked-wait topo-info 1)
|
483 | 501 |
|
|
487 | 505 | [2 "zebra" 1]]
|
488 | 506 | "count2" [[2 "apple" 2]
|
489 | 507 | [2 "dog" 2]
|
490 |
| - [2 "zebra" 2]]}) |
| 508 | + [2 "zebra" 2]] |
| 509 | + "global" [] |
| 510 | + "gcommit" [[2 3]]}) |
491 | 511 |
|
492 | 512 | (ack-tx! 2)
|
493 | 513 |
|
|
502 | 522 | [3 "b" 2]
|
503 | 523 | [3 "d" 2]
|
504 | 524 | [3 "c" 2]
|
505 |
| - [3 "e" 2]]}) |
| 525 | + [3 "e" 2]] |
| 526 | + "global" [[4 2]] |
| 527 | + "gcommit" [[3 7]]}) |
| 528 | + |
| 529 | + (ack-tx! 4) |
| 530 | + (ack-tx! 3) |
| 531 | + (tracked-wait topo-info 2) |
| 532 | + (verify! {"sum" [] |
| 533 | + "count" [[4 "c" 2]] |
| 534 | + "count2" [[4 "c" 2]] |
| 535 | + "global" [[5 0]] |
| 536 | + "gcommit" [[4 2]]}) |
| 537 | + |
| 538 | + (ack-tx! 5) |
| 539 | + (ack-tx! 4) |
| 540 | + (tracked-wait topo-info 2) |
| 541 | + (verify! {"sum" [] |
| 542 | + "count" [] |
| 543 | + "count2" [] |
| 544 | + "global" [[6 0]] |
| 545 | + "gcommit" [[5 0]]}) |
506 | 546 |
|
507 | 547 | (-> topo-info :capturer .getAndClearResults)
|
508 | 548 | ))))
|
509 | 549 |
|
510 |
| -;; TODO: should test that it commits even when receiving no tuples (and test that finishBatch is called before commit in this case) |
| 550 | + |
| 551 | + |
| 552 | + |
511 | 553 | ;; TODO: ;; * Test that it picks up where it left off when restarting the topology
|
512 | 554 | ;; - run topology and restart it
|
0 commit comments