|
1 | 1 | (ns backtype.storm.daemon.task
|
2 | 2 | (:use [backtype.storm.daemon common])
|
3 | 3 | (:use [backtype.storm bootstrap])
|
| 4 | + (:use [clojure.contrib.seq :only [positions]]) |
4 | 5 | (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
|
5 | 6 | (:import [backtype.storm.hooks ITaskHook])
|
6 | 7 | (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
|
|
9 | 10 |
|
10 | 11 | (bootstrap)
|
11 | 12 |
|
12 |
| -(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields num-tasks] |
13 |
| - (fn [^List values] |
14 |
| - (mod (tuple/list-hash-code (.select out-fields group-fields values)) |
15 |
| - num-tasks) |
16 |
| - )) |
| 13 | +(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks] |
| 14 | + (let [num-tasks (count target-tasks) |
| 15 | + task-getter (fn [i] (.get target-tasks i))] |
| 16 | + (fn [^List values] |
| 17 | + (-> (.select out-fields group-fields values) |
| 18 | + tuple/list-hash-code |
| 19 | + (mod num-tasks) |
| 20 | + task-getter)))) |
17 | 21 |
|
18 |
| -(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^Fields out-fields num-tasks] |
19 |
| - (.prepare grouping out-fields num-tasks) |
| 22 | +(defn- mk-shuffle-grouper [target-tasks] |
| 23 | + (let [num-tasks (count target-tasks) |
| 24 | + choices (rotating-random-range num-tasks)] |
| 25 | + (fn [tuple] |
| 26 | + (->> (acquire-random-range-id choices num-tasks) |
| 27 | + (.get target-tasks))))) |
| 28 | + |
| 29 | +(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^TopologyContext context ^Fields out-fields target-tasks] |
| 30 | + (.prepare grouping context out-fields target-tasks) |
20 | 31 | (fn [^List values]
|
21 |
| - (.taskIndices grouping values) |
| 32 | + (.chooseTasks grouping values) |
22 | 33 | ))
|
23 | 34 |
|
24 | 35 | (defn- mk-grouper
|
25 | 36 | "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
|
26 |
| - [^Fields out-fields thrift-grouping num-tasks] |
27 |
| - (let [random (Random.)] |
| 37 | + [^TopologyContext context ^Fields out-fields thrift-grouping ^List target-tasks] |
| 38 | + (let [num-tasks (count target-tasks) |
| 39 | + random (Random.) |
| 40 | + target-tasks (vec (sort target-tasks))] |
28 | 41 | (condp = (thrift/grouping-type thrift-grouping)
|
29 | 42 | :fields
|
30 | 43 | (if (thrift/global-grouping? thrift-grouping)
|
31 | 44 | (fn [tuple]
|
32 | 45 | ;; It's possible for target to have multiple tasks if it reads multiple sources
|
33 |
| - 0 ) |
| 46 | + (first target-tasks)) |
34 | 47 | (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
|
35 |
| - (mk-fields-grouper out-fields group-fields num-tasks) |
| 48 | + (mk-fields-grouper out-fields group-fields target-tasks) |
36 | 49 | ))
|
37 | 50 | :all
|
38 |
| - (fn [tuple] |
39 |
| - (range num-tasks)) |
| 51 | + (fn [tuple] target-tasks) |
40 | 52 | :shuffle
|
41 |
| - (let [choices (rotating-random-range num-tasks)] |
42 |
| - (fn [tuple] |
43 |
| - (acquire-random-range-id choices num-tasks) |
44 |
| - )) |
| 53 | + (mk-shuffle-grouper target-tasks) |
| 54 | + :local-or-shuffle |
| 55 | + (let [same-tasks (set/intersection |
| 56 | + (set target-tasks) |
| 57 | + (set (.getThisWorkerTasks context)))] |
| 58 | + (if-not (empty? same-tasks) |
| 59 | + (mk-shuffle-grouper (vec same-tasks)) |
| 60 | + (mk-shuffle-grouper target-tasks))) |
45 | 61 | :none
|
46 | 62 | (fn [tuple]
|
47 |
| - (mod (.nextInt random) num-tasks)) |
| 63 | + (let [i (mod (.nextInt random) num-tasks)] |
| 64 | + (.get target-tasks i) |
| 65 | + )) |
48 | 66 | :custom-object
|
49 | 67 | (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
|
50 |
| - (mk-custom-grouper grouping out-fields num-tasks)) |
| 68 | + (mk-custom-grouper grouping context out-fields target-tasks)) |
51 | 69 | :custom-serialized
|
52 | 70 | (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
|
53 |
| - (mk-custom-grouper grouping out-fields num-tasks)) |
| 71 | + (mk-custom-grouper grouping context out-fields target-tasks)) |
54 | 72 | :direct
|
55 | 73 | :direct
|
56 | 74 | )))
|
|
88 | 106 |
|
89 | 107 | (defn outbound-components
|
90 | 108 | "Returns map of stream id to component id to grouper"
|
91 |
| - [^TopologyContext topology-context] |
| 109 | + [^TopologyContext topology-context ^TopologyContext user-context] |
92 | 110 | (let [output-groupings (clojurify-structure (.getThisTargets topology-context))]
|
93 | 111 | (into {}
|
94 | 112 | (for [[stream-id component->grouping] output-groupings
|
|
98 | 116 | [stream-id
|
99 | 117 | (into {}
|
100 | 118 | (for [[component tgrouping] component->grouping]
|
101 |
| - [component (mk-grouper out-fields |
| 119 | + [component (mk-grouper user-context |
| 120 | + out-fields |
102 | 121 | tgrouping
|
103 |
| - (count (.getComponentTasks topology-context component)) |
| 122 | + (.getComponentTasks topology-context component) |
104 | 123 | )]
|
105 | 124 | ))]))
|
106 | 125 | ))
|
|
155 | 174 |
|
156 | 175 | report-error-and-die (fn [error]
|
157 | 176 | (report-error error)
|
| 177 | + (apply-hooks user-context .error error) |
158 | 178 | (suicide-fn))
|
159 | 179 |
|
160 | 180 | ;; heartbeat ASAP so nimbus doesn't reassign
|
|
172 | 192 | _ (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
|
173 | 193 | (.addTaskHook user-context (-> klass Class/forName .newInstance)))
|
174 | 194 |
|
175 |
| - stream->component->grouper (outbound-components topology-context) |
| 195 | + stream->component->grouper (outbound-components topology-context user-context) |
176 | 196 | component->tasks (reverse-map task-info)
|
177 | 197 | ;; important it binds to virtual port before function returns
|
178 | 198 | puller (msg/bind mq-context storm-id task-id)
|
|
207 | 227 | (when (= :direct grouper)
|
208 | 228 | ;; TODO: this is wrong, need to check how the stream was declared
|
209 | 229 | (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
|
210 |
| - (let [tasks (component->tasks out-component) |
211 |
| - indices (collectify (grouper values))] |
212 |
| - (for [i indices] (tasks i)))) |
| 230 | + (collectify (grouper values))) |
213 | 231 | (stream->component->grouper stream))]
|
214 | 232 | (apply-hooks user-context .emit (EmitInfo. values stream out-tasks))
|
215 | 233 | (when (emit-sampler)
|
|
0 commit comments