Skip to content

Commit d30ad35

Browse files
author
Nathan Marz
committed
cleanup code, fix receive thread killing code, fix zmq url in remote mode
1 parent ca4c0db commit d30ad35

File tree

4 files changed

+20
-22
lines changed

4 files changed

+20
-22
lines changed

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,7 @@
213213
port
214214
receive-queue-map
215215
:kill-fn (fn [t]
216-
(halt-process! 11))
217-
:valid-tasks task-ids))
216+
(halt-process! 11))))
218217

219218
shutdown* (fn []
220219
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)

src/clj/backtype/storm/messaging/loader.clj

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
(ns backtype.storm.messaging.loader
2-
(:use [clojure.contrib.def :only [defnk]])
32
(:use [backtype.storm.bootstrap])
43
(:import [java.util.concurrent LinkedBlockingQueue])
54
(:require [backtype.storm.messaging.local :as local]))
65

76
(bootstrap)
7+
88
(defn mk-local-context []
99
(local/mk-local-context))
1010

@@ -19,26 +19,20 @@
1919
[context storm-id port receive-queue-map
2020
:daemon true
2121
:kill-fn (fn [t] (System/exit 1))
22-
:priority Thread/NORM_PRIORITY
23-
:valid-tasks nil]
24-
(let [valid-tasks (set (map short valid-tasks))
25-
vthread (async-loop
22+
:priority Thread/NORM_PRIORITY]
23+
(let [vthread (async-loop
2624
(fn [socket receive-queue-map]
2725
(let [[task msg] (msg/recv socket)]
2826
(if (= task -1)
2927
(do
3028
(log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
3129
(.close socket)
3230
nil )
33-
(if (or (nil? valid-tasks) (contains? valid-tasks task))
34-
(let [task (int task)
35-
^LinkedBlockingQueue receive-queue (receive-queue-map task)]
36-
;; TODO: probably need to handle multi-part messages here or something
37-
(.put receive-queue msg)
38-
0
39-
)
40-
(log-message "Receiving-thread:[" storm-id ", " port "] received invalid message for unknown task " task ". Dropping...")
41-
))))
31+
(do
32+
(if (contains? receive-queue-map task)
33+
(.put ^LinkedBlockingQueue (receive-queue-map task) msg)
34+
(log-message "Receiving-thread:[" storm-id ", " port "] received invalid message for unknown task " task ". Dropping..."))
35+
0 ))))
4236
:args-fn (fn [] [(msg/bind context storm-id port) receive-queue-map])
4337
:daemon daemon
4438
:kill-fn kill-fn
@@ -49,9 +43,9 @@
4943
(msg/send kill-socket
5044
-1
5145
(byte-array []))
52-
(.close kill-socket)
5346
(log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
5447
(.join vthread)
48+
(.close kill-socket)
5549
(log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
5650
))))
5751

src/clj/backtype/storm/messaging/zmq.clj

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,17 @@
2020
[port msg]
2121
))
2222

23-
(defn get-zmq-url [local? port]
23+
(defn get-bind-zmq-url [local? port]
2424
(if local?
2525
(str "ipc://" port ".ipc")
2626
(str "tcp://*:" port)))
2727

28+
(defn get-connect-zmq-url [local? host port]
29+
(if local?
30+
(str "ipc://" port ".ipc")
31+
(str "tcp://" host ":" port)))
32+
33+
2834
(defprotocol ZMQContextQuery
2935
(zmq-context [this]))
3036

@@ -43,14 +49,14 @@
4349
(bind [this storm-id port]
4450
(-> context
4551
(mq/socket mq/pull)
46-
(mq/bind (get-zmq-url local? port))
52+
(mq/bind (get-bind-zmq-url local? port))
4753
(ZMQConnection.)
4854
))
4955
(connect [this storm-id host port]
5056
(-> context
5157
(mq/socket mq/push)
5258
(mq/set-linger linger-ms)
53-
(mq/connect (get-zmq-url local? port))
59+
(mq/connect (get-connect-zmq-url local? host port))
5460
(ZMQConnection.)))
5561
(term [this]
5662
(.term context))

src/clj/backtype/storm/testing.clj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@
8787
))
8888

8989
(defn mk-shared-context [conf]
90-
(if (and (= (conf STORM-CLUSTER-MODE) "local")
91-
(not (conf STORM-LOCAL-MODE-ZMQ)))
90+
(if-not (conf STORM-LOCAL-MODE-ZMQ)
9291
(msg-loader/mk-local-context)
9392
))
9493

0 commit comments

Comments
 (0)