Skip to content

Commit 84f9bca

Browse files
author
Nathan Marz
committed
worker checks and warns for missing outbound connections from assignment, now drops messages for which doesn't have outbound connection
1 parent 57fb6ce commit 84f9bca

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,11 @@
223223
(select-keys outbound-tasks)
224224
(#(map-val endpoint->string %)))
225225
;; we dont need a connection for the local tasks anymore
226-
needed-connections (->> my-assignment
227-
(filter-key (complement (-> worker :task-ids set)))
228-
vals
229-
set)
226+
needed-assignment (->> my-assignment
227+
(filter-key (complement (-> worker :task-ids set))))
228+
needed-connections (-> needed-assignment vals set)
229+
needed-tasks (-> needed-assignment keys)
230+
230231
current-connections (set (keys @(:cached-node+port->socket worker)))
231232
new-connections (set/difference needed-connections current-connections)
232233
remove-connections (set/difference current-connections needed-connections)]
@@ -252,7 +253,12 @@
252253
(:cached-node+port->socket worker)
253254
#(HashMap. (dissoc (into {} %1) %&))
254255
remove-connections)
255-
)))))
256+
257+
(let [missing-tasks (->> needed-tasks
258+
(filter (complement my-assignment)))]
259+
(when-not (empty? missing-tasks)
260+
(log-warn "Missing assignment for following tasks: " (pr-str missing-tasks))
261+
)))))))
256262

257263
(defn refresh-storm-active
258264
([worker]
@@ -286,9 +292,10 @@
286292
(fast-list-iter [[task ser-tuple] drainer]
287293
;; TODO: consider write a batch of tuples here to every target worker
288294
;; group by node+port, do multipart send
289-
(let [socket (get node+port->socket (get task->node+port task))]
290-
(msg/send socket task ser-tuple)
291-
))))
295+
(let [node-port (get task->node+port task)]
296+
(when node-port
297+
(msg/send (get node+port->socket node-port) task ser-tuple))
298+
))))
292299
(.clear drainer))))))
293300

294301
(defn launch-receive-thread [worker]

0 commit comments

Comments
 (0)