|
5 | 5 | TransactionalTopologyBuilder])
|
6 | 6 | (:import [backtype.storm.transactional.state TransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
|
7 | 7 | (:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout
|
8 |
| - KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt]) |
| 8 | + KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt |
| 9 | + IdentityBolt]) |
9 | 10 | (:use [backtype.storm bootstrap testing])
|
10 | 11 | (:use [backtype.storm.daemon common])
|
11 | 12 | )
|
|
342 | 343 | (Fields. ["word" "amt"])
|
343 | 344 | 2)
|
344 | 345 | 2))
|
| 346 | + |
| 347 | + (-> builder |
| 348 | + (.setBolt "id1" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3) |
| 349 | + (.shuffleGrouping "spout")) |
| 350 | + |
| 351 | + (-> builder |
| 352 | + (.setBolt "id2" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3) |
| 353 | + (.shuffleGrouping "spout")) |
345 | 354 |
|
346 | 355 | (-> builder
|
347 |
| - (.setBolt "count" (KeyedSummingBatchBolt.) 2) |
348 |
| - (.fieldsGrouping "spout" (Fields. ["word"]))) |
| 356 | + (.setBolt "sum" (KeyedSummingBatchBolt.) 2) |
| 357 | + (.fieldsGrouping "id1" (Fields. ["word"]))) |
| 358 | + |
| 359 | + (-> builder |
| 360 | + (.setBolt "count" (KeyedCountingCommitterBolt.) 2) |
| 361 | + (.fieldsGrouping "id2" (Fields. ["word"]))) |
| 362 | + |
| 363 | + (-> builder |
| 364 | + (.setBolt "count2" (KeyedCountingCommitterBolt.) 3) |
| 365 | + (.fieldsGrouping "sum" (Fields. ["key"])) |
| 366 | + (.fieldsGrouping "count" (Fields. ["key"]))) |
349 | 367 |
|
350 | 368 | (bind builder (.buildTopologyBuilder builder))
|
351 | 369 |
|
352 | 370 | (-> builder
|
353 | 371 | (.setBolt "controller" controller 1)
|
354 |
| - (.directGrouping "count" Constants/COORDINATED_STREAM_ID)) |
| 372 | + (.directGrouping "count2" Constants/COORDINATED_STREAM_ID) |
| 373 | + (.directGrouping "sum" Constants/COORDINATED_STREAM_ID)) |
355 | 374 |
|
356 | 375 | (add-transactional-data data
|
357 | 376 | {0 [["dog" 3]
|
|
0 commit comments