Skip to content

Commit 925d7dc

Browse files
author
Nathan Marz
committed
keep execution metadata (tuple start time and outgoing ack val) in the tuple itself for increased performance
1 parent af4432b commit 925d7dc

File tree

7 files changed

+70
-37
lines changed

7 files changed

+70
-37
lines changed

src/clj/backtype/storm/bootstrap.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
MutableObject]))
1212
(import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
1313
(import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
14-
(import (quote [backtype.storm.tuple Tuple Fields MessageId]))
14+
(import (quote [backtype.storm.tuple Tuple TupleImpl Fields MessageId]))
1515
(import (quote [backtype.storm.task IBolt IOutputCollector
1616
OutputCollector TopologyContext ShellBolt
1717
GeneralTopologyContext WorkerTopologyContext]))

src/clj/backtype/storm/daemon/executor.clj

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@
261261
(fn [tuple-batch sequence-id end-of-batch?]
262262
;;(log-debug "Processing message " msg)
263263
(doseq [[task-id msg] tuple-batch]
264-
(let [^Tuple tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
264+
(let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
265265
(tuple-action-fn task-id tuple)
266266
)))))
267267

@@ -290,7 +290,7 @@
290290
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
291291
(.add event-queue #(fail-spout-msg executor-data (task-datas task-id) spout-id tuple-info time-delta)))
292292
)))
293-
tuple-action-fn (fn [task-id ^Tuple tuple]
293+
tuple-action-fn (fn [task-id ^TupleImpl tuple]
294294
(let [id (.getValue tuple 0)
295295
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
296296
(when spout-id
@@ -320,11 +320,11 @@
320320
(let [tuple-id (if rooted?
321321
(MessageId/makeRootId root-id id)
322322
(MessageId/makeUnanchored))]
323-
(Tuple. worker-context
324-
values
325-
task-id
326-
out-stream-id
327-
tuple-id)))]
323+
(TupleImpl. worker-context
324+
values
325+
task-id
326+
out-stream-id
327+
tuple-id)))]
328328
(dorun
329329
(map transfer-fn out-tasks out-tuples))
330330
(if rooted?
@@ -386,27 +386,24 @@
386386
]
387387
))
388388

389-
(defn- tuple-time-delta! [^Map start-times ^Tuple tuple]
390-
(let [ms (.remove start-times tuple)]
389+
(defn- tuple-time-delta! [^TupleImpl tuple]
390+
(let [ms (.getSampleStartTime tuple)]
391391
(if ms
392392
(time-delta-ms ms))))
393393

394394
(defn put-xor! [^Map pending key id]
395395
(let [curr (or (.get pending key) (long 0))]
396-
;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
397396
(.put pending key (bit-xor curr id))))
398397

399398
(defmethod mk-threads :bolt [executor-data task-datas]
400399
(let [component-id (:component-id executor-data)
401-
tuple-start-times (HashMap.)
402400
transfer-fn (:transfer-fn executor-data)
403401
worker-context (:worker-context executor-data)
404402
storm-conf (:storm-conf executor-data)
405403
executor-stats (:stats executor-data)
406-
pending-acks (HashMap.)
407404
report-error-fn (:report-error-fn executor-data)
408405
sampler (:sampler executor-data)
409-
tuple-action-fn (fn [task-id ^Tuple tuple]
406+
tuple-action-fn (fn [task-id ^TupleImpl tuple]
410407
;; synchronization needs to be done with a key provided by this bolt, otherwise:
411408
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
412409
;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
@@ -425,7 +422,7 @@
425422
;; need to do it this way to avoid reflection
426423
(let [^IBolt bolt-obj (-> task-id task-datas :object)]
427424
(when (sampler)
428-
(.put tuple-start-times tuple (System/currentTimeMillis)))
425+
(.setSampleStartTime tuple (System/currentTimeMillis)))
429426
(.execute bolt-obj tuple)))]
430427
(log-message "Preparing bolt " component-id ":" (keys task-datas))
431428
(doseq [[task-id task-data] task-datas
@@ -438,20 +435,20 @@
438435
(tasks-fn stream values))]
439436
(doseq [t out-tasks
440437
:let [anchors-to-ids (HashMap.)]]
441-
(doseq [^Tuple a anchors
438+
(doseq [^TupleImpl a anchors
442439
:let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]]
443440
(when (pos? (count root-ids))
444441
(let [edge-id (MessageId/generateId)]
445-
(put-xor! pending-acks a edge-id)
442+
(.updateAckVal a edge-id)
446443
(doseq [root-id root-ids]
447444
(put-xor! anchors-to-ids root-id edge-id))
448445
)))
449446
(transfer-fn t
450-
(Tuple. worker-context
451-
values
452-
task-id
453-
stream
454-
(MessageId/makeId anchors-to-ids))))
447+
(TupleImpl. worker-context
448+
values
449+
task-id
450+
stream
451+
(MessageId/makeId anchors-to-ids))))
455452
(or out-tasks [])))]]
456453
(.prepare bolt-obj
457454
storm-conf
@@ -463,13 +460,14 @@
463460
(emitDirect [this task stream anchors values]
464461
(bolt-emit stream anchors values task))
465462
(^void ack [this ^Tuple tuple]
466-
(let [ack-val (or (.remove pending-acks tuple) (long 0))]
463+
(let [^TupleImpl tuple tuple
464+
ack-val (.getAckVal tuple)]
467465
(doseq [[root id] (.. tuple getMessageId getAnchorsToIds)]
468466
(task/send-unanchored task-data
469467
ACKER-ACK-STREAM-ID
470468
[root (bit-xor id ack-val)])
471469
))
472-
(let [delta (tuple-time-delta! tuple-start-times tuple)]
470+
(let [delta (tuple-time-delta! tuple)]
473471
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
474472
(when delta
475473
(stats/bolt-acked-tuple! executor-stats
@@ -478,12 +476,11 @@
478476
delta)
479477
)))
480478
(^void fail [this ^Tuple tuple]
481-
(.remove pending-acks tuple)
482479
(doseq [root (.. tuple getMessageId getAnchors)]
483480
(task/send-unanchored task-data
484481
ACKER-FAIL-STREAM-ID
485482
[root]))
486-
(let [delta (tuple-time-delta! tuple-start-times tuple)
483+
(let [delta (tuple-time-delta! tuple)
487484
]
488485
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
489486
(when delta

src/clj/backtype/storm/daemon/task.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@
7878
transfer-fn (-> task-data :executor-data :transfer-fn)]
7979
(doseq [t (tasks-fn stream values)]
8080
(transfer-fn t
81-
(Tuple. topology-context
82-
values
83-
(.getThisTaskId topology-context)
84-
stream)))))
81+
(TupleImpl. topology-context
82+
values
83+
(.getThisTaskId topology-context)
84+
stream)))))
8585

8686
(defn mk-tasks-fn [task-data]
8787
(let [executor-data (:executor-data task-data)

src/clj/backtype/storm/testing.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
(:import [java.util.concurrent.atomic AtomicInteger])
1212
(:import [java.util.concurrent ConcurrentHashMap])
1313
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
14-
(:import [backtype.storm.tuple Fields Tuple])
14+
(:import [backtype.storm.tuple Fields Tuple TupleImpl])
1515
(:import [backtype.storm.task TopologyContext])
1616
(:import [backtype.storm.generated GlobalStreamId Bolt])
1717
(:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
@@ -548,5 +548,5 @@
548548
{stream fields})
549549
topology (StormTopology. {component spout-spec} {} {})
550550
context (TopologyContext. topology (read-storm-config) {(int 1) component} {component [(int 1)]} "test-storm-id" nil nil (int 1) nil [(int 1)])]
551-
(Tuple. context values 1 stream)
551+
(TupleImpl. context values 1 stream)
552552
))

src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import backtype.storm.task.GeneralTopologyContext;
44
import backtype.storm.tuple.MessageId;
55
import backtype.storm.tuple.Tuple;
6+
import backtype.storm.tuple.TupleImpl;
67
import backtype.storm.utils.WritableUtils;
78
import java.io.ByteArrayInputStream;
89
import java.io.DataInputStream;
@@ -31,7 +32,7 @@ public Tuple deserialize(byte[] ser) {
3132
String streamName = _ids.getStreamName(componentName, streamId);
3233
MessageId id = MessageId.deserialize(in);
3334
List<Object> values = _kryo.deserializeFrom(bin);
34-
return new Tuple(_context, values, taskId, streamName, id);
35+
return new TupleImpl(_context, values, taskId, streamName, id);
3536
} catch(IOException e) {
3637
throw new RuntimeException(e);
3738
}

src/jvm/backtype/storm/tuple/Tuple.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Tuple(GeneralTopologyContext context, List<Object> values, int taskId, St
5656
}
5757
}
5858

59-
public Tuple(TopologyContext context, List<Object> values, int taskId, String streamId) {
59+
public Tuple(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
6060
this(context, values, taskId, streamId, MessageId.makeUnanchored());
6161
}
6262

@@ -257,13 +257,11 @@ public String toString() {
257257

258258
@Override
259259
public boolean equals(Object other) {
260-
// for OutputCollector
261260
return this == other;
262-
}
261+
}
263262

264263
@Override
265264
public int hashCode() {
266-
// for OutputCollector
267265
return System.identityHashCode(this);
268266
}
269267

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package backtype.storm.tuple;
2+
3+
import backtype.storm.task.GeneralTopologyContext;
4+
import backtype.storm.task.TopologyContext;
5+
import java.util.List;
6+
7+
public class TupleImpl extends Tuple {
8+
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
9+
super(context, values, taskId, streamId, id);
10+
}
11+
12+
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
13+
super(context, values, taskId, streamId);
14+
}
15+
16+
Long _sampleStartTime = null;
17+
18+
public void setSampleStartTime(long ms) {
19+
_sampleStartTime = ms;
20+
}
21+
22+
public Long getSampleStartTime() {
23+
return _sampleStartTime;
24+
}
25+
26+
long _outAckVal = 0;
27+
28+
public void updateAckVal(long val) {
29+
_outAckVal = _outAckVal ^ val;
30+
}
31+
32+
public long getAckVal() {
33+
return _outAckVal;
34+
}
35+
36+
37+
}

0 commit comments

Comments
 (0)