|
38 | 38 | ))
|
39 | 39 |
|
40 | 40 | (defn- read-assignments
|
41 |
| - "Returns map from port to struct containing :storm-id and :executors and :master-code-dir" |
| 41 | + "Returns map from port to struct containing :storm-id and :executors" |
42 | 42 | [storm-cluster-state supervisor-id callback]
|
43 | 43 | (let [storm-ids (.assignments storm-cluster-state callback)]
|
44 | 44 | (apply merge-with
|
|
250 | 250 | id)))
|
251 | 251 | ))
|
252 | 252 |
|
| 253 | +(defn assigned-storm-ids-from-port-assignments [assignment] |
| 254 | + (->> assignment |
| 255 | + vals |
| 256 | + (map :storm-id) |
| 257 | + set)) |
| 258 | + |
253 | 259 | (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
|
254 | 260 | (fn this []
|
255 | 261 | (let [conf (:conf supervisor)
|
|
258 | 264 | ^LocalState local-state (:local-state supervisor)
|
259 | 265 | sync-callback (fn [& ignored] (.add event-manager this))
|
260 | 266 | storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
|
261 |
| - assigned-storm-ids (set (keys storm-code-map)) |
262 | 267 | downloaded-storm-ids (set (read-downloaded-storm-ids conf))
|
263 | 268 | all-assignment (read-assignments
|
264 | 269 | storm-cluster-state
|
265 | 270 | (:supervisor-id supervisor)
|
266 | 271 | sync-callback)
|
267 | 272 | new-assignment (->> all-assignment
|
268 | 273 | (filter-key #(.confirmAssigned isupervisor %)))
|
| 274 | + assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) |
269 | 275 | existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
|
270 | 276 | (log-debug "Synchronizing supervisor")
|
271 | 277 | (log-debug "Storm code map: " storm-code-map)
|
|
278 | 284 | ;; - should this be done separately from usual monitoring?
|
279 | 285 | ;; should we only download when topology is assigned to this supervisor?
|
280 | 286 | (doseq [[storm-id master-code-dir] storm-code-map]
|
281 |
| - (when-not (downloaded-storm-ids storm-id) |
| 287 | + (when (and (not (downloaded-storm-ids storm-id)) |
| 288 | + (assigned-storm-ids storm-id)) |
282 | 289 | (log-message "Downloading code for storm id "
|
283 | 290 | storm-id
|
284 | 291 | " from "
|
|
0 commit comments