|
348 | 348 | .get_spout_object
|
349 | 349 | deserialized-component-object)))
|
350 | 350 |
|
| 351 | +(defn capture-topology [topology] |
| 352 | + (let [topology (.deepCopy topology) |
| 353 | + spouts (.get_spouts topology) |
| 354 | + bolts (.get_bolts topology) |
| 355 | + all-streams (apply concat |
| 356 | + (for [[id spec] (merge (clojurify-structure spouts) |
| 357 | + (clojurify-structure bolts))] |
| 358 | + (for [[stream info] (.. spec get_common get_streams)] |
| 359 | + [(GlobalStreamId. id stream) (.is_direct info)]))) |
| 360 | + capturer (TupleCaptureBolt.)] |
| 361 | + (.set_bolts topology |
| 362 | + (assoc (clojurify-structure bolts) |
| 363 | + (uuid) |
| 364 | + (Bolt. |
| 365 | + (serialize-component-object capturer) |
| 366 | + (mk-plain-component-common (into {} (for [[id direct?] all-streams] |
| 367 | + [id (if direct? |
| 368 | + (mk-direct-grouping) |
| 369 | + (mk-global-grouping))])) |
| 370 | + {} |
| 371 | + nil)) |
| 372 | + )) |
| 373 | + {:topology topology |
| 374 | + :capturer capturer} |
| 375 | + )) |
| 376 | + |
351 | 377 | ;; TODO: mock-sources needs to be able to mock out state spouts as well
|
352 | 378 | (defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {}]
|
353 | 379 | ;; TODO: the idea of mocking for transactional topologies should be done an
|
354 | 380 | ;; abstraction level above... should have a complete-transactional-topology for this
|
355 |
| - (let [storm-name (str "topologytest-" (uuid)) |
| 381 | + (let [{topology :topology capturer :capturer} (capture-topology topology) |
| 382 | + storm-name (str "topologytest-" (uuid)) |
356 | 383 | state (:storm-cluster-state cluster-map)
|
357 | 384 | spouts (.get_spouts topology)
|
358 |
| - bolts (.get_bolts topology) |
359 | 385 | replacements (map-val (fn [v]
|
360 | 386 | (FixedTupleSpout.
|
361 | 387 | (for [tup v]
|
362 | 388 | (if (map? tup)
|
363 | 389 | (FixedTuple. (:stream tup) (:values tup))
|
364 | 390 | tup))))
|
365 | 391 | mock-sources)
|
366 |
| - all-streams (apply concat |
367 |
| - (for [[id spec] (merge (clojurify-structure spouts) (clojurify-structure bolts))] |
368 |
| - (for [[stream info] (.. spec get_common get_streams)] |
369 |
| - [(GlobalStreamId. id stream) (.is_direct info)]))) |
370 |
| - capturer (TupleCaptureBolt. storm-name) |
| 392 | + |
| 393 | + |
371 | 394 | ]
|
372 | 395 | (doseq [[id spout] replacements]
|
373 | 396 | (let [spout-spec (get spouts id)]
|
|
378 | 401 | (throw (RuntimeException. "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be)"))
|
379 | 402 | ))
|
380 | 403 |
|
381 |
| - (.set_bolts topology |
382 |
| - (assoc (clojurify-structure bolts) |
383 |
| - (uuid) |
384 |
| - (Bolt. |
385 |
| - (serialize-component-object capturer) |
386 |
| - (mk-plain-component-common (into {} (for [[id direct?] all-streams] |
387 |
| - [id (if direct? |
388 |
| - (mk-direct-grouping) |
389 |
| - (mk-global-grouping))])) |
390 |
| - {} |
391 |
| - nil)) |
392 |
| - )) |
| 404 | + |
393 | 405 | (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
|
394 | 406 |
|
395 | 407 |
|
|
403 | 415 | (doseq [spout (spout-objects spouts)]
|
404 | 416 | (cleanup spout)))
|
405 | 417 |
|
406 |
| - (.getResults capturer) |
| 418 | + (.getAndClearResults capturer) |
407 | 419 | ))
|
408 | 420 |
|
409 | 421 | (defn read-tuples
|
|
0 commit comments