Skip to content

Commit f7fa84a

Browse files
author
Nathan Marz
committed
refactor reading of topology and topology config from supervisor cache
1 parent 7cce193 commit f7fa84a

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

conf/defaults.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,4 @@ topology.max.spout.pending: null
6868
topology.state.synchronization.timeout.secs: 60
6969
topology.stats.sample.rate: 0.05
7070
topology.fall.back.on.java.serialization: true
71+
topology.worker.childopts: nil

src/clj/backtype/storm/config.clj

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@
128128
(defn ^LocalState supervisor-state [conf]
129129
(LocalState. (str (supervisor-local-dir conf) "/localstate")))
130130

131+
(defn read-supervisor-storm-conf [conf storm-id]
132+
(let [stormroot (supervisor-stormdist-root conf storm-id)
133+
conf-path (supervisor-stormconf-path stormroot)
134+
topology-path (supervisor-stormcode-path stormroot)]
135+
(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
136+
))
137+
138+
(defn read-supervisor-topology [conf storm-id]
139+
(let [stormroot (supervisor-stormdist-root conf storm-id)
140+
topology-path (supervisor-stormcode-path stormroot)]
141+
(Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
142+
))
131143

132144
(defn worker-root
133145
([conf]

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,6 @@
2424
assignment))
2525
))
2626

27-
(defn- read-storm-cache [conf storm-id]
28-
(let [stormroot (supervisor-stormdist-root conf storm-id)
29-
conf-path (supervisor-stormconf-path stormroot)
30-
topology-path (supervisor-stormcode-path stormroot)]
31-
[(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
32-
(Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))]
33-
))
34-
3527
(defn do-heartbeat [conf worker-id port storm-id task-ids]
3628
(.put (worker-state conf worker-id)
3729
LS-WORKER-HEARTBEAT
@@ -97,7 +89,8 @@
9789
;; do this here so that the worker process dies if this fails
9890
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
9991
_ (heartbeat-fn)
100-
[storm-conf topology] (read-storm-cache conf storm-id)
92+
storm-conf (read-supervisor-storm-conf conf storm-id)
93+
topology (read-supervisor-topology conf storm-id)
10194
event-manager (event/event-manager true)
10295

10396
task->component (storm-task-info storm-cluster-state storm-id)

0 commit comments

Comments
 (0)