Skip to content

Commit 9d33777

Browse files
committed
apply the impovements mentioned in the pull request
1 parent b0ddc4d commit 9d33777

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

src/clj/backtype/storm/daemon/task.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@
158158
(.getThisTaskId topology-context)
159159
stream))))
160160

161-
(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn
161+
(defn mk-task [conf storm-conf topology-context user-context storm-id cluster-state storm-active-atom transfer-fn suicide-fn
162162
receive-queue]
163163
(let [task-id (.getThisTaskId topology-context)
164164
worker-port (.getThisWorkerPort topology-context)

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,13 @@
5353
(-> (reverse-map task->component) (select-keys components) vals)))
5454
))
5555

56-
(defn mk-transfer-fn [transfer-queue]
56+
(defn mk-transfer-fn [transfer-queue receive-queue-map serializer]
5757
(fn [task ^Tuple tuple]
58-
(.put ^LinkedBlockingQueue transfer-queue [task tuple])
59-
))
58+
(if (contains? receive-queue-map task)
59+
(.put (receive-queue-map task) tuple)
60+
(let [tuple (.serialize serializer tuple)]
61+
(.put ^LinkedBlockingQueue transfer-queue [task tuple]))
62+
)))
6063

6164
;; TODO: should worker even take the storm-id as input? this should be
6265
;; deducable from cluster state (by searching through assignments)
@@ -122,15 +125,18 @@
122125
{tid (LinkedBlockingQueue.)}))
123126

124127
^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil))
125-
transfer-fn (mk-transfer-fn transfer-queue)
128+
transfer-fn (mk-transfer-fn transfer-queue receive-queue-map serializer)
126129
refresh-connections (fn this
127130
([]
128131
(this (fn [& ignored] (.add event-manager this))))
129132
([callback]
130133
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
131134
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
132135
;; we dont need a connection for the local tasks anymore
133-
needed-connections (set (vals (filter #(not (contains? task-ids-set (key %))) my-assignment)))
136+
needed-connections (->> my-assignment
137+
(filter #(->> % key (contains? task-ids-set) not))
138+
vals
139+
set)
134140
current-connections (set (keys @node+port->socket))
135141
new-connections (set/difference needed-connections current-connections)
136142
remove-connections (set/difference current-connections needed-connections)]
@@ -179,7 +185,7 @@
179185
)
180186
:priority Thread/MAX_PRIORITY)
181187
suicide-fn (mk-suicide-fn conf active)
182-
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid)))
188+
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid)))
183189
threads [(async-loop
184190
(fn []
185191
(.add event-manager refresh-connections)
@@ -195,14 +201,9 @@
195201
(let [node+port->socket @node+port->socket
196202
task->node+port @task->node+port]
197203
(doseq [[task tuple] drainer]
198-
;; if its a local-task, add the tuple to its receive-queue directly
199-
;; otherwise, send it through the socket.
200-
(if (contains? task-ids-set task)
201-
(let [^LinkedBlockingQueue target-receive-queue (receive-queue-map task)]
202-
(.put target-receive-queue tuple))
203-
(let [socket (node+port->socket (task->node+port task))]
204-
(msg/send socket task (.serialize serializer tuple))
205-
))
204+
(let [socket (node+port->socket (task->node+port task))]
205+
(msg/send socket task tuple)
206+
)
206207
)))
207208
(.clear drainer)
208209
0 )

0 commit comments

Comments
 (0)