Skip to content

Commit 0eb696b

Browse files
author
Nathan Marz
committed
cleanup curator integration and don't close multiple times in tests
1 parent df33534 commit 0eb696b

File tree

4 files changed

+21
-28
lines changed

4 files changed

+21
-28
lines changed

src/clj/backtype/storm/cluster.clj

+17-24
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,20 @@
2323
(defn mk-distributed-cluster-state [conf]
2424
(let [zk (zk/mk-client (mk-zk-connect-string (assoc conf STORM-ZOOKEEPER-ROOT "/")))]
2525
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
26-
(if (.isStarted zk)
27-
(.close zk))
28-
)
26+
(.close zk))
2927
(let [callbacks (atom {})
3028
active (atom true)
3129
mk-zk #(zk/mk-client (mk-zk-connect-string conf)
3230
(conf STORM-ZOOKEEPER-SESSION-TIMEOUT)
3331
%)
34-
zk (atom nil)
35-
watcher (fn this [state type path]
32+
zk (mk-zk (fn [state type path]
3633
(when @active
3734
(when-not (= :connected state)
38-
(log-message "Zookeeper disconnected. Attempting to reconnect")
39-
; (reset! zk (mk-zk this)) ;we don't need to reset it anymore,curator will take care of it.
40-
)
35+
(log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
4136
(when-not (= :none type)
4237
(doseq [callback (vals @callbacks)]
4338
(callback type path))))
44-
)]
45-
(reset! zk (mk-zk watcher))
39+
))]
4640
(reify
4741
ClusterState
4842
(register [this callback]
@@ -53,39 +47,38 @@
5347
(unregister [this id]
5448
(swap! callbacks dissoc id))
5549
(set-ephemeral-node [this path data]
56-
(zk/mkdirs @zk (parent-path path))
57-
(if (zk/exists @zk path false)
58-
(zk/set-data @zk path data) ; should verify that it's ephemeral
59-
(zk/create-node @zk path data :ephemeral)
50+
(zk/mkdirs zk (parent-path path))
51+
(if (zk/exists zk path false)
52+
(zk/set-data zk path data) ; should verify that it's ephemeral
53+
(zk/create-node zk path data :ephemeral)
6054
))
6155

6256
(set-data [this path data]
6357
;; note: this does not turn off any existing watches
64-
(if (zk/exists @zk path false)
65-
(zk/set-data @zk path data)
58+
(if (zk/exists zk path false)
59+
(zk/set-data zk path data)
6660
(do
67-
(zk/mkdirs @zk (parent-path path))
68-
(zk/create-node @zk path data :persistent)
61+
(zk/mkdirs zk (parent-path path))
62+
(zk/create-node zk path data :persistent)
6963
)))
7064

7165
(delete-node [this path]
72-
(zk/delete-recursive @zk path)
66+
(zk/delete-recursive zk path)
7367
)
7468

7569
(get-data [this path watch?]
76-
(zk/get-data @zk path watch?)
70+
(zk/get-data zk path watch?)
7771
)
7872

7973
(get-children [this path watch?]
80-
(zk/get-children @zk path watch?))
74+
(zk/get-children zk path watch?))
8175

8276
(mkdirs [this path]
83-
(zk/mkdirs @zk path))
77+
(zk/mkdirs zk path))
8478

8579
(close [this]
8680
(reset! active false)
87-
(if (.isStarted @zk)
88-
(.close @zk)))
81+
(.close zk))
8982
)))
9083

9184
(defprotocol StormClusterState

src/clj/backtype/storm/log.clj

+3
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@
1212

1313
(defmacro log-warn-error [e & args]
1414
`(log/warn (str ~@args) ~e))
15+
16+
(defmacro log-warn [& args]
17+
`(log/warn (str ~@args)))

src/clj/backtype/storm/zookeeper.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
session-timeout
3636
15000
3737
;;TODO: make retry times could be configured.
38-
(RetryNTimes. 5 1000))]
38+
(RetryNTimes. 5 1000))]
3939
(.. fk (getCuratorListenable) (addListener (reify CuratorListener
4040
(^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
4141
(let [^WatchedEvent event (.getWatchedEvent e)]

test/clj/backtype/storm/cluster_test.clj

-3
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@
6868
(is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
6969
(.close state1)
7070
(is (= nil (.get-data state2 "/a" false)))
71-
(.close state1)
7271
(.close state2)
73-
(.close state3)
7472
)))
7573

7674
(defn mk-callback-tester []
@@ -224,7 +222,6 @@
224222
(.disconnect state2)
225223
(is (= #{"1"} (set (.supervisors state1 nil))))
226224
(.disconnect state1)
227-
(.disconnect state2)
228225
)))
229226

230227
(deftest test-storm-state-callbacks

0 commit comments

Comments
 (0)