Skip to content

Commit 32295dd

Browse files
author
Nathan Marz
committed
refactor and optimize interaction between sampling and hooks
1 parent 9692690 commit 32295dd

File tree

5 files changed

+23
-20
lines changed

5 files changed

+23
-20
lines changed

src/clj/backtype/storm/daemon/executor.clj

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@
241241
(log-message "Failing message " msg-id ": " tuple-info)
242242
(.fail spout msg-id)
243243
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id time-delta))
244-
(when ((:sampler executor-data))
244+
(when time-delta
245245
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
246246
)))
247247

@@ -252,7 +252,7 @@
252252
(log-message "Acking message " msg-id))
253253
(.ack spout msg-id)
254254
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id time-delta))
255-
(when ((:sampler executor-data))
255+
(when time-delta
256256
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
257257
)))
258258

@@ -280,13 +280,14 @@
280280
transfer-fn (:transfer-fn executor-data)
281281
report-error-fn (:report-error-fn executor-data)
282282
spouts (map :object (vals task-datas))
283+
sampler (:sampler executor-data)
283284

284285
pending (TimeCacheMap.
285286
(int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))
286287
2 ;; microoptimize for performance of .size method
287288
(reify TimeCacheMap$ExpiredCallback
288289
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
289-
(let [time-delta (time-delta-ms start-time-ms)]
290+
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
290291
(.add event-queue #(fail-spout-msg executor-data (task-datas task-id) spout-id tuple-info time-delta)))
291292
)))
292293
tuple-action-fn (fn [task-id ^Tuple tuple]
@@ -295,7 +296,7 @@
295296
(when spout-id
296297
(when-not (= stored-task-id task-id)
297298
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
298-
(let [time-delta (time-delta-ms start-time-ms)]
299+
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
299300
(condp = (.getSourceStreamId tuple)
300301
ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg executor-data (task-datas task-id)
301302
spout-id tuple-finished-info time-delta))
@@ -331,7 +332,7 @@
331332
(.put pending root-id [task-id
332333
message-id
333334
{:stream out-stream-id :values values}
334-
(System/currentTimeMillis)])
335+
(if (sampler) (System/currentTimeMillis))])
335336
(task/send-unanchored task-data
336337
ACKER-INIT-STREAM-ID
337338
[root-id (bit-xor-vals out-ids) task-id]))
@@ -386,7 +387,9 @@
386387
))
387388

388389
(defn- tuple-time-delta! [^Map start-times ^Tuple tuple]
389-
(time-delta-ms (.remove start-times tuple)))
390+
(let [ms (.remove start-times tuple)]
391+
(if ms
392+
(time-delta-ms ms))))
390393

391394
(defn put-xor! [^Map pending key id]
392395
(let [curr (or (.get pending key) (long 0))]
@@ -421,7 +424,8 @@
421424
;;(log-debug "Received tuple " tuple " at task " task-id)
422425
;; need to do it this way to avoid reflection
423426
(let [^IBolt bolt-obj (-> task-id task-datas :object)]
424-
;; (.put tuple-start-times tuple (System/currentTimeMillis))
427+
(when (sampler)
428+
(.put tuple-start-times tuple (System/currentTimeMillis)))
425429
(.execute bolt-obj tuple)))]
426430
(log-message "Preparing bolt " component-id ":" (keys task-datas))
427431
(doseq [[task-id task-data] task-datas
@@ -465,10 +469,9 @@
465469
ACKER-ACK-STREAM-ID
466470
[root (bit-xor id ack-val)])
467471
))
468-
(let [delta 0 ;; (tuple-time-delta! tuple-start-times tuple)
469-
]
472+
(let [delta (tuple-time-delta! tuple-start-times tuple)]
470473
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
471-
(when (sampler)
474+
(when delta
472475
(stats/bolt-acked-tuple! executor-stats
473476
(.getSourceComponent tuple)
474477
(.getSourceStreamId tuple)
@@ -480,10 +483,10 @@
480483
(task/send-unanchored task-data
481484
ACKER-FAIL-STREAM-ID
482485
[root]))
483-
(let [delta 0 ;;(tuple-time-delta! tuple-start-times tuple)
486+
(let [delta (tuple-time-delta! tuple-start-times tuple)
484487
]
485488
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
486-
(when (sampler)
489+
(when delta
487490
(stats/bolt-failed-tuple! executor-stats
488491
(.getSourceComponent tuple)
489492
(.getSourceStreamId tuple)

src/jvm/backtype/storm/hooks/info/BoltAckInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
public class BoltAckInfo {
66
public Tuple tuple;
7-
public long processLatencyMs;
7+
public Long processLatencyMs; // null if it wasn't sampled
88

9-
public BoltAckInfo(Tuple tuple, long processLatencyMs) {
9+
public BoltAckInfo(Tuple tuple, Long processLatencyMs) {
1010
this.tuple = tuple;
1111
this.processLatencyMs = processLatencyMs;
1212
}

src/jvm/backtype/storm/hooks/info/BoltFailInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
public class BoltFailInfo {
66
public Tuple tuple;
7-
public long failLatencyMs;
7+
public Long failLatencyMs; // null if it wasn't sampled
88

9-
public BoltFailInfo(Tuple tuple, long failLatencyMs) {
9+
public BoltFailInfo(Tuple tuple, Long failLatencyMs) {
1010
this.tuple = tuple;
1111
this.failLatencyMs = failLatencyMs;
1212
}

src/jvm/backtype/storm/hooks/info/SpoutAckInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
public class SpoutAckInfo {
44
public Object messageId;
5-
public long completeLatencyMs;
5+
public Long completeLatencyMs; // null if it wasn't sampled
66

7-
public SpoutAckInfo(Object messageId, long completeLatencyMs) {
7+
public SpoutAckInfo(Object messageId, Long completeLatencyMs) {
88
this.messageId = messageId;
99
this.completeLatencyMs = completeLatencyMs;
1010
}

src/jvm/backtype/storm/hooks/info/SpoutFailInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
public class SpoutFailInfo {
44
public Object messageId;
5-
public long failLatencyMs;
5+
public Long failLatencyMs; // null if it wasn't sampled
66

7-
public SpoutFailInfo(Object messageId, long failLatencyMs) {
7+
public SpoutFailInfo(Object messageId, Long failLatencyMs) {
88
this.messageId = messageId;
99
this.failLatencyMs = failLatencyMs;
1010
}

0 commit comments

Comments
 (0)