|
7 | 7 |
|
8 | 8 | (bootstrap)
|
9 | 9 |
|
10 |
| - |
11 |
| -(defmulti mk-context cluster-mode) |
| 10 | +(defn local-mode-zmq? [conf] |
| 11 | + (or (= (conf STORM-CLUSTER-MODE) "distributed") |
| 12 | + (conf STORM-LOCAL-MODE-ZMQ))) |
12 | 13 |
|
13 | 14 |
|
14 | 15 | (defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
|
|
74 | 75 | ;; what about if there's inconsistency in assignments? -> but nimbus
|
75 | 76 | ;; should guarantee this consistency
|
76 | 77 | ;; TODO: consider doing worker heartbeating rather than task heartbeating to reduce the load on zookeeper
|
77 |
| -(defserverfn mk-worker [conf storm-id supervisor-id port worker-id] |
| 78 | +(defserverfn mk-worker [conf mq-context storm-id supervisor-id port worker-id] |
78 | 79 | (log-message "Launching worker for " storm-id " on " supervisor-id ":" port " with id " worker-id)
|
79 | 80 | (let [active (atom true)
|
80 | 81 | storm-active-atom (atom false)
|
|
101 | 102 | (worker-pids-root conf worker-id)
|
102 | 103 | %)
|
103 | 104 |
|
104 |
| - mq-context (mk-context conf storm-conf) |
| 105 | + mq-context (if mq-context |
| 106 | + mq-context |
| 107 | + (msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS) |
| 108 | + (storm-conf ZMQ-LINGER-MILLIS) |
| 109 | + (= (conf STORM-CLUSTER-MODE) "local"))) |
105 | 110 | outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
|
106 | 111 | endpoint-socket-lock (mk-rw-lock)
|
107 | 112 | node+port->socket (atom {})
|
|
187 | 192 | 0 )
|
188 | 193 | :args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
|
189 | 194 | heartbeat-thread]
|
190 |
| - _ (log-message "Launching virtual port for " supervisor-id ":" port) |
191 |
| - virtual-port-shutdown (if (conf STORM-LOCAL-MODE-ZMQ) |
| 195 | + virtual-port-shutdown (when (local-mode-zmq? conf) |
| 196 | + (log-message "Launching virtual port for " supervisor-id ":" port) |
192 | 197 | (msg-loader/launch-virtual-port!
|
| 198 | + (= (conf STORM-CLUSTER-MODE) "local") |
193 | 199 | mq-context
|
194 |
| - (str "tcp://*:" port) |
195 |
| - :kill-fn (fn [] |
| 200 | + port |
| 201 | + :kill-fn (fn [t] |
196 | 202 | (halt-process! 11))
|
197 | 203 | :valid-ports task-ids))
|
198 |
| - _ (log-message "Launched virtual port for " supervisor-id ":" port) |
199 | 204 | shutdown* (fn []
|
200 | 205 | (log-message "Shutting down worker " storm-id " " supervisor-id " " port)
|
201 | 206 | (reset! active false)
|
|
231 | 236 | (log-message "Worker " worker-id " for storm " storm-id " on " supervisor-id ":" port " has finished loading")
|
232 | 237 | ret
|
233 | 238 | ))
|
234 |
| - |
235 |
| -(defmethod mk-context :local [conf storm-conf] |
236 |
| - (msg-loader/mk-local-context)) |
237 |
| - |
238 |
| -(defmethod mk-context :distributed [conf storm-conf] |
239 |
| - (msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS) |
240 |
| - (storm-conf ZMQ-LINGER-MILLIS))) |
241 | 239 |
|
242 | 240 |
|
243 | 241 | (defn -main [storm-id supervisor-id port-str worker-id]
|
244 | 242 | (let [conf (read-storm-config)]
|
245 | 243 | (validate-distributed-mode! conf)
|
246 |
| - (mk-worker conf storm-id supervisor-id (Integer/parseInt port-str) worker-id))) |
| 244 | + (mk-worker conf nil storm-id supervisor-id (Integer/parseInt port-str) worker-id))) |
0 commit comments