@@ -26,13 +26,15 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
26
26
private String _finishedPartitionsId ;
27
27
private int _takeAmt ;
28
28
private Fields _outFields ;
29
+ private Map <Integer , List <List <Object >>> _initialPartitions ;
29
30
30
31
public MemoryTransactionalSpout (Map <Integer , List <List <Object >>> partitions , Fields outFields , int takeAmt ) {
31
32
_id = RegisteredGlobalState .registerState (partitions );
32
33
Map <Integer , Boolean > finished = Collections .synchronizedMap (new HashMap <Integer , Boolean >());
33
34
_finishedPartitionsId = RegisteredGlobalState .registerState (finished );
34
35
_takeAmt = takeAmt ;
35
36
_outFields = outFields ;
37
+ _initialPartitions = partitions ;
36
38
}
37
39
38
40
public boolean isExhaustedTuples () {
@@ -149,8 +151,10 @@ public void cleanup() {
149
151
RegisteredGlobalState .clearState (_finishedPartitionsId );
150
152
}
151
153
152
- private Map <Integer , List <List <Object >>> getQueues () {
153
- return (Map <Integer , List <List <Object >>>) RegisteredGlobalState .getState (_id );
154
+ private Map <Integer , List <List <Object >>> getQueues () {
155
+ Map <Integer , List <List <Object >>> ret = (Map <Integer , List <List <Object >>>) RegisteredGlobalState .getState (_id );
156
+ if (ret !=null ) return ret ;
157
+ else return _initialPartitions ;
154
158
}
155
159
156
160
private Map <Integer , Boolean > getFinishedStatuses () {
0 commit comments