File tree Expand file tree Collapse file tree 3 files changed +15
-9
lines changed Expand file tree Collapse file tree 3 files changed +15
-9
lines changed Original file line number Diff line number Diff line change @@ -68,3 +68,4 @@ topology.max.spout.pending: null
68
68
topology.state.synchronization.timeout.secs : 60
69
69
topology.stats.sample.rate : 0.05
70
70
topology.fall.back.on.java.serialization : true
71
+ topology.worker.childopts : nil
Original file line number Diff line number Diff line change 128
128
(defn ^LocalState supervisor-state [conf]
129
129
(LocalState. (str (supervisor-local-dir conf) " /localstate" )))
130
130
131
+ (defn read-supervisor-storm-conf [conf storm-id]
132
+ (let [stormroot (supervisor-stormdist-root conf storm-id)
133
+ conf-path (supervisor-stormconf-path stormroot)
134
+ topology-path (supervisor-stormcode-path stormroot)]
135
+ (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
136
+ ))
137
+
138
+ (defn read-supervisor-topology [conf storm-id]
139
+ (let [stormroot (supervisor-stormdist-root conf storm-id)
140
+ topology-path (supervisor-stormcode-path stormroot)]
141
+ (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
142
+ ))
131
143
132
144
(defn worker-root
133
145
([conf]
Original file line number Diff line number Diff line change 24
24
assignment))
25
25
))
26
26
27
- (defn- read-storm-cache [conf storm-id]
28
- (let [stormroot (supervisor-stormdist-root conf storm-id)
29
- conf-path (supervisor-stormconf-path stormroot)
30
- topology-path (supervisor-stormcode-path stormroot)]
31
- [(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
32
- (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))]
33
- ))
34
-
35
27
(defn do-heartbeat [conf worker-id port storm-id task-ids]
36
28
(.put (worker-state conf worker-id)
37
29
LS-WORKER-HEARTBEAT
97
89
; ; do this here so that the worker process dies if this fails
98
90
; ; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
99
91
_ (heartbeat-fn )
100
- [storm-conf topology] (read-storm-cache conf storm-id)
92
+ storm-conf (read-supervisor-storm-conf conf storm-id)
93
+ topology (read-supervisor-topology conf storm-id)
101
94
event-manager (event/event-manager true )
102
95
103
96
task->component (storm-task-info storm-cluster-state storm-id)
You can’t perform that action at this time.
0 commit comments