|
275 | 275 | )))
|
276 | 276 |
|
277 | 277 | (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta]
|
278 |
| - (let [^ISpout spout (:object task-data)] |
| 278 | + (let [^ISpout spout (:object task-data) |
| 279 | + task-id (:task-id task-data)] |
279 | 280 | ;;TODO: need to throttle these when there's lots of failures
|
280 | 281 | (log-message "Failing message " msg-id ": " tuple-info)
|
281 | 282 | (.fail spout msg-id)
|
282 |
| - (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id time-delta)) |
| 283 | + (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) |
283 | 284 | (when time-delta
|
284 | 285 | (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
|
285 | 286 | )))
|
286 | 287 |
|
287 | 288 | (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
|
288 | 289 | (let [storm-conf (:storm-conf executor-data)
|
289 |
| - ^ISpout spout (:object task-data)] |
| 290 | + ^ISpout spout (:object task-data) |
| 291 | + task-id (:task-id task-data)] |
290 | 292 | (when (= true (storm-conf TOPOLOGY-DEBUG))
|
291 | 293 | (log-message "Acking message " msg-id))
|
292 | 294 | (.ack spout msg-id)
|
293 |
| - (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id time-delta)) |
| 295 | + (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) |
294 | 296 | (when time-delta
|
295 | 297 | (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
|
296 | 298 | )))
|
|
525 | 527 | [root (bit-xor id ack-val)])
|
526 | 528 | ))
|
527 | 529 | (let [delta (tuple-time-delta! tuple)]
|
528 |
| - (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta)) |
| 530 | + (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) |
529 | 531 | (when delta
|
530 | 532 | (stats/bolt-acked-tuple! executor-stats
|
531 | 533 | (.getSourceComponent tuple)
|
|
538 | 540 | ACKER-FAIL-STREAM-ID
|
539 | 541 | [root]))
|
540 | 542 | (let [delta (tuple-time-delta! tuple)]
|
541 |
| - (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta)) |
| 543 | + (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) |
542 | 544 | (when delta
|
543 | 545 | (stats/bolt-failed-tuple! executor-stats
|
544 | 546 | (.getSourceComponent tuple)
|
|
0 commit comments