Skip to content

Commit afe8ee8

Browse files
author
Nathan Marz
committed
generalize tuple capturing code so it can be composed
1 parent 743645e commit afe8ee8

File tree

3 files changed

+51
-28
lines changed

3 files changed

+51
-28
lines changed

src/clj/backtype/storm/testing.clj

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -348,26 +348,49 @@
348348
.get_spout_object
349349
deserialized-component-object)))
350350

351+
(defn capture-topology [topology]
352+
(let [topology (.deepCopy topology)
353+
spouts (.get_spouts topology)
354+
bolts (.get_bolts topology)
355+
all-streams (apply concat
356+
(for [[id spec] (merge (clojurify-structure spouts)
357+
(clojurify-structure bolts))]
358+
(for [[stream info] (.. spec get_common get_streams)]
359+
[(GlobalStreamId. id stream) (.is_direct info)])))
360+
capturer (TupleCaptureBolt.)]
361+
(.set_bolts topology
362+
(assoc (clojurify-structure bolts)
363+
(uuid)
364+
(Bolt.
365+
(serialize-component-object capturer)
366+
(mk-plain-component-common (into {} (for [[id direct?] all-streams]
367+
[id (if direct?
368+
(mk-direct-grouping)
369+
(mk-global-grouping))]))
370+
{}
371+
nil))
372+
))
373+
{:topology topology
374+
:capturer capturer}
375+
))
376+
351377
;; TODO: mock-sources needs to be able to mock out state spouts as well
352378
(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {}]
353379
;; TODO: the idea of mocking for transactional topologies should be done an
354380
;; abstraction level above... should have a complete-transactional-topology for this
355-
(let [storm-name (str "topologytest-" (uuid))
381+
(let [{topology :topology capturer :capturer} (capture-topology topology)
382+
storm-name (str "topologytest-" (uuid))
356383
state (:storm-cluster-state cluster-map)
357384
spouts (.get_spouts topology)
358-
bolts (.get_bolts topology)
359385
replacements (map-val (fn [v]
360386
(FixedTupleSpout.
361387
(for [tup v]
362388
(if (map? tup)
363389
(FixedTuple. (:stream tup) (:values tup))
364390
tup))))
365391
mock-sources)
366-
all-streams (apply concat
367-
(for [[id spec] (merge (clojurify-structure spouts) (clojurify-structure bolts))]
368-
(for [[stream info] (.. spec get_common get_streams)]
369-
[(GlobalStreamId. id stream) (.is_direct info)])))
370-
capturer (TupleCaptureBolt. storm-name)
392+
393+
371394
]
372395
(doseq [[id spout] replacements]
373396
(let [spout-spec (get spouts id)]
@@ -378,18 +401,7 @@
378401
(throw (RuntimeException. "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be)"))
379402
))
380403

381-
(.set_bolts topology
382-
(assoc (clojurify-structure bolts)
383-
(uuid)
384-
(Bolt.
385-
(serialize-component-object capturer)
386-
(mk-plain-component-common (into {} (for [[id direct?] all-streams]
387-
[id (if direct?
388-
(mk-direct-grouping)
389-
(mk-global-grouping))]))
390-
{}
391-
nil))
392-
))
404+
393405
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
394406

395407

@@ -403,7 +415,7 @@
403415
(doseq [spout (spout-objects spouts)]
404416
(cleanup spout)))
405417

406-
(.getResults capturer)
418+
(.getAndClearResults capturer)
407419
))
408420

409421
(defn read-tuples

src/jvm/backtype/storm/testing/TupleCaptureBolt.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@
88
import java.util.HashMap;
99
import java.util.List;
1010
import java.util.Map;
11+
import java.util.UUID;
1112

1213

1314
public class TupleCaptureBolt implements IBolt {
1415
public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>();
1516

1617
private String _name;
17-
private Map<String, List<FixedTuple>> _results = null;
1818
private OutputCollector _collector;
1919

20-
public TupleCaptureBolt(String name) {
21-
_name = name;
22-
emitted_tuples.put(name, new HashMap<String, List<FixedTuple>>());
20+
public TupleCaptureBolt() {
21+
_name = UUID.randomUUID().toString();
22+
emitted_tuples.put(_name, new HashMap<String, List<FixedTuple>>());
2323
}
2424

2525
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -37,13 +37,14 @@ public void execute(Tuple input) {
3737
}
3838

3939
public Map<String, List<FixedTuple>> getResults() {
40-
if(_results==null) {
41-
_results = emitted_tuples.remove(_name);
42-
}
43-
return _results;
40+
return emitted_tuples.get(_name);
4441
}
4542

4643
public void cleanup() {
4744
}
45+
46+
public Map<String, List<FixedTuple>> getAndClearResults() {
47+
return emitted_tuples.remove(_name);
48+
}
4849

4950
}

test/clj/backtype/storm/transactional_test.clj

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,16 @@
317317
(.setBolt "count" (KeyedSummingBatchBolt.) 2)
318318
(.fieldsGrouping "spout" (Fields. ["word"])))
319319

320+
;; make a complex topology, and then make a final bolt at the end that doesn't ack (and lets me manipulate its tuples from here)
321+
322+
;; let it go to completion
323+
;; make sure there are no commits
324+
;; ack tuples for txid = 2
325+
;; make sure there are no commits
326+
;; ack tuples for txid = 1, let it run
327+
;; make sure there are 2 commits + 2 more batch emits
328+
;; extract the code to add a capturing bolt and transform the topology
329+
320330
(add-transactional-data data
321331
{0 [["dog" 3]
322332
["cat" 4]

0 commit comments

Comments
 (0)