Skip to content

Commit a527240

Browse files
author
Nathan Marz
committed
cleanup code
1 parent d30ad35 commit a527240

File tree

4 files changed

+14
-20
lines changed

4 files changed

+14
-20
lines changed

src/clj/backtype/storm/daemon/task.clj

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -294,13 +294,10 @@
294294
(defn mk-task-receiver [^LinkedBlockingQueue receive-queue ^KryoTupleDeserializer deserializer tuple-action-fn]
295295
(fn []
296296
(let [msg (.take receive-queue)
297-
is-ser-msg? (not (instance? Tuple msg))
298-
is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))]
299-
(when-not is-empty-msg? ; skip empty messages (used during shutdown)
300-
(log-debug "Processing message")
301-
(let [^Tuple tuple (if is-ser-msg?
302-
(.deserialize deserializer msg)
303-
msg)]
297+
is-tuple? (instance? Tuple msg)]
298+
(when (or is-tuple? (not (empty? msg))) ; skip empty messages (used during shutdown)
299+
(log-debug "Processing message " msg)
300+
(let [^Tuple tuple (if is-tuple? msg (.deserialize deserializer msg))]
304301
(tuple-action-fn tuple)
305302
))
306303
)))
@@ -381,6 +378,7 @@
381378
(log-message "Opening spout " component-id ":" task-id)
382379
(.open spout storm-conf user-context (SpoutOutputCollector. output-collector))
383380
(log-message "Opened spout " component-id ":" task-id)
381+
;; TODO: should redesign this to only use one thread
384382
[(fn []
385383
;; This design requires that spouts be non-blocking
386384
(loop []

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@
7070
storm-active-atom (atom false)
7171
cluster-state (cluster/mk-distributed-cluster-state conf)
7272
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
73-
task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
74-
task-ids-set (set task-ids)
73+
task-ids (set (read-worker-task-ids storm-cluster-state storm-id supervisor-id port))
7574
;; because in local mode, its not a separate
7675
;; process. supervisor will register it in this case
7776
_ (when (= :distributed (cluster-mode conf))
@@ -94,7 +93,7 @@
9493
(worker-pids-root conf worker-id)
9594
%
9695
port
97-
task-ids)
96+
(vec task-ids))
9897
mk-user-context #(TopologyContext. topology
9998
storm-conf
10099
task->component
@@ -104,7 +103,7 @@
104103
(worker-pids-root conf worker-id)
105104
%
106105
port
107-
task-ids)
106+
(vec task-ids))
108107
mq-context (if mq-context
109108
mq-context
110109
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
@@ -129,7 +128,7 @@
129128
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
130129
;; we dont need a connection for the local tasks anymore
131130
needed-connections (->> my-assignment
132-
(filter #(->> % key (contains? task-ids-set) not))
131+
(filter-key (complement task-ids))
133132
vals
134133
set)
135134
current-connections (set (keys @node+port->socket))
@@ -195,9 +194,9 @@
195194
(read-locked endpoint-socket-lock
196195
(let [node+port->socket @node+port->socket
197196
task->node+port @task->node+port]
198-
(doseq [[task tuple] drainer]
197+
(doseq [[task ser-tuple] drainer]
199198
(let [socket (node+port->socket (task->node+port task))]
200-
(msg/send socket task tuple)
199+
(msg/send socket task ser-tuple)
201200
)
202201
)))
203202
(.clear drainer)

src/clj/backtype/storm/messaging/loader.clj

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
(ns backtype.storm.messaging.loader
2-
(:use [backtype.storm.bootstrap])
2+
(:use [backtype.storm util log])
33
(:import [java.util.concurrent LinkedBlockingQueue])
4-
(:require [backtype.storm.messaging.local :as local]))
5-
6-
(bootstrap)
4+
(:require [backtype.storm.messaging [local :as local] [protocol :as msg]]))
75

86
(defn mk-local-context []
97
(local/mk-local-context))

src/clj/backtype/storm/messaging/local.clj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
(recv [this]
1717
(when-not queue
1818
(throw (IllegalArgumentException. "Cannot receive on this socket")))
19-
(let [[port tuple] (.take queue)]
20-
[(short port) tuple]))
19+
(.take queue))
2120
(send [this task message]
2221
(let [send-queue (add-queue! queues-map lock storm-id port)]
2322
(.put send-queue [task message])

0 commit comments

Comments
 (0)