|
53 | 53 | (-> (reverse-map task->component) (select-keys components) vals)))
|
54 | 54 | ))
|
55 | 55 |
|
56 |
| -(defn mk-transfer-fn [transfer-queue] |
| 56 | +(defn mk-transfer-fn [transfer-queue receive-queue-map serializer] |
57 | 57 | (fn [task ^Tuple tuple]
|
58 |
| - (.put ^LinkedBlockingQueue transfer-queue [task tuple]) |
59 |
| - )) |
| 58 | + (if (contains? receive-queue-map task) |
| 59 | + (.put (receive-queue-map task) tuple) |
| 60 | + (let [tuple (.serialize serializer tuple)] |
| 61 | + (.put ^LinkedBlockingQueue transfer-queue [task tuple])) |
| 62 | + ))) |
60 | 63 |
|
61 | 64 | ;; TODO: should worker even take the storm-id as input? this should be
|
62 | 65 | ;; deducable from cluster state (by searching through assignments)
|
|
122 | 125 | {tid (LinkedBlockingQueue.)}))
|
123 | 126 |
|
124 | 127 | ^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil))
|
125 |
| - transfer-fn (mk-transfer-fn transfer-queue) |
| 128 | + transfer-fn (mk-transfer-fn transfer-queue receive-queue-map serializer) |
126 | 129 | refresh-connections (fn this
|
127 | 130 | ([]
|
128 | 131 | (this (fn [& ignored] (.add event-manager this))))
|
129 | 132 | ([callback]
|
130 | 133 | (let [assignment (.assignment-info storm-cluster-state storm-id callback)
|
131 | 134 | my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
|
132 | 135 | ;; we dont need a connection for the local tasks anymore
|
133 |
| - needed-connections (set (vals (filter #(not (contains? task-ids-set (key %))) my-assignment))) |
| 136 | + needed-connections (->> my-assignment |
| 137 | + (filter #(->> % key (contains? task-ids-set) not)) |
| 138 | + vals |
| 139 | + set) |
134 | 140 | current-connections (set (keys @node+port->socket))
|
135 | 141 | new-connections (set/difference needed-connections current-connections)
|
136 | 142 | remove-connections (set/difference current-connections needed-connections)]
|
|
179 | 185 | )
|
180 | 186 | :priority Thread/MAX_PRIORITY)
|
181 | 187 | suicide-fn (mk-suicide-fn conf active)
|
182 |
| - tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid))) |
| 188 | + tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid))) |
183 | 189 | threads [(async-loop
|
184 | 190 | (fn []
|
185 | 191 | (.add event-manager refresh-connections)
|
|
195 | 201 | (let [node+port->socket @node+port->socket
|
196 | 202 | task->node+port @task->node+port]
|
197 | 203 | (doseq [[task tuple] drainer]
|
198 |
| - ;; if its a local-task, add the tuple to its receive-queue directly |
199 |
| - ;; otherwise, send it through the socket. |
200 |
| - (if (contains? task-ids-set task) |
201 |
| - (let [^LinkedBlockingQueue target-receive-queue (receive-queue-map task)] |
202 |
| - (.put target-receive-queue tuple)) |
203 |
| - (let [socket (node+port->socket (task->node+port task))] |
204 |
| - (msg/send socket task (.serialize serializer tuple)) |
205 |
| - )) |
| 204 | + (let [socket (node+port->socket (task->node+port task))] |
| 205 | + (msg/send socket task tuple) |
| 206 | + ) |
206 | 207 | )))
|
207 | 208 | (.clear drainer)
|
208 | 209 | 0 )
|
|
0 commit comments