4
4
import java .util .Map .Entry ;
5
5
import backtype .storm .tuple .Values ;
6
6
import backtype .storm .generated .GlobalStreamId ;
7
- import backtype .storm .Config ;
8
7
import java .util .Collection ;
9
8
import backtype .storm .Constants ;
10
9
import backtype .storm .generated .Grouping ;
@@ -89,9 +88,9 @@ public void ack(Tuple tuple) {
89
88
if (track != null )
90
89
track .receivedTuples ++;
91
90
}
92
- boolean failed = checkFinishId (tuple );
91
+ boolean failed = checkFinishId (tuple , TupleType . REGULAR );
93
92
if (failed ) {
94
- _delegate .fail (tuple );
93
+ _delegate .fail (tuple );
95
94
} else {
96
95
_delegate .ack (tuple );
97
96
}
@@ -104,6 +103,7 @@ public void fail(Tuple tuple) {
104
103
if (track != null )
105
104
track .failed = true ;
106
105
}
106
+ checkFinishId (tuple , TupleType .REGULAR );
107
107
_delegate .fail (tuple );
108
108
}
109
109
@@ -142,6 +142,7 @@ public static class TrackingInfo {
142
142
Map <Integer , Integer > taskEmittedTuples = new HashMap <Integer , Integer >();
143
143
boolean receivedId = false ;
144
144
boolean finished = false ;
145
+ List <Tuple > ackTuples = new ArrayList <Tuple >();
145
146
146
147
@ Override
147
148
public String toString () {
@@ -213,51 +214,73 @@ public void prepare(Map config, TopologyContext context, OutputCollector collect
213
214
}
214
215
}
215
216
216
- private boolean checkFinishId (Tuple tup ) {
217
- boolean ret = false ;
217
+ private boolean checkFinishId (Tuple tup , TupleType type ) {
218
218
Object id = tup .getValue (0 );
219
+ boolean failed = false ;
220
+
219
221
synchronized (_tracked ) {
220
222
TrackingInfo track = _tracked .get (id );
221
223
try {
222
- // if it timed out, then obviously it failed (hence the null check)
223
- if (track ==null || track .failed ) ret = true ;
224
- if (track !=null
225
- && !track .failed
226
- && track .receivedId
227
- && (_sourceArgs .isEmpty ()
228
- ||
229
- track .reportCount ==_numSourceReports &&
230
- track .expectedTupleCount == track .receivedTuples )) {
231
- if (_delegate instanceof FinishedCallback ) {
232
- ((FinishedCallback )_delegate ).finishedId (id );
224
+ if (track !=null ) {
225
+ boolean delayed = false ;
226
+ if (_idStreamSpec ==null && type == TupleType .COORD || _idStreamSpec !=null && type ==TupleType .ID ) {
227
+ track .ackTuples .add (tup );
228
+ delayed = true ;
233
229
}
234
- if (!(_sourceArgs .isEmpty () ||
235
- tup .getSourceStreamId ().equals (Constants .COORDINATED_STREAM_ID ) ||
236
- (_idStreamSpec !=null && tup .getSourceGlobalStreamid ().equals (_idStreamSpec ._id ))
237
- )) {
238
- throw new IllegalStateException ("Coordination condition met on a non-coordinating tuple. Should be impossible" );
230
+ if (track .failed ) {
231
+ failed = true ;
232
+ for (Tuple t : track .ackTuples ) {
233
+ _collector .fail (t );
234
+ }
235
+ _tracked .remove (id );
236
+ } else if (track .receivedId
237
+ && (_sourceArgs .isEmpty () ||
238
+ track .reportCount ==_numSourceReports &&
239
+ track .expectedTupleCount == track .receivedTuples )){
240
+ if (_delegate instanceof FinishedCallback ) {
241
+ ((FinishedCallback )_delegate ).finishedId (id );
242
+ }
243
+ if (!(_sourceArgs .isEmpty () || type !=TupleType .REGULAR )) {
244
+ throw new IllegalStateException ("Coordination condition met on a non-coordinating tuple. Should be impossible" );
245
+ }
246
+ Iterator <Integer > outTasks = _countOutTasks .iterator ();
247
+ while (outTasks .hasNext ()) {
248
+ int task = outTasks .next ();
249
+ int numTuples = get (track .taskEmittedTuples , task , 0 );
250
+ _collector .emitDirect (task , Constants .COORDINATED_STREAM_ID , tup , new Values (id , numTuples ));
251
+ }
252
+ for (Tuple t : track .ackTuples ) {
253
+ _collector .ack (t );
254
+ }
255
+ track .finished = true ;
256
+ _tracked .remove (id );
239
257
}
240
- Iterator <Integer > outTasks = _countOutTasks .iterator ();
241
- while (outTasks .hasNext ()) {
242
- int task = outTasks .next ();
243
- int numTuples = get (track .taskEmittedTuples , task , 0 );
244
- _collector .emitDirect (task , Constants .COORDINATED_STREAM_ID , tup , new Values (id , numTuples ));
258
+ if (!delayed && type !=TupleType .REGULAR ) {
259
+ if (track .failed ) {
260
+ _collector .fail (tup );
261
+ } else {
262
+ _collector .ack (tup );
263
+ }
245
264
}
246
- track . finished = true ;
247
- _tracked . remove ( id );
265
+ } else {
266
+ if ( type != TupleType . REGULAR ) _collector . fail ( tup );
248
267
}
249
268
} catch (FailedException e ) {
250
269
LOG .error ("Failed to finish batch" , e );
251
- track .failed = true ;
252
- ret = true ;
270
+ for (Tuple t : track .ackTuples ) {
271
+ _collector .fail (t );
272
+ }
273
+ _tracked .remove (id );
274
+ failed = true ;
253
275
}
254
276
}
255
- return ret ;
277
+ return failed ;
256
278
}
257
279
258
280
public void execute (Tuple tuple ) {
259
281
Object id = tuple .getValue (0 );
260
282
TrackingInfo track ;
283
+ TupleType type = getTupleType (tuple );
261
284
synchronized (_tracked ) {
262
285
track = _tracked .get (id );
263
286
if (track ==null ) {
@@ -267,31 +290,18 @@ public void execute(Tuple tuple) {
267
290
}
268
291
}
269
292
270
- if (_idStreamSpec !=null
271
- && tuple .getSourceGlobalStreamid ().equals (_idStreamSpec ._id )) {
293
+ if (type ==TupleType .ID ) {
272
294
synchronized (_tracked ) {
273
295
track .receivedId = true ;
274
296
}
275
- boolean failed = checkFinishId (tuple );
276
- if (failed ) {
277
- _collector .fail (tuple );
278
- } else {
279
- _collector .ack (tuple );
280
- }
281
-
282
- } else if (!_sourceArgs .isEmpty ()
283
- && tuple .getSourceStreamId ().equals (Constants .COORDINATED_STREAM_ID )) {
297
+ checkFinishId (tuple , type );
298
+ } else if (type ==TupleType .COORD ) {
284
299
int count = (Integer ) tuple .getValue (1 );
285
300
synchronized (_tracked ) {
286
301
track .reportCount ++;
287
302
track .expectedTupleCount +=count ;
288
303
}
289
- boolean failed = checkFinishId (tuple );
290
- if (failed ) {
291
- _collector .fail (tuple );
292
- } else {
293
- _collector .ack (tuple );
294
- }
304
+ checkFinishId (tuple , type );
295
305
} else {
296
306
synchronized (_tracked ) {
297
307
_delegate .execute (tuple );
@@ -323,12 +333,31 @@ private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, Track
323
333
@ Override
324
334
public void expire (Object id , TrackingInfo val ) {
325
335
synchronized (_tracked ) {
326
- // make sure we don't time out something that has been finished. the combination of
327
- // the flag and the lock ensure this
336
+ // the combination of the lock and the finished flag ensure that
337
+ // an id is never timed out if it has been finished
338
+ val .failed = true ;
328
339
if (!val .finished ) {
329
340
((TimeoutCallback ) _delegate ).timeoutId (id );
330
341
}
331
342
}
332
343
}
333
- }
344
+ }
345
+
346
+ private TupleType getTupleType (Tuple tuple ) {
347
+ if (_idStreamSpec !=null
348
+ && tuple .getSourceGlobalStreamid ().equals (_idStreamSpec ._id )) {
349
+ return TupleType .ID ;
350
+ } else if (!_sourceArgs .isEmpty ()
351
+ && tuple .getSourceStreamId ().equals (Constants .COORDINATED_STREAM_ID )) {
352
+ return TupleType .COORD ;
353
+ } else {
354
+ return TupleType .REGULAR ;
355
+ }
356
+ }
357
+
358
+ static enum TupleType {
359
+ REGULAR ,
360
+ ID ,
361
+ COORD
362
+ }
334
363
}
0 commit comments