Skip to content

Commit 0b2e3d0

Browse files
author
Nathan Marz
committed
supervisors only download code for topologies assigned to them
1 parent bc2baeb commit 0b2e3d0

File tree

1 file changed

+10
-3
lines changed

1 file changed

+10
-3
lines changed

src/clj/backtype/storm/daemon/supervisor.clj

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
))
3939

4040
(defn- read-assignments
41-
"Returns map from port to struct containing :storm-id and :executors and :master-code-dir"
41+
"Returns map from port to struct containing :storm-id and :executors"
4242
[storm-cluster-state supervisor-id callback]
4343
(let [storm-ids (.assignments storm-cluster-state callback)]
4444
(apply merge-with
@@ -250,6 +250,12 @@
250250
id)))
251251
))
252252

253+
(defn assigned-storm-ids-from-port-assignments [assignment]
254+
(->> assignment
255+
vals
256+
(map :storm-id)
257+
set))
258+
253259
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
254260
(fn this []
255261
(let [conf (:conf supervisor)
@@ -258,14 +264,14 @@
258264
^LocalState local-state (:local-state supervisor)
259265
sync-callback (fn [& ignored] (.add event-manager this))
260266
storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
261-
assigned-storm-ids (set (keys storm-code-map))
262267
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
263268
all-assignment (read-assignments
264269
storm-cluster-state
265270
(:supervisor-id supervisor)
266271
sync-callback)
267272
new-assignment (->> all-assignment
268273
(filter-key #(.confirmAssigned isupervisor %)))
274+
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
269275
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
270276
(log-debug "Synchronizing supervisor")
271277
(log-debug "Storm code map: " storm-code-map)
@@ -278,7 +284,8 @@
278284
;; - should this be done separately from usual monitoring?
279285
;; should we only download when topology is assigned to this supervisor?
280286
(doseq [[storm-id master-code-dir] storm-code-map]
281-
(when-not (downloaded-storm-ids storm-id)
287+
(when (and (not (downloaded-storm-ids storm-id))
288+
(assigned-storm-ids storm-id))
282289
(log-message "Downloading code for storm id "
283290
storm-id
284291
" from "

0 commit comments

Comments
 (0)