Skip to content

Commit ff2e570

Browse files
author
Nathan Marz
committed
fix race condition in coordinatedbolt
1 parent 2e1b640 commit ff2e570

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

src/jvm/backtype/storm/coordination/CoordinatedBolt.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public static class TrackingInfo {
136136
int failedTuples = 0;
137137
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
138138
boolean receivedId = false;
139+
boolean finished = false;
139140

140141
@Override
141142
public String toString() {
@@ -232,6 +233,7 @@ private void checkFinishId(Tuple tup) {
232233
int numTuples = get(track.taskEmittedTuples, task, 0);
233234
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
234235
}
236+
track.finished = true;
235237
_tracked.remove(id);
236238
}
237239
}
@@ -308,7 +310,11 @@ private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, Track
308310
@Override
309311
public void expire(Object id, TrackingInfo val) {
310312
synchronized(_tracked) {
311-
((TimeoutCallback) _delegate).timeoutId(id);
313+
// make sure we don't time out something that has been finished. the combination of
314+
// the flag and the lock ensure this
315+
if(!val.finished) {
316+
((TimeoutCallback) _delegate).timeoutId(id);
317+
}
312318
}
313319
}
314320
}

src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public void execute(Tuple input) {
3838
LOG.warn("Failed to emit batch for transaction", e);
3939
_collector.fail(input);
4040
}
41+
// this is valid here because the batch has been successfully emitted,
42+
// so we can safely delete metadata for prior transactions
4143
_emitter.cleanupBefore((BigInteger) input.getValue(2));
4244
}
4345

0 commit comments

Comments
 (0)