|
8 | 8 | (bootstrap)
|
9 | 9 |
|
10 | 10 |
|
11 |
| -(defmulti virtual-port-url cluster-mode) |
12 |
| -(defmulti connect-url cluster-mode) |
| 11 | +(defmulti mk-context cluster-mode) |
13 | 12 |
|
14 | 13 |
|
15 | 14 | (defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
|
|
102 | 101 | (worker-pids-root conf worker-id)
|
103 | 102 | %)
|
104 | 103 |
|
105 |
| - zmq-context (mq/context (storm-conf ZMQ-THREADS)) |
| 104 | + mq-context (mk-context conf storm-conf) |
106 | 105 | outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
|
107 | 106 | endpoint-socket-lock (mk-rw-lock)
|
108 | 107 | node+port->socket (atom {})
|
|
128 | 127 | (into {}
|
129 | 128 | (dofor [[node port :as endpoint] new-connections]
|
130 | 129 | [endpoint
|
131 |
| - (-> zmq-context |
132 |
| - (mq/socket mq/push) |
133 |
| - (mq/set-linger (storm-conf ZMQ-LINGER-MILLIS)) |
134 |
| - (mq/connect |
135 |
| - (connect-url conf |
136 |
| - ((:node->host assignment) node) |
137 |
| - port))) |
| 130 | + (msg/connect |
| 131 | + mq-context |
| 132 | + ((:node->host assignment) node) |
| 133 | + port) |
138 | 134 | ]
|
139 | 135 | )))
|
140 | 136 | (write-locked endpoint-socket-lock
|
|
166 | 162 | (when @active (heartbeat-fn) (conf WORKER-HEARTBEAT-FREQUENCY-SECS))
|
167 | 163 | )
|
168 | 164 | :priority Thread/MAX_PRIORITY)
|
169 |
| - tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id zmq-context cluster-state storm-active-atom transfer-fn)) |
| 165 | + tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn)) |
170 | 166 | threads [(async-loop
|
171 | 167 | (fn []
|
172 | 168 | (.add event-manager refresh-connections)
|
|
184 | 180 | (doseq [[task ^Tuple tuple] drainer]
|
185 | 181 | (let [socket (node+port->socket (task->node+port task))
|
186 | 182 | ser-tuple (.serialize serializer tuple)]
|
187 |
| - (mqvp/virtual-send socket task ser-tuple) |
| 183 | + (msg/send socket task ser-tuple) |
188 | 184 | ))
|
189 | 185 | ))
|
190 | 186 | (.clear drainer)
|
191 | 187 | 0 )
|
192 | 188 | :args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
|
193 | 189 | heartbeat-thread]
|
194 | 190 | _ (log-message "Launching virtual port for " supervisor-id ":" port)
|
195 |
| - virtual-port-shutdown (mqvp/launch-virtual-port! zmq-context |
196 |
| - (virtual-port-url conf port) |
197 |
| - :kill-fn (fn [] |
198 |
| - (halt-process! 11)) |
199 |
| - :valid-ports task-ids) |
| 191 | + virtual-port-shutdown (if (conf STORM-LOCAL-MODE-ZMQ) |
| 192 | + (msg-loader/launch-virtual-port! |
| 193 | + mq-context |
| 194 | + (str "tcp://*:" port) |
| 195 | + :kill-fn (fn [] |
| 196 | + (halt-process! 11)) |
| 197 | + :valid-ports task-ids)) |
200 | 198 | _ (log-message "Launched virtual port for " supervisor-id ":" port)
|
201 | 199 | shutdown* (fn []
|
202 | 200 | (log-message "Shutting down worker " storm-id " " supervisor-id " " port)
|
|
206 | 204 | ;; this will do best effort flushing since the linger period
|
207 | 205 | ;; was set on creation
|
208 | 206 | (.close socket))
|
209 |
| - (virtual-port-shutdown) |
| 207 | + (if virtual-port-shutdown (virtual-port-shutdown)) |
210 | 208 | (log-message "Terminating zmq context")
|
211 |
| - (.term zmq-context) |
| 209 | + (msg/term mq-context) |
212 | 210 | (log-message "Disconnecting from storm cluster state context")
|
213 | 211 | (log-message "Waiting for heartbeat thread to die")
|
214 | 212 | (doseq [t threads]
|
|
234 | 232 | ret
|
235 | 233 | ))
|
236 | 234 |
|
| 235 | +(defmethod mk-context :local [conf storm-conf] |
| 236 | + (msg-loader/mk-local-context)) |
237 | 237 |
|
238 |
| - |
239 |
| -(defmethod virtual-port-url :local [conf port] |
240 |
| - (str "ipc://" port ".ipc")) |
241 |
| - |
242 |
| -(defmethod virtual-port-url :distributed [conf port] |
243 |
| - (str "tcp://*:" port)) |
244 |
| - |
245 |
| -(defmethod connect-url :local [conf host port] |
246 |
| - (str "ipc://" port ".ipc")) |
247 |
| - |
248 |
| -(defmethod connect-url :distributed [conf host port] |
249 |
| - (str "tcp://" host ":" port)) |
250 |
| - |
| 238 | +(defmethod mk-context :distributed [conf storm-conf] |
| 239 | + (msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS) |
| 240 | + (storm-conf ZMQ-LINGER-MILLIS))) |
| 241 | + |
251 | 242 |
|
252 | 243 | (defn -main [storm-id supervisor-id port-str worker-id]
|
253 | 244 | (let [conf (read-storm-config)]
|
|
0 commit comments