Skip to content

Commit c73dc01

Browse files
author
Nathan Marz
committed
small fixes... almost working
1 parent f592566 commit c73dc01

File tree

5 files changed

+50
-48
lines changed

5 files changed

+50
-48
lines changed

src/clj/backtype/storm/daemon/acker.clj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,5 @@
6666
(.declareStream declarer ACKER-ACK-STREAM-ID true (Fields. ["id"]))
6767
(.declareStream declarer ACKER-FAIL-STREAM-ID true (Fields. ["id"])))
6868
)))
69+
70+
;; need to create a class

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
(defn mk-task-component-assignments [conf storm-id]
148148
(let [storm-conf (read-storm-conf conf storm-id)
149149
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
150-
topology (system-topology (read-storm-topology conf storm-id))
150+
topology (system-topology storm-conf (read-storm-topology conf storm-id))
151151
slots-to-use (storm-conf TOPOLOGY-WORKERS)
152152
counter (mk-counter)
153153
tasks (concat
@@ -563,7 +563,7 @@
563563
(to-json (read-storm-conf conf id)))
564564

565565
(^StormTopology getTopology [this ^String id]
566-
(system-topology (read-storm-topology conf id)))
566+
(system-topology (read-storm-conf conf id) (read-storm-topology conf id)))
567567

568568
(^ClusterSummary getClusterInfo [this]
569569
(let [assigned (assigned-slots storm-cluster-state)

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
event-manager (event/event-manager true)
9696

9797
task->component (storm-task-info storm-cluster-state storm-id)
98-
mk-topology-context #(TopologyContext. (system-topology topology)
98+
mk-topology-context #(TopologyContext. (system-topology storm-conf topology)
9999
task->component
100100
storm-id
101101
(supervisor-storm-resources-path

test/clj/backtype/storm/drpc_test.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
spout (DRPCSpout. "test" drpc)
2525
cluster (LocalCluster.)
2626
topology (topology
27-
{1 (spout-spec spout)}
28-
{2 (bolt-spec {1 :shuffle}
27+
{"1" (spout-spec spout)}
28+
{"2" (bolt-spec {"1" :shuffle}
2929
exclamation-bolt)
30-
3 (bolt-spec {2 :shuffle}
30+
"3" (bolt-spec {"2" :shuffle}
3131
(ReturnResults.))})]
3232
(.submitTopology cluster "test" {TOPOLOGY-DEBUG true} topology)
3333

test/clj/backtype/storm/integration_test.clj

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
(with-local-cluster [cluster :supervisors 4]
6464
(let [nimbus (:nimbus cluster)
6565
topology (thrift/mk-topology
66-
{1 (thrift/mk-spout-spec (TestWordSpout. false))}
67-
{2 (thrift/mk-shell-bolt-spec {1 :shuffle} "python" "tester.py" ["word"] :parallelism-hint 1)}
66+
{"1" (thrift/mk-spout-spec (TestWordSpout. false))}
67+
{"2" (thrift/mk-shell-bolt-spec {"1" :shuffle} "python" "tester.py" ["word"] :parallelism-hint 1)}
6868
)]
6969
(submit-local-topology nimbus
7070
"test"
@@ -81,38 +81,38 @@
8181
(with-simulated-time-local-cluster [cluster :supervisors 4
8282
:daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
8383
(let [topology (thrift/mk-topology
84-
{1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
85-
{2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
86-
3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
87-
4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
84+
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
85+
{"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
86+
"3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
87+
"4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
8888
})
8989
results (complete-topology cluster
9090
topology
91-
:mock-sources {1 [["nathan"] ["bob"] ["joey"] ["nathan"]]}
91+
:mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
9292
:storm-conf {TOPOLOGY-DEBUG true
9393
TOPOLOGY-WORKERS 2})]
9494
(is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
95-
(read-tuples results 1)))
95+
(read-tuples results "1")))
9696
(is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
97-
(read-tuples results 2)))
97+
(read-tuples results "2")))
9898
(is (= [[1] [2] [3] [4]]
99-
(read-tuples results 3)))
99+
(read-tuples results "3")))
100100
(is (= [[1] [2] [3] [4]]
101-
(read-tuples results 4)))
101+
(read-tuples results "4")))
102102
))))
103103

104104
(deftest test-shuffle
105105
(with-simulated-time-local-cluster [cluster :supervisors 4]
106106
(let [topology (thrift/mk-topology
107-
{1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
108-
{2 (thrift/mk-bolt-spec {1 :shuffle} (TestGlobalCount.)
107+
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
108+
{"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
109109
:parallelism-hint 6)
110110
})
111111
results (complete-topology cluster
112112
topology
113113
;; important for test that
114114
;; #tuples = multiple of 4 and 6
115-
:mock-sources {1 [["a"] ["b"]
115+
:mock-sources {"1" [["a"] ["b"]
116116
["a"] ["b"]
117117
["a"] ["b"]
118118
["a"] ["b"]
@@ -127,7 +127,7 @@
127127
]}
128128
)]
129129
(is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
130-
(read-tuples results 2)))
130+
(read-tuples results "2")))
131131
)))
132132

133133
(defbolt lalala-bolt1 ["word"] [tuple collector]
@@ -165,23 +165,23 @@
165165
(with-simulated-time-local-cluster [cluster :supervisors 4]
166166
(let [nimbus (:nimbus cluster)
167167
topology (thrift/mk-topology
168-
{1 (thrift/mk-spout-spec (TestWordSpout. false))}
169-
{2 (thrift/mk-bolt-spec {1 :shuffle}
168+
{"1" (thrift/mk-spout-spec (TestWordSpout. false))}
169+
{"2" (thrift/mk-bolt-spec {"1" :shuffle}
170170
lalala-bolt1)
171-
3 (thrift/mk-bolt-spec {1 :shuffle}
171+
"3" (thrift/mk-bolt-spec {"1" :shuffle}
172172
lalala-bolt2)
173-
4 (thrift/mk-bolt-spec {1 :shuffle}
173+
"4" (thrift/mk-bolt-spec {"1" :shuffle}
174174
(lalala-bolt3 "_nathan_"))}
175175
)
176176
results (complete-topology cluster
177177
topology
178-
:mock-sources {1 [["david"]
178+
:mock-sources {"1" [["david"]
179179
["adam"]
180180
]}
181181
)]
182-
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 2)))
183-
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 3)))
184-
(is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results 4)))
182+
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
183+
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
184+
(is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
185185
)))
186186

187187
(defn ack-tracking-feeder [fields]
@@ -230,17 +230,17 @@
230230
[feeder2 checker2] (ack-tracking-feeder ["num"])
231231
[feeder3 checker3] (ack-tracking-feeder ["num"])
232232
tracked (mk-tracked-topology
233-
{1 [feeder1]
234-
2 [feeder2]
235-
3 [feeder3]}
236-
{4 [{1 :shuffle} (branching-bolt 2)]
237-
5 [{2 :shuffle} (branching-bolt 4)]
238-
6 [{3 :shuffle} (branching-bolt 1)]
239-
7 [{4 :shuffle
240-
5 :shuffle
241-
6 :shuffle} (agg-bolt 3)]
242-
8 [{7 :shuffle} (branching-bolt 2)]
243-
9 [{8 :shuffle} ack-bolt]}
233+
{"1" [feeder1]
234+
"2" [feeder2]
235+
"3" [feeder3]}
236+
{"4" [{"1" :shuffle} (branching-bolt 2)]
237+
"5" [{"2" :shuffle} (branching-bolt 4)]
238+
"6" [{"3" :shuffle} (branching-bolt 1)]
239+
"7" [{"4" :shuffle
240+
"5" :shuffle
241+
"6" :shuffle} (agg-bolt 3)]
242+
"8" [{"7" :shuffle} (branching-bolt 2)]
243+
"9" [{"8" :shuffle} ack-bolt]}
244244
)]
245245
(submit-local-topology (:nimbus cluster)
246246
"test"
@@ -275,11 +275,11 @@
275275
(with-tracked-cluster [cluster]
276276
(let [[feeder checker] (ack-tracking-feeder ["num"])
277277
tracked (mk-tracked-topology
278-
{1 [feeder]}
279-
{2 [{1 :shuffle} identity-bolt]
280-
3 [{1 :shuffle} identity-bolt]
281-
4 [{2 :shuffle
282-
3 :shuffle} (agg-bolt 4)]})]
278+
{"1" [feeder]}
279+
{"2" [{"1" :shuffle} identity-bolt]
280+
"3" [{"1" :shuffle} identity-bolt]
281+
"4" [{"2" :shuffle
282+
"3" :shuffle} (agg-bolt 4)]})]
283283
(submit-local-topology (:nimbus cluster)
284284
"test"
285285
{}
@@ -301,9 +301,9 @@
301301
(with-tracked-cluster [cluster]
302302
(let [[feeder checker] (ack-tracking-feeder ["num"])
303303
tracked (mk-tracked-topology
304-
{1 [feeder]}
305-
{2 [{1 :shuffle} dup-anchor]
306-
3 [{2 :shuffle} ack-bolt]})]
304+
{"1" [feeder]}
305+
{"2" [{"1" :shuffle} dup-anchor]
306+
"3" [{"2" :shuffle} ack-bolt]})]
307307
(submit-local-topology (:nimbus cluster)
308308
"test"
309309
{}

0 commit comments

Comments
 (0)