Skip to content

Commit 58ce226

Browse files
author
Nathan Marz
committed
Merge remote-tracking branch 'ooyala/0.8.0-better-hooks'
2 parents d47da10 + 3368d34 commit 58ce226

File tree

7 files changed

+27
-14
lines changed

7 files changed

+27
-14
lines changed

src/clj/backtype/storm/daemon/executor.clj

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,24 @@
275275
)))
276276

277277
(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta]
278-
(let [^ISpout spout (:object task-data)]
278+
(let [^ISpout spout (:object task-data)
279+
task-id (:task-id task-data)]
279280
;;TODO: need to throttle these when there's lots of failures
280281
(log-message "Failing message " msg-id ": " tuple-info)
281282
(.fail spout msg-id)
282-
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id time-delta))
283+
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
283284
(when time-delta
284285
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
285286
)))
286287

287288
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
288289
(let [storm-conf (:storm-conf executor-data)
289-
^ISpout spout (:object task-data)]
290+
^ISpout spout (:object task-data)
291+
task-id (:task-id task-data)]
290292
(when (= true (storm-conf TOPOLOGY-DEBUG))
291293
(log-message "Acking message " msg-id))
292294
(.ack spout msg-id)
293-
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id time-delta))
295+
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
294296
(when time-delta
295297
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
296298
)))
@@ -525,7 +527,7 @@
525527
[root (bit-xor id ack-val)])
526528
))
527529
(let [delta (tuple-time-delta! tuple)]
528-
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
530+
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
529531
(when delta
530532
(stats/bolt-acked-tuple! executor-stats
531533
(.getSourceComponent tuple)
@@ -538,7 +540,7 @@
538540
ACKER-FAIL-STREAM-ID
539541
[root]))
540542
(let [delta (tuple-time-delta! tuple)]
541-
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
543+
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
542544
(when delta
543545
(stats/bolt-failed-tuple! executor-stats
544546
(.getSourceComponent tuple)

src/clj/backtype/storm/daemon/task.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@
103103
stream->component->grouper (:stream->component->grouper executor-data)
104104
user-context (:user-context task-data)
105105
executor-stats (:stats executor-data)
106-
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
106+
debug? (= true (storm-conf TOPOLOGY-DEBUG))
107+
task-id (:task-id task-data)]
107108
(fn ([^Integer out-task-id ^String stream ^List values]
108109
(when debug?
109110
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
@@ -113,7 +114,7 @@
113114
out-task-id (if grouping out-task-id)]
114115
(when (and (not-nil? grouping) (not= :direct grouping))
115116
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
116-
(apply-hooks user-context .emit (EmitInfo. values stream [out-task-id]))
117+
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
117118
(when (emit-sampler)
118119
(stats/emitted-tuple! executor-stats stream)
119120
(if out-task-id
@@ -133,7 +134,7 @@
133134
(.addAll out-tasks comp-tasks)
134135
(.add out-tasks comp-tasks)
135136
)))
136-
(apply-hooks user-context .emit (EmitInfo. values stream out-tasks))
137+
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
137138
(when (emit-sampler)
138139
(stats/emitted-tuple! executor-stats stream)
139140
(stats/transferred-tuples! executor-stats stream (count out-tasks)))

src/jvm/backtype/storm/hooks/info/BoltAckInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
public class BoltAckInfo {
66
public Tuple tuple;
7+
public int ackingTaskId;
78
public Long processLatencyMs; // null if it wasn't sampled
89

9-
public BoltAckInfo(Tuple tuple, Long processLatencyMs) {
10+
public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
1011
this.tuple = tuple;
12+
this.ackingTaskId = ackingTaskId;
1113
this.processLatencyMs = processLatencyMs;
1214
}
1315
}

src/jvm/backtype/storm/hooks/info/BoltFailInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
public class BoltFailInfo {
66
public Tuple tuple;
7+
public int failingTaskId;
78
public Long failLatencyMs; // null if it wasn't sampled
89

9-
public BoltFailInfo(Tuple tuple, Long failLatencyMs) {
10+
public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
1011
this.tuple = tuple;
12+
this.failingTaskId = failingTaskId;
1113
this.failLatencyMs = failLatencyMs;
1214
}
1315
}

src/jvm/backtype/storm/hooks/info/EmitInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
public class EmitInfo {
77
public List<Object> values;
88
public String stream;
9+
public int taskId;
910
public Collection<Integer> outTasks;
1011

11-
public EmitInfo(List<Object> values, String stream, Collection<Integer> outTasks) {
12+
public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
1213
this.values = values;
1314
this.stream = stream;
15+
this.taskId = taskId;
1416
this.outTasks = outTasks;
1517
}
1618
}

src/jvm/backtype/storm/hooks/info/SpoutAckInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
public class SpoutAckInfo {
44
public Object messageId;
5+
public int spoutTaskId;
56
public Long completeLatencyMs; // null if it wasn't sampled
67

7-
public SpoutAckInfo(Object messageId, Long completeLatencyMs) {
8+
public SpoutAckInfo(Object messageId, int spoutTaskId, Long completeLatencyMs) {
89
this.messageId = messageId;
10+
this.spoutTaskId = spoutTaskId;
911
this.completeLatencyMs = completeLatencyMs;
1012
}
1113
}

src/jvm/backtype/storm/hooks/info/SpoutFailInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
public class SpoutFailInfo {
44
public Object messageId;
5+
public int spoutTaskId;
56
public Long failLatencyMs; // null if it wasn't sampled
67

7-
public SpoutFailInfo(Object messageId, Long failLatencyMs) {
8+
public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
89
this.messageId = messageId;
10+
this.spoutTaskId = spoutTaskId;
911
this.failLatencyMs = failLatencyMs;
1012
}
1113
}

0 commit comments

Comments
 (0)