Skip to content

Commit 68672ce

Browse files
author
Nathan Marz
committed
fix bug where tuple tree could ack even if tuples weren't delivered (because coordination tuples always ack). now it always keeps a coordination tuple until the condition is met before it acks them
1 parent 2508f49 commit 68672ce

File tree

1 file changed

+80
-51
lines changed

1 file changed

+80
-51
lines changed

src/jvm/backtype/storm/coordination/CoordinatedBolt.java

Lines changed: 80 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.Map.Entry;
55
import backtype.storm.tuple.Values;
66
import backtype.storm.generated.GlobalStreamId;
7-
import backtype.storm.Config;
87
import java.util.Collection;
98
import backtype.storm.Constants;
109
import backtype.storm.generated.Grouping;
@@ -89,9 +88,9 @@ public void ack(Tuple tuple) {
8988
if (track != null)
9089
track.receivedTuples++;
9190
}
92-
boolean failed = checkFinishId(tuple);
91+
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
9392
if(failed) {
94-
_delegate.fail(tuple);
93+
_delegate.fail(tuple);
9594
} else {
9695
_delegate.ack(tuple);
9796
}
@@ -104,6 +103,7 @@ public void fail(Tuple tuple) {
104103
if (track != null)
105104
track.failed = true;
106105
}
106+
checkFinishId(tuple, TupleType.REGULAR);
107107
_delegate.fail(tuple);
108108
}
109109

@@ -142,6 +142,7 @@ public static class TrackingInfo {
142142
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
143143
boolean receivedId = false;
144144
boolean finished = false;
145+
List<Tuple> ackTuples = new ArrayList<Tuple>();
145146

146147
@Override
147148
public String toString() {
@@ -213,51 +214,73 @@ public void prepare(Map config, TopologyContext context, OutputCollector collect
213214
}
214215
}
215216

216-
private boolean checkFinishId(Tuple tup) {
217-
boolean ret = false;
217+
private boolean checkFinishId(Tuple tup, TupleType type) {
218218
Object id = tup.getValue(0);
219+
boolean failed = false;
220+
219221
synchronized(_tracked) {
220222
TrackingInfo track = _tracked.get(id);
221223
try {
222-
// if it timed out, then obviously it failed (hence the null check)
223-
if(track==null || track.failed) ret = true;
224-
if(track!=null
225-
&& !track.failed
226-
&& track.receivedId
227-
&& (_sourceArgs.isEmpty()
228-
||
229-
track.reportCount==_numSourceReports &&
230-
track.expectedTupleCount == track.receivedTuples)) {
231-
if(_delegate instanceof FinishedCallback) {
232-
((FinishedCallback)_delegate).finishedId(id);
224+
if(track!=null) {
225+
boolean delayed = false;
226+
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
227+
track.ackTuples.add(tup);
228+
delayed = true;
233229
}
234-
if(!(_sourceArgs.isEmpty() ||
235-
tup.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID) ||
236-
(_idStreamSpec!=null && tup.getSourceGlobalStreamid().equals(_idStreamSpec._id))
237-
)) {
238-
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
230+
if(track.failed) {
231+
failed = true;
232+
for(Tuple t: track.ackTuples) {
233+
_collector.fail(t);
234+
}
235+
_tracked.remove(id);
236+
} else if(track.receivedId
237+
&& (_sourceArgs.isEmpty() ||
238+
track.reportCount==_numSourceReports &&
239+
track.expectedTupleCount == track.receivedTuples)){
240+
if(_delegate instanceof FinishedCallback) {
241+
((FinishedCallback)_delegate).finishedId(id);
242+
}
243+
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
244+
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
245+
}
246+
Iterator<Integer> outTasks = _countOutTasks.iterator();
247+
while(outTasks.hasNext()) {
248+
int task = outTasks.next();
249+
int numTuples = get(track.taskEmittedTuples, task, 0);
250+
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
251+
}
252+
for(Tuple t: track.ackTuples) {
253+
_collector.ack(t);
254+
}
255+
track.finished = true;
256+
_tracked.remove(id);
239257
}
240-
Iterator<Integer> outTasks = _countOutTasks.iterator();
241-
while(outTasks.hasNext()) {
242-
int task = outTasks.next();
243-
int numTuples = get(track.taskEmittedTuples, task, 0);
244-
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
258+
if(!delayed && type!=TupleType.REGULAR) {
259+
if(track.failed) {
260+
_collector.fail(tup);
261+
} else {
262+
_collector.ack(tup);
263+
}
245264
}
246-
track.finished = true;
247-
_tracked.remove(id);
265+
} else {
266+
if(type!=TupleType.REGULAR) _collector.fail(tup);
248267
}
249268
} catch(FailedException e) {
250269
LOG.error("Failed to finish batch", e);
251-
track.failed = true;
252-
ret = true;
270+
for(Tuple t: track.ackTuples) {
271+
_collector.fail(t);
272+
}
273+
_tracked.remove(id);
274+
failed = true;
253275
}
254276
}
255-
return ret;
277+
return failed;
256278
}
257279

258280
public void execute(Tuple tuple) {
259281
Object id = tuple.getValue(0);
260282
TrackingInfo track;
283+
TupleType type = getTupleType(tuple);
261284
synchronized(_tracked) {
262285
track = _tracked.get(id);
263286
if(track==null) {
@@ -267,31 +290,18 @@ public void execute(Tuple tuple) {
267290
}
268291
}
269292

270-
if(_idStreamSpec!=null
271-
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
293+
if(type==TupleType.ID) {
272294
synchronized(_tracked) {
273295
track.receivedId = true;
274296
}
275-
boolean failed = checkFinishId(tuple);
276-
if(failed) {
277-
_collector.fail(tuple);
278-
} else {
279-
_collector.ack(tuple);
280-
}
281-
282-
} else if(!_sourceArgs.isEmpty()
283-
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
297+
checkFinishId(tuple, type);
298+
} else if(type==TupleType.COORD) {
284299
int count = (Integer) tuple.getValue(1);
285300
synchronized(_tracked) {
286301
track.reportCount++;
287302
track.expectedTupleCount+=count;
288303
}
289-
boolean failed = checkFinishId(tuple);
290-
if(failed) {
291-
_collector.fail(tuple);
292-
} else {
293-
_collector.ack(tuple);
294-
}
304+
checkFinishId(tuple, type);
295305
} else {
296306
synchronized(_tracked) {
297307
_delegate.execute(tuple);
@@ -323,12 +333,31 @@ private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, Track
323333
@Override
324334
public void expire(Object id, TrackingInfo val) {
325335
synchronized(_tracked) {
326-
// make sure we don't time out something that has been finished. the combination of
327-
// the flag and the lock ensure this
336+
// the combination of the lock and the finished flag ensure that
337+
// an id is never timed out if it has been finished
338+
val.failed = true;
328339
if(!val.finished) {
329340
((TimeoutCallback) _delegate).timeoutId(id);
330341
}
331342
}
332343
}
333-
}
344+
}
345+
346+
private TupleType getTupleType(Tuple tuple) {
347+
if(_idStreamSpec!=null
348+
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
349+
return TupleType.ID;
350+
} else if(!_sourceArgs.isEmpty()
351+
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
352+
return TupleType.COORD;
353+
} else {
354+
return TupleType.REGULAR;
355+
}
356+
}
357+
358+
static enum TupleType {
359+
REGULAR,
360+
ID,
361+
COORD
362+
}
334363
}

0 commit comments

Comments
 (0)