|
223 | 223 | (select-keys outbound-tasks)
|
224 | 224 | (#(map-val endpoint->string %)))
|
225 | 225 | ;; we dont need a connection for the local tasks anymore
|
226 |
| - needed-connections (->> my-assignment |
227 |
| - (filter-key (complement (-> worker :task-ids set))) |
228 |
| - vals |
229 |
| - set) |
| 226 | + needed-assignment (->> my-assignment |
| 227 | + (filter-key (complement (-> worker :task-ids set)))) |
| 228 | + needed-connections (-> needed-assignment vals set) |
| 229 | + needed-tasks (-> needed-assignment keys) |
| 230 | + |
230 | 231 | current-connections (set (keys @(:cached-node+port->socket worker)))
|
231 | 232 | new-connections (set/difference needed-connections current-connections)
|
232 | 233 | remove-connections (set/difference current-connections needed-connections)]
|
|
252 | 253 | (:cached-node+port->socket worker)
|
253 | 254 | #(HashMap. (dissoc (into {} %1) %&))
|
254 | 255 | remove-connections)
|
255 |
| - ))))) |
| 256 | + |
| 257 | + (let [missing-tasks (->> needed-tasks |
| 258 | + (filter (complement my-assignment)))] |
| 259 | + (when-not (empty? missing-tasks) |
| 260 | + (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks)) |
| 261 | + ))))))) |
256 | 262 |
|
257 | 263 | (defn refresh-storm-active
|
258 | 264 | ([worker]
|
|
286 | 292 | (fast-list-iter [[task ser-tuple] drainer]
|
287 | 293 | ;; TODO: consider write a batch of tuples here to every target worker
|
288 | 294 | ;; group by node+port, do multipart send
|
289 |
| - (let [socket (get node+port->socket (get task->node+port task))] |
290 |
| - (msg/send socket task ser-tuple) |
291 |
| - )))) |
| 295 | + (let [node-port (get task->node+port task)] |
| 296 | + (when node-port |
| 297 | + (msg/send (get node+port->socket node-port) task ser-tuple)) |
| 298 | + )))) |
292 | 299 | (.clear drainer))))))
|
293 | 300 |
|
294 | 301 | (defn launch-receive-thread [worker]
|
|
0 commit comments