Skip to content

Commit 25e8d8a

Browse files
author
Nathan Marz
committed
finished extracting zmq code so that local mode can be run without native deps
1 parent 2ce3acf commit 25e8d8a

File tree

7 files changed

+66
-53
lines changed

7 files changed

+66
-53
lines changed

src/clj/backtype/storm/bootstrap.clj

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
TopologyContext ShellBolt
1515
CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt]))
1616
(import (quote [backtype.storm.daemon Shutdownable]))
17+
(require (quote [backtype.storm.messaging.loader :as msg-loader]))
18+
(require (quote [backtype.storm.messaging.protocol :as msg]))
1719
(use (quote [backtype.storm config util log clojure]))
1820
(use (quote [clojure.contrib.seq :only [find-first]]))
1921
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
2022
[event :as event] [process-simulator :as psim]]))
2123
(require (quote [clojure.set :as set]))
22-
(require (quote [zilch [mq :as mq]]))
23-
(require (quote [zilch [virtual-port :as mqvp]]))
2424
(require (quote [backtype.storm [stats :as stats]]))
2525
(import (quote [org.apache.log4j PropertyConfigurator Logger]))
2626

@@ -34,5 +34,4 @@
3434
(import (quote [java.util List Random Map HashMap]))
3535
(import (quote [org.apache.commons.io FileUtils]))
3636
(import (quote [java.util ArrayList]))
37-
(mq/zeromq-imports)
3837
))

src/clj/backtype/storm/daemon/task.clj

+8-10
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@
169169
ACKER-ACK-STREAM-ID))
170170
)))
171171

172-
(defn mk-task [conf storm-conf topology-context storm-id zmq-context cluster-state storm-active-atom transfer-fn]
172+
(defn mk-task [conf storm-conf topology-context storm-id mq-context cluster-state storm-active-atom transfer-fn]
173173
(let [task-id (.getThisTaskId topology-context)
174174
component-id (.getThisComponentId topology-context)
175175
task-info (.getTaskToComponent topology-context)
@@ -200,7 +200,7 @@
200200
stream->component->grouper (outbound-components topology-context)
201201
component->tasks (reverse-map task-info)
202202
;; important it binds to virtual port before function returns
203-
^ZMQ$Socket puller (-> zmq-context (mq/socket mq/pull) (mqvp/virtual-bind task-id))
203+
puller (msg/bind mq-context task-id)
204204

205205
;; TODO: consider DRYing things up and moving stats / tuple -> multiple components code here
206206
task-transfer-fn (fn [task ^Tuple tuple]
@@ -285,10 +285,8 @@
285285
[this]
286286
(log-message "Shutting down task " storm-id ":" task-id)
287287
(reset! active false)
288-
(let [pusher (-> zmq-context (mq/socket mq/push) (mqvp/virtual-connect task-id))]
289-
;; empty messages are skip messages (this unblocks the socket)
290-
(mq/send pusher (mq/barr))
291-
(.close pusher))
288+
;; empty messages are skip messages (this unblocks the socket)
289+
(msg/send-local-task-empty mq-context task-id)
292290
(doseq [t all-threads]
293291
(.interrupt t)
294292
(.join t))
@@ -319,7 +317,7 @@
319317
(stats/spout-acked-tuple! task-stats (.getSourceStreamId tuple) time-delta)
320318
))
321319

322-
(defmethod mk-executors ISpout [^ISpout spout storm-conf ^ZMQ$Socket puller send-fn storm-active-atom
320+
(defmethod mk-executors ISpout [^ISpout spout storm-conf puller send-fn storm-active-atom
323321
^TopologyContext topology-context task-stats report-error-fn]
324322
(let [wait-fn (fn [] @storm-active-atom)
325323
max-spout-pending (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)
@@ -384,7 +382,7 @@
384382
;; TODO: log that it's getting throttled
385383
))
386384
(fn []
387-
(let [^bytes ser-msg (mq/recv puller)]
385+
(let [^bytes ser-msg (msg/recv puller)]
388386
;; skip empty messages (used during shutdown)
389387
(when-not (empty? ser-msg)
390388
(let [tuple (.deserialize deserializer ser-msg)
@@ -409,7 +407,7 @@
409407
(time-delta-ms start-time))
410408
))
411409

412-
(defmethod mk-executors IBolt [^IBolt bolt storm-conf ^ZMQ$Socket puller send-fn storm-active-atom
410+
(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller send-fn storm-active-atom
413411
^TopologyContext topology-context task-stats report-error-fn]
414412
(let [deserializer (TupleDeserializer. storm-conf topology-context)
415413
task-id (.getThisTaskId topology-context)
@@ -469,7 +467,7 @@
469467
;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
470468
;; or just timeout the sync messages that are coming in until full sync is hit from that task
471469
;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
472-
(let [^bytes ser (mq/recv puller)]
470+
(let [^bytes ser (msg/recv puller)]
473471
(when-not (empty? ser) ; skip empty messages (used during shutdown)
474472
(log-debug "Processing message")
475473
(let [tuple (.deserialize deserializer ser)]

src/clj/backtype/storm/daemon/worker.clj

+23-32
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
(bootstrap)
99

1010

11-
(defmulti virtual-port-url cluster-mode)
12-
(defmulti connect-url cluster-mode)
11+
(defmulti mk-context cluster-mode)
1312

1413

1514
(defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
@@ -102,7 +101,7 @@
102101
(worker-pids-root conf worker-id)
103102
%)
104103

105-
zmq-context (mq/context (storm-conf ZMQ-THREADS))
104+
mq-context (mk-context conf storm-conf)
106105
outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
107106
endpoint-socket-lock (mk-rw-lock)
108107
node+port->socket (atom {})
@@ -128,13 +127,10 @@
128127
(into {}
129128
(dofor [[node port :as endpoint] new-connections]
130129
[endpoint
131-
(-> zmq-context
132-
(mq/socket mq/push)
133-
(mq/set-linger (storm-conf ZMQ-LINGER-MILLIS))
134-
(mq/connect
135-
(connect-url conf
136-
((:node->host assignment) node)
137-
port)))
130+
(msg/connect
131+
mq-context
132+
((:node->host assignment) node)
133+
port)
138134
]
139135
)))
140136
(write-locked endpoint-socket-lock
@@ -166,7 +162,7 @@
166162
(when @active (heartbeat-fn) (conf WORKER-HEARTBEAT-FREQUENCY-SECS))
167163
)
168164
:priority Thread/MAX_PRIORITY)
169-
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id zmq-context cluster-state storm-active-atom transfer-fn))
165+
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn))
170166
threads [(async-loop
171167
(fn []
172168
(.add event-manager refresh-connections)
@@ -184,19 +180,21 @@
184180
(doseq [[task ^Tuple tuple] drainer]
185181
(let [socket (node+port->socket (task->node+port task))
186182
ser-tuple (.serialize serializer tuple)]
187-
(mqvp/virtual-send socket task ser-tuple)
183+
(msg/send socket task ser-tuple)
188184
))
189185
))
190186
(.clear drainer)
191187
0 )
192188
:args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
193189
heartbeat-thread]
194190
_ (log-message "Launching virtual port for " supervisor-id ":" port)
195-
virtual-port-shutdown (mqvp/launch-virtual-port! zmq-context
196-
(virtual-port-url conf port)
197-
:kill-fn (fn []
198-
(halt-process! 11))
199-
:valid-ports task-ids)
191+
virtual-port-shutdown (if (conf STORM-LOCAL-MODE-ZMQ)
192+
(msg-loader/launch-virtual-port!
193+
mq-context
194+
(str "tcp://*:" port)
195+
:kill-fn (fn []
196+
(halt-process! 11))
197+
:valid-ports task-ids))
200198
_ (log-message "Launched virtual port for " supervisor-id ":" port)
201199
shutdown* (fn []
202200
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
@@ -206,9 +204,9 @@
206204
;; this will do best effort flushing since the linger period
207205
;; was set on creation
208206
(.close socket))
209-
(virtual-port-shutdown)
207+
(if virtual-port-shutdown (virtual-port-shutdown))
210208
(log-message "Terminating zmq context")
211-
(.term zmq-context)
209+
(msg/term mq-context)
212210
(log-message "Disconnecting from storm cluster state context")
213211
(log-message "Waiting for heartbeat thread to die")
214212
(doseq [t threads]
@@ -234,20 +232,13 @@
234232
ret
235233
))
236234

235+
(defmethod mk-context :local [conf storm-conf]
236+
(msg-loader/mk-local-context))
237237

238-
239-
(defmethod virtual-port-url :local [conf port]
240-
(str "ipc://" port ".ipc"))
241-
242-
(defmethod virtual-port-url :distributed [conf port]
243-
(str "tcp://*:" port))
244-
245-
(defmethod connect-url :local [conf host port]
246-
(str "ipc://" port ".ipc"))
247-
248-
(defmethod connect-url :distributed [conf host port]
249-
(str "tcp://" host ":" port))
250-
238+
(defmethod mk-context :distributed [conf storm-conf]
239+
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
240+
(storm-conf ZMQ-LINGER-MILLIS)))
241+
251242

252243
(defn -main [storm-id supervisor-id port-str worker-id]
253244
(let [conf (read-storm-config)]

src/clj/backtype/storm/messaging/loader.clj

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
var-get)]
1212
(afn num-threads linger)))
1313

14-
(defn launch-virtual-port! [& args]
14+
(defn launch-virtual-port! [context & args]
1515
(require '[zilch.virtual-port :as mqvp])
16+
(require '[backtype.storm.messaging.zmq :as zmq])
1617
(let [afn (-> 'zilch.virtual-port/launch-virtual-port!
1718
find-var
18-
var-get)]
19-
(apply afn args)))
20-
19+
var-get)
20+
]
21+
(apply afn (cons (.zmq-context context) args))))

src/clj/backtype/storm/messaging/local.clj

+9-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
(send [this task message]
1414
(let [send-queue (@queues-map task)]
1515
(.put send-queue message)
16-
)))
16+
))
17+
(close [this]
18+
))
1719

1820
(defn add-queue! [queues-map lock port]
1921
(locking lock
@@ -26,6 +28,12 @@
2628
(LocalConnection. queues-map (add-queue! queues-map lock virtual-port)))
2729
(connect [this host port]
2830
(LocalConnection. queues-map nil)
31+
)
32+
(send-local-task-empty [this virtual-port]
33+
(let [queue (add-queue! queues-map lock virtual-port)]
34+
(.put queue (byte-array []))
35+
))
36+
(term [this]
2937
))
3038

3139
(defn mk-local-context []

src/clj/backtype/storm/messaging/protocol.clj

+3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
(defprotocol Connection
66
(recv [conn])
77
(send [conn task message])
8+
(close [conn])
89
)
910

1011
(defprotocol Context
1112
(bind [context virtual-port])
1213
(connect [context host port])
14+
(send-local-task-empty [context virtual-port])
15+
(term [context])
1316
)
1417

src/clj/backtype/storm/messaging/zmq.clj

+16-3
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
(:require [zilch.mq :as mq]
55
[zilch.virtual-port :as mqvp]))
66

7+
(defprotocol ZMQContextQuery
8+
(zmq-context [this]))
9+
710
(deftype ZMQConnection [socket]
811
Connection
912
(recv [this]
1013
(mq/recv socket))
1114
(send [this task message]
12-
(mqvp/virtual-send socket task message)
15+
(mqvp/virtual-send socket task message))
16+
(close [this]
17+
(.close socket)
1318
))
1419

1520
(deftype ZMQContext [context linger-ms]
@@ -24,8 +29,16 @@
2429
(-> context
2530
(mq/socket mq/push)
2631
(mq/set-linger linger-ms)
27-
(mq/connect url))
28-
)))
32+
(mq/connect url))))
33+
(send-local-task-empty [this virtual-port]
34+
(let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))]
35+
(mq/send pusher (mq/barr))
36+
(.close pusher)))
37+
(term [this]
38+
(.term context))
39+
ZMQContextQuery
40+
(zmq-context [this]
41+
context))
2942

3043

3144
(defn mk-zmq-context [num-threads linger]

0 commit comments

Comments
 (0)