Skip to content

Commit c0dae31

Browse files
author
Nathan Marz
committed
make MemoryTransactionalSpout work on a cluster
1 parent 2f66e58 commit c0dae31

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

src/jvm/backtype/storm/testing/MemoryTransactionalSpout.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<
2626
private String _finishedPartitionsId;
2727
private int _takeAmt;
2828
private Fields _outFields;
29+
private Map<Integer, List<List<Object>>> _initialPartitions;
2930

3031
public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
3132
_id = RegisteredGlobalState.registerState(partitions);
3233
Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
3334
_finishedPartitionsId = RegisteredGlobalState.registerState(finished);
3435
_takeAmt = takeAmt;
3536
_outFields = outFields;
37+
_initialPartitions = partitions;
3638
}
3739

3840
public boolean isExhaustedTuples() {
@@ -149,8 +151,10 @@ public void cleanup() {
149151
RegisteredGlobalState.clearState(_finishedPartitionsId);
150152
}
151153

152-
private Map<Integer, List<List<Object>>> getQueues() {
153-
return (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
154+
private Map<Integer, List<List<Object>>> getQueues() {
155+
Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
156+
if(ret!=null) return ret;
157+
else return _initialPartitions;
154158
}
155159

156160
private Map<Integer, Boolean> getFinishedStatuses() {

0 commit comments

Comments
 (0)