|
2 | 2 | (:use [clojure test])
|
3 | 3 | (:import [backtype.storm.drpc ReturnResults DRPCSpout
|
4 | 4 | LinearDRPCTopologyBuilder])
|
5 |
| - (:import [backtype.storm.coordination CoordinatedBolt$FinishedCallback]) |
| 5 | + (:import [backtype.storm.coordination CoordinatedBolt$FinishedCallback FailedBatchException]) |
6 | 6 | (:import [backtype.storm LocalDRPC LocalCluster])
|
7 | 7 | (:import [backtype.storm.tuple Fields])
|
| 8 | + (:import [backtype.storm.generated DRPCExecutionException]) |
8 | 9 | (:use [backtype.storm bootstrap testing])
|
9 | 10 | (:use [backtype.storm.daemon common])
|
10 | 11 | (:use [backtype.storm clojure])
|
|
29 | 30 | exclamation-bolt)
|
30 | 31 | "3" (bolt-spec {"2" :shuffle}
|
31 | 32 | (ReturnResults.))})]
|
32 |
| - (.submitTopology cluster "test" {TOPOLOGY-DEBUG true} topology) |
| 33 | + (.submitTopology cluster "test" {} topology) |
33 | 34 |
|
34 | 35 | (is (= "aaa!!!" (.execute drpc "test" "aaa")))
|
35 | 36 | (is (= "b!!!" (.execute drpc "test" "b")))
|
|
55 | 56 | (.addBolt builder exclamation-bolt-drpc 3)
|
56 | 57 | (.submitTopology cluster
|
57 | 58 | "builder-test"
|
58 |
| - {TOPOLOGY-DEBUG true} |
| 59 | + {} |
59 | 60 | (.createLocalTopology builder drpc))
|
60 | 61 | (is (= "aaa!!!" (.execute drpc "test" "aaa")))
|
61 | 62 | (is (= "b!!!" (.execute drpc "test" "b")))
|
|
173 | 174 | (.shutdown cluster)
|
174 | 175 | (.shutdown drpc)
|
175 | 176 | ))
|
| 177 | + |
| 178 | +(defbolt fail-finish-bolt ["request" "result"] {:prepare true} |
| 179 | + [conf context collector] |
| 180 | + (bolt |
| 181 | + (execute [tuple] |
| 182 | + (ack! collector tuple)) |
| 183 | + CoordinatedBolt$FinishedCallback |
| 184 | + (finishedId [this id] |
| 185 | + (throw (FailedBatchException.)) |
| 186 | + ))) |
| 187 | + |
| 188 | +(deftest test-drpc-fail-finish |
| 189 | + (let [drpc (LocalDRPC.) |
| 190 | + cluster (LocalCluster.) |
| 191 | + builder (LinearDRPCTopologyBuilder. "fail2") |
| 192 | + ] |
| 193 | + (.addBolt builder fail-finish-bolt 3) |
| 194 | + |
| 195 | + (.submitTopology cluster |
| 196 | + "fail2" |
| 197 | + {} |
| 198 | + (.createLocalTopology builder drpc)) |
| 199 | + |
| 200 | + (is (thrown? DRPCExecutionException (.execute drpc "fail2" "2"))) |
| 201 | + |
| 202 | + (.shutdown cluster) |
| 203 | + (.shutdown drpc) |
| 204 | + )) |
0 commit comments