|
23 | 23 | (defn mk-distributed-cluster-state [conf]
|
24 | 24 | (let [zk (zk/mk-client (mk-zk-connect-string (assoc conf STORM-ZOOKEEPER-ROOT "/")))]
|
25 | 25 | (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
|
26 |
| - (if (.isStarted zk) |
27 |
| - (.close zk)) |
28 |
| - ) |
| 26 | + (.close zk)) |
29 | 27 | (let [callbacks (atom {})
|
30 | 28 | active (atom true)
|
31 | 29 | mk-zk #(zk/mk-client (mk-zk-connect-string conf)
|
32 | 30 | (conf STORM-ZOOKEEPER-SESSION-TIMEOUT)
|
33 | 31 | %)
|
34 |
| - zk (atom nil) |
35 |
| - watcher (fn this [state type path] |
| 32 | + zk (mk-zk (fn [state type path] |
36 | 33 | (when @active
|
37 | 34 | (when-not (= :connected state)
|
38 |
| - (log-message "Zookeeper disconnected. Attempting to reconnect") |
39 |
| - ; (reset! zk (mk-zk this)) ;we don't need to reset it anymore,curator will take care of it. |
40 |
| - ) |
| 35 | + (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper.")) |
41 | 36 | (when-not (= :none type)
|
42 | 37 | (doseq [callback (vals @callbacks)]
|
43 | 38 | (callback type path))))
|
44 |
| - )] |
45 |
| - (reset! zk (mk-zk watcher)) |
| 39 | + ))] |
46 | 40 | (reify
|
47 | 41 | ClusterState
|
48 | 42 | (register [this callback]
|
|
53 | 47 | (unregister [this id]
|
54 | 48 | (swap! callbacks dissoc id))
|
55 | 49 | (set-ephemeral-node [this path data]
|
56 |
| - (zk/mkdirs @zk (parent-path path)) |
57 |
| - (if (zk/exists @zk path false) |
58 |
| - (zk/set-data @zk path data) ; should verify that it's ephemeral |
59 |
| - (zk/create-node @zk path data :ephemeral) |
| 50 | + (zk/mkdirs zk (parent-path path)) |
| 51 | + (if (zk/exists zk path false) |
| 52 | + (zk/set-data zk path data) ; should verify that it's ephemeral |
| 53 | + (zk/create-node zk path data :ephemeral) |
60 | 54 | ))
|
61 | 55 |
|
62 | 56 | (set-data [this path data]
|
63 | 57 | ;; note: this does not turn off any existing watches
|
64 |
| - (if (zk/exists @zk path false) |
65 |
| - (zk/set-data @zk path data) |
| 58 | + (if (zk/exists zk path false) |
| 59 | + (zk/set-data zk path data) |
66 | 60 | (do
|
67 |
| - (zk/mkdirs @zk (parent-path path)) |
68 |
| - (zk/create-node @zk path data :persistent) |
| 61 | + (zk/mkdirs zk (parent-path path)) |
| 62 | + (zk/create-node zk path data :persistent) |
69 | 63 | )))
|
70 | 64 |
|
71 | 65 | (delete-node [this path]
|
72 |
| - (zk/delete-recursive @zk path) |
| 66 | + (zk/delete-recursive zk path) |
73 | 67 | )
|
74 | 68 |
|
75 | 69 | (get-data [this path watch?]
|
76 |
| - (zk/get-data @zk path watch?) |
| 70 | + (zk/get-data zk path watch?) |
77 | 71 | )
|
78 | 72 |
|
79 | 73 | (get-children [this path watch?]
|
80 |
| - (zk/get-children @zk path watch?)) |
| 74 | + (zk/get-children zk path watch?)) |
81 | 75 |
|
82 | 76 | (mkdirs [this path]
|
83 |
| - (zk/mkdirs @zk path)) |
| 77 | + (zk/mkdirs zk path)) |
84 | 78 |
|
85 | 79 | (close [this]
|
86 | 80 | (reset! active false)
|
87 |
| - (if (.isStarted @zk) |
88 |
| - (.close @zk))) |
| 81 | + (.close zk)) |
89 | 82 | )))
|
90 | 83 |
|
91 | 84 | (defprotocol StormClusterState
|
|
0 commit comments