|
241 | 241 | (log-message "Failing message " msg-id ": " tuple-info)
|
242 | 242 | (.fail spout msg-id)
|
243 | 243 | (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id time-delta))
|
244 |
| - (when ((:sampler executor-data)) |
| 244 | + (when time-delta |
245 | 245 | (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
|
246 | 246 | )))
|
247 | 247 |
|
|
252 | 252 | (log-message "Acking message " msg-id))
|
253 | 253 | (.ack spout msg-id)
|
254 | 254 | (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id time-delta))
|
255 |
| - (when ((:sampler executor-data)) |
| 255 | + (when time-delta |
256 | 256 | (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
|
257 | 257 | )))
|
258 | 258 |
|
|
280 | 280 | transfer-fn (:transfer-fn executor-data)
|
281 | 281 | report-error-fn (:report-error-fn executor-data)
|
282 | 282 | spouts (map :object (vals task-datas))
|
| 283 | + sampler (:sampler executor-data) |
283 | 284 |
|
284 | 285 | pending (TimeCacheMap.
|
285 | 286 | (int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))
|
286 | 287 | 2 ;; microoptimize for performance of .size method
|
287 | 288 | (reify TimeCacheMap$ExpiredCallback
|
288 | 289 | (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
|
289 |
| - (let [time-delta (time-delta-ms start-time-ms)] |
| 290 | + (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] |
290 | 291 | (.add event-queue #(fail-spout-msg executor-data (task-datas task-id) spout-id tuple-info time-delta)))
|
291 | 292 | )))
|
292 | 293 | tuple-action-fn (fn [task-id ^Tuple tuple]
|
|
295 | 296 | (when spout-id
|
296 | 297 | (when-not (= stored-task-id task-id)
|
297 | 298 | (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
|
298 |
| - (let [time-delta (time-delta-ms start-time-ms)] |
| 299 | + (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] |
299 | 300 | (condp = (.getSourceStreamId tuple)
|
300 | 301 | ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg executor-data (task-datas task-id)
|
301 | 302 | spout-id tuple-finished-info time-delta))
|
|
331 | 332 | (.put pending root-id [task-id
|
332 | 333 | message-id
|
333 | 334 | {:stream out-stream-id :values values}
|
334 |
| - (System/currentTimeMillis)]) |
| 335 | + (if (sampler) (System/currentTimeMillis))]) |
335 | 336 | (task/send-unanchored task-data
|
336 | 337 | ACKER-INIT-STREAM-ID
|
337 | 338 | [root-id (bit-xor-vals out-ids) task-id]))
|
|
386 | 387 | ))
|
387 | 388 |
|
388 | 389 | (defn- tuple-time-delta! [^Map start-times ^Tuple tuple]
|
389 |
| - (time-delta-ms (.remove start-times tuple))) |
| 390 | + (let [ms (.remove start-times tuple)] |
| 391 | + (if ms |
| 392 | + (time-delta-ms ms)))) |
390 | 393 |
|
391 | 394 | (defn put-xor! [^Map pending key id]
|
392 | 395 | (let [curr (or (.get pending key) (long 0))]
|
|
421 | 424 | ;;(log-debug "Received tuple " tuple " at task " task-id)
|
422 | 425 | ;; need to do it this way to avoid reflection
|
423 | 426 | (let [^IBolt bolt-obj (-> task-id task-datas :object)]
|
424 |
| -;; (.put tuple-start-times tuple (System/currentTimeMillis)) |
| 427 | + (when (sampler) |
| 428 | + (.put tuple-start-times tuple (System/currentTimeMillis))) |
425 | 429 | (.execute bolt-obj tuple)))]
|
426 | 430 | (log-message "Preparing bolt " component-id ":" (keys task-datas))
|
427 | 431 | (doseq [[task-id task-data] task-datas
|
|
465 | 469 | ACKER-ACK-STREAM-ID
|
466 | 470 | [root (bit-xor id ack-val)])
|
467 | 471 | ))
|
468 |
| - (let [delta 0 ;; (tuple-time-delta! tuple-start-times tuple) |
469 |
| - ] |
| 472 | + (let [delta (tuple-time-delta! tuple-start-times tuple)] |
470 | 473 | (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
|
471 |
| - (when (sampler) |
| 474 | + (when delta |
472 | 475 | (stats/bolt-acked-tuple! executor-stats
|
473 | 476 | (.getSourceComponent tuple)
|
474 | 477 | (.getSourceStreamId tuple)
|
|
480 | 483 | (task/send-unanchored task-data
|
481 | 484 | ACKER-FAIL-STREAM-ID
|
482 | 485 | [root]))
|
483 |
| - (let [delta 0 ;;(tuple-time-delta! tuple-start-times tuple) |
| 486 | + (let [delta (tuple-time-delta! tuple-start-times tuple) |
484 | 487 | ]
|
485 | 488 | (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
|
486 |
| - (when (sampler) |
| 489 | + (when delta |
487 | 490 | (stats/bolt-failed-tuple! executor-stats
|
488 | 491 | (.getSourceComponent tuple)
|
489 | 492 | (.getSourceStreamId tuple)
|
|
0 commit comments