Skip to content

Commit 438a858

Browse files
author
Nathan Marz
committed
refactored groupings to return task ids instead of indices, added local-or-shuffle grouping
1 parent 195ddaf commit 438a858

File tree

12 files changed

+162
-47
lines changed

12 files changed

+162
-47
lines changed

src/clj/backtype/storm/daemon/task.clj

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
(ns backtype.storm.daemon.task
22
(:use [backtype.storm.daemon common])
33
(:use [backtype.storm bootstrap])
4+
(:use [clojure.contrib.seq :only [positions]])
45
(:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
56
(:import [backtype.storm.hooks ITaskHook])
67
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
@@ -9,48 +10,65 @@
910

1011
(bootstrap)
1112

12-
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields num-tasks]
13-
(fn [^List values]
14-
(mod (tuple/list-hash-code (.select out-fields group-fields values))
15-
num-tasks)
16-
))
13+
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
14+
(let [num-tasks (count target-tasks)
15+
task-getter (fn [i] (.get target-tasks i))]
16+
(fn [^List values]
17+
(-> (.select out-fields group-fields values)
18+
tuple/list-hash-code
19+
(mod num-tasks)
20+
task-getter))))
1721

18-
(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^Fields out-fields num-tasks]
19-
(.prepare grouping out-fields num-tasks)
22+
(defn- mk-shuffle-grouper [target-tasks]
23+
(let [num-tasks (count target-tasks)
24+
choices (rotating-random-range num-tasks)]
25+
(fn [tuple]
26+
(->> (acquire-random-range-id choices num-tasks)
27+
(.get target-tasks)))))
28+
29+
(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^TopologyContext context ^Fields out-fields target-tasks]
30+
(.prepare grouping context out-fields target-tasks)
2031
(fn [^List values]
21-
(.taskIndices grouping values)
32+
(.chooseTasks grouping values)
2233
))
2334

2435
(defn- mk-grouper
2536
"Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
26-
[^Fields out-fields thrift-grouping num-tasks]
27-
(let [random (Random.)]
37+
[^TopologyContext context ^Fields out-fields thrift-grouping ^List target-tasks]
38+
(let [num-tasks (count target-tasks)
39+
random (Random.)
40+
target-tasks (vec (sort target-tasks))]
2841
(condp = (thrift/grouping-type thrift-grouping)
2942
:fields
3043
(if (thrift/global-grouping? thrift-grouping)
3144
(fn [tuple]
3245
;; It's possible for target to have multiple tasks if it reads multiple sources
33-
0 )
46+
(first target-tasks))
3447
(let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
35-
(mk-fields-grouper out-fields group-fields num-tasks)
48+
(mk-fields-grouper out-fields group-fields target-tasks)
3649
))
3750
:all
38-
(fn [tuple]
39-
(range num-tasks))
51+
(fn [tuple] target-tasks)
4052
:shuffle
41-
(let [choices (rotating-random-range num-tasks)]
42-
(fn [tuple]
43-
(acquire-random-range-id choices num-tasks)
44-
))
53+
(mk-shuffle-grouper target-tasks)
54+
:local-or-shuffle
55+
(let [same-tasks (set/intersection
56+
(set target-tasks)
57+
(set (.getThisWorkerTasks context)))]
58+
(if-not (empty? same-tasks)
59+
(mk-shuffle-grouper (vec same-tasks))
60+
(mk-shuffle-grouper target-tasks)))
4561
:none
4662
(fn [tuple]
47-
(mod (.nextInt random) num-tasks))
63+
(let [i (mod (.nextInt random) num-tasks)]
64+
(.get target-tasks i)
65+
))
4866
:custom-object
4967
(let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
50-
(mk-custom-grouper grouping out-fields num-tasks))
68+
(mk-custom-grouper grouping context out-fields target-tasks))
5169
:custom-serialized
5270
(let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
53-
(mk-custom-grouper grouping out-fields num-tasks))
71+
(mk-custom-grouper grouping context out-fields target-tasks))
5472
:direct
5573
:direct
5674
)))
@@ -88,7 +106,7 @@
88106

89107
(defn outbound-components
90108
"Returns map of stream id to component id to grouper"
91-
[^TopologyContext topology-context]
109+
[^TopologyContext topology-context ^TopologyContext user-context]
92110
(let [output-groupings (clojurify-structure (.getThisTargets topology-context))]
93111
(into {}
94112
(for [[stream-id component->grouping] output-groupings
@@ -98,9 +116,10 @@
98116
[stream-id
99117
(into {}
100118
(for [[component tgrouping] component->grouping]
101-
[component (mk-grouper out-fields
119+
[component (mk-grouper user-context
120+
out-fields
102121
tgrouping
103-
(count (.getComponentTasks topology-context component))
122+
(.getComponentTasks topology-context component)
104123
)]
105124
))]))
106125
))
@@ -155,6 +174,7 @@
155174

156175
report-error-and-die (fn [error]
157176
(report-error error)
177+
(apply-hooks user-context .error error)
158178
(suicide-fn))
159179

160180
;; heartbeat ASAP so nimbus doesn't reassign
@@ -172,7 +192,7 @@
172192
_ (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
173193
(.addTaskHook user-context (-> klass Class/forName .newInstance)))
174194

175-
stream->component->grouper (outbound-components topology-context)
195+
stream->component->grouper (outbound-components topology-context user-context)
176196
component->tasks (reverse-map task-info)
177197
;; important it binds to virtual port before function returns
178198
puller (msg/bind mq-context storm-id task-id)
@@ -207,9 +227,7 @@
207227
(when (= :direct grouper)
208228
;; TODO: this is wrong, need to check how the stream was declared
209229
(throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
210-
(let [tasks (component->tasks out-component)
211-
indices (collectify (grouper values))]
212-
(for [i indices] (tasks i))))
230+
(collectify (grouper values)))
213231
(stream->component->grouper stream))]
214232
(apply-hooks user-context .emit (EmitInfo. values stream out-tasks))
215233
(when (emit-sampler)

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@
9595
(supervisor-stormdist-root conf storm-id))
9696
(worker-pids-root conf worker-id)
9797
%
98-
port)
98+
port
99+
task-ids)
99100
mk-user-context #(TopologyContext. topology
100101
storm-conf
101102
task->component
@@ -104,7 +105,8 @@
104105
(supervisor-stormdist-root conf storm-id))
105106
(worker-pids-root conf worker-id)
106107
%
107-
port)
108+
port
109+
task-ids)
108110
mq-context (if mq-context
109111
mq-context
110112
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)

src/clj/backtype/storm/testing.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,6 @@
542542
spout-spec (mk-spout-spec* (TestWordSpout.)
543543
{stream fields})
544544
topology (StormTopology. {component spout-spec} {} {})
545-
context (TopologyContext. topology (read-storm-config) {1 component} "test-storm-id" nil nil 1 nil)]
545+
context (TopologyContext. topology (read-storm-config) {1 component} "test-storm-id" nil nil 1 nil [1])]
546546
(Tuple. context values 1 stream)
547547
))

src/clj/backtype/storm/thrift.clj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
2929
Grouping$_Fields/CUSTOM_OBJECT :custom-object
3030
Grouping$_Fields/DIRECT :direct
31+
Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle
3132
})
3233

3334
(defn grouping-type [^Grouping grouping]
@@ -100,6 +101,9 @@
100101
(defn mk-shuffle-grouping []
101102
(Grouping/shuffle (NullStruct.)))
102103

104+
(defn mk-local-or-shuffle-grouping []
105+
(Grouping/local_or_shuffle (NullStruct.)))
106+
103107
(defn mk-fields-grouping [fields]
104108
(Grouping/fields fields))
105109

@@ -131,6 +135,7 @@
131135
(instance? JavaObject grouping-spec) (Grouping/custom_object grouping-spec)
132136
(sequential? grouping-spec) (mk-fields-grouping grouping-spec)
133137
(= grouping-spec :shuffle) (mk-shuffle-grouping)
138+
(= grouping-spec :local-or-shuffle) (mk-local-or-shuffle-grouping)
134139
(= grouping-spec :none) (mk-none-grouping)
135140
(= grouping-spec :all) (mk-all-grouping)
136141
(= grouping-spec :global) (mk-global-grouping)

src/jvm/backtype/storm/generated/Grouping.java

Lines changed: 53 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package backtype.storm.grouping;
22

3+
import backtype.storm.task.TopologyContext;
34
import backtype.storm.tuple.Fields;
45
import java.io.Serializable;
56
import java.util.List;
7+
import java.util.Map;
68

79
public interface CustomStreamGrouping extends Serializable {
810

@@ -12,16 +14,14 @@ public interface CustomStreamGrouping extends Serializable {
1214
*
1315
* It also tells the grouping the metadata on the stream this grouping will be used on.
1416
*/
15-
void prepare(Fields outFields, int numTasks);
17+
void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks);
1618

1719
/**
1820
* This function implements a custom stream grouping. It takes in as input
1921
* the number of tasks in the target bolt in prepare and returns the
20-
* indices of the tasks to send the tuple to. Each index must be in the range
21-
* [0, numTargetTasks-1]
22+
* tasks to send the tuples to.
2223
*
23-
* @param tuple the values to group on
24-
* @param numTargetTasks the number of tasks in the target bolt
24+
* @param values the values to group on
2525
*/
26-
List<Integer> taskIndices(List<Object> values);
26+
List<Integer> chooseTasks(List<Object> values);
2727
}

src/jvm/backtype/storm/hooks/BaseTaskHook.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,8 @@ public void boltAck(BoltAckInfo info) {
3636
@Override
3737
public void boltFail(BoltFailInfo info) {
3838
}
39+
40+
@Override
41+
public void error(Throwable error) {
42+
}
3943
}

src/jvm/backtype/storm/hooks/ITaskHook.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ public interface ITaskHook {
1616
void spoutFail(SpoutFailInfo info);
1717
void boltAck(BoltAckInfo info);
1818
void boltFail(BoltFailInfo info);
19+
void error(Throwable error);
1920
}

0 commit comments

Comments
 (0)