Skip to content

Commit e223f93

Browse files
author
Nathan Marz
committed
finished test for restarting a transactional topology
1 parent facc876 commit e223f93

File tree

4 files changed

+84
-19
lines changed

4 files changed

+84
-19
lines changed

src/clj/backtype/storm/testing.clj

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -308,39 +308,46 @@
308308

309309
(defprotocol CompletableSpout
310310
(exhausted? [this] "Whether all the tuples for this spout have been completed.")
311-
(cleanup [this] "Cleanup any global state kept"))
311+
(cleanup [this] "Cleanup any global state kept")
312+
(startup [this] "Prepare the spout (globally) before starting the topology"))
312313

313314
(extend-type FixedTupleSpout
314315
CompletableSpout
315316
(exhausted? [this]
316317
(= (-> this .getSourceTuples count)
317318
(.getCompleted this)))
318319
(cleanup [this]
319-
(.cleanup this)))
320+
(.cleanup this))
321+
(startup [this]
322+
))
320323

321324
(extend-type TransactionalSpoutCoordinator
322325
CompletableSpout
323326
(exhausted? [this]
324327
(exhausted? (.getSpout this)))
325328
(cleanup [this]
326-
(cleanup (.getSpout this))
327-
))
329+
(cleanup (.getSpout this)))
330+
(startup [this]
331+
(startup (.getSpout this))))
328332

329333
(extend-type PartitionedTransactionalSpoutExecutor
330334
CompletableSpout
331335
(exhausted? [this]
332336
(exhausted? (.getPartitionedSpout this)))
333337
(cleanup [this]
334-
(cleanup (.getPartitionedSpout this))
338+
(cleanup (.getPartitionedSpout this)))
339+
(startup [this]
340+
(startup (.getPartitionedSpout this))
335341
))
336342

337343
(extend-type MemoryTransactionalSpout
338344
CompletableSpout
339345
(exhausted? [this]
340346
(.isExhaustedTuples this))
341347
(cleanup [this]
342-
(.cleanup this)
343-
))
348+
(.cleanup this))
349+
(startup [this]
350+
(.startup this)))
344351

345352
(defn spout-objects [spec-map]
346353
(for [[_ spout-spec] spec-map]
@@ -375,7 +382,7 @@
375382
))
376383

377384
;; TODO: mock-sources needs to be able to mock out state spouts as well
378-
(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {}]
385+
(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true]
379386
;; TODO: the idea of mocking for transactional topologies should be done an
380387
;; abstraction level above... should have a complete-transactional-topology for this
381388
(let [{topology :topology capturer :capturer} (capture-topology topology)
@@ -400,7 +407,9 @@
400407
(when-not (extends? CompletableSpout (.getClass spout))
401408
(throw (RuntimeException. "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be)"))
402409
))
403-
410+
411+
(doseq [spout (spout-objects spouts)]
412+
(startup spout))
404413

405414
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
406415

@@ -412,10 +421,13 @@
412421
(.killTopology (:nimbus cluster-map) storm-name)
413422
(while (.assignment-info state storm-id nil)
414423
(simulate-wait cluster-map))
415-
(doseq [spout (spout-objects spouts)]
416-
(cleanup spout)))
424+
(when cleanup-state
425+
(doseq [spout (spout-objects spouts)]
426+
(cleanup spout))))
417427

418-
(.getAndClearResults capturer)
428+
(if cleanup-state
429+
(.getAndRemoveResults capturer)
430+
(.getAndClearResults capturer))
419431
))
420432

421433
(defn read-tuples

src/jvm/backtype/storm/testing/MemoryTransactionalSpout.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ public Map<String, Object> getComponentConfiguration() {
132132
return conf;
133133
}
134134

135+
public void startup() {
136+
getFinishedStatuses().clear();
137+
}
138+
135139
public void cleanup() {
136140
RegisteredGlobalState.clearState(_id);
137141
RegisteredGlobalState.clearState(_finishedPartitionsId);

src/jvm/backtype/storm/testing/TupleCaptureBolt.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,16 @@ public Map<String, List<FixedTuple>> getResults() {
4444
public void cleanup() {
4545
}
4646

47-
public Map<String, List<FixedTuple>> getAndClearResults() {
47+
public Map<String, List<FixedTuple>> getAndRemoveResults() {
4848
return emitted_tuples.remove(_name);
4949
}
5050

51+
public Map<String, List<FixedTuple>> getAndClearResults() {
52+
Map<String, List<FixedTuple>> ret = new HashMap<String, List<FixedTuple>>(emitted_tuples.get(_name));
53+
emitted_tuples.get(_name).clear();
54+
return ret;
55+
}
56+
5157
@Override
5258
public void declareOutputFields(OutputFieldsDeclarer declarer) {
5359
}

test/clj/backtype/storm/transactional_test.clj

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)
1818
;; * Test that partitioned state emits nothing for the partition if it has seen a future transaction for that partition (test partitioned emitter on its own)
1919

20-
21-
20+
;; TODO: test that FailedBatchException works
2221

2322
(defn mk-coordinator-state-changer [atom]
2423
(TransactionalSpoutCoordinator.
@@ -540,8 +539,52 @@
540539
(-> topo-info :capturer .getAndClearResults)
541540
))))
542541

542+
(deftest test-transactional-topology-restart
543+
(with-simulated-time-local-cluster [cluster]
544+
(letlocals
545+
(bind data (mk-transactional-source))
546+
(bind builder (TransactionalTopologyBuilder.
547+
"id"
548+
"spout"
549+
(MemoryTransactionalSpout. data
550+
(Fields. ["word"])
551+
3)
552+
2))
553+
554+
(-> builder
555+
(.setBolt "count" (CountingCommitBolt.) 2)
556+
(.globalGrouping "spout"))
557+
558+
(add-transactional-data data
559+
{0 [["a"]
560+
["b"]
561+
["c"]
562+
["d"]]
563+
1 [["d"]
564+
["c"]]
565+
})
566+
567+
(bind results (complete-topology cluster
568+
(.buildTopology builder)
569+
:cleanup-state false))
570+
571+
(is (ms= [[5] [0] [1] [0]] (->> (read-tuples results "count")
572+
(take 4)
573+
(map (partial drop 1))
574+
)))
575+
576+
(add-transactional-data data
577+
{0 [["a"]
578+
["b"]]
579+
})
580+
581+
(bind results (complete-topology cluster (.buildTopology builder)))
582+
583+
(println results)
584+
;; need to do it this way (check for nothing transaction) because there is one transaction already saved up before that emits nothing (because of how memorytransctionalspout detects partition completion)
585+
(is (ms= [[0] [0] [2] [0]] (->> (read-tuples results "count")
586+
(take 4)
587+
(map (partial drop 1))
588+
)))
589+
)))
543590

544-
545-
546-
;; TODO: ;; * Test that it picks up where it left off when restarting the topology
547-
;; - run topology and restart it

0 commit comments

Comments
 (0)