Skip to content

Commit d15d5a2

Browse files
author
Nathan Marz
committed
fix richspoutbatchexecutor
1 parent 6d9c961 commit d15d5a2

File tree

2 files changed

+100
-51
lines changed

2 files changed

+100
-51
lines changed

src/jvm/backtype/storm/utils/RotatingMap.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,15 @@ public RotatingMap(int numBuckets) {
4848
this(numBuckets, null);
4949
}
5050

51-
public void rotate() {
51+
public Map<K, V> rotate() {
5252
Map<K, V> dead = _buckets.removeLast();
5353
_buckets.addFirst(new HashMap<K, V>());
5454
if(_callback!=null) {
5555
for(Entry<K, V> entry: dead.entrySet()) {
5656
_callback.expire(entry.getKey(), entry.getValue());
5757
}
5858
}
59+
return dead;
5960
}
6061

6162
public boolean containsKey(K key) {

src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java

Lines changed: 98 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,136 @@
11
package storm.trident.spout;
22

3+
import backtype.storm.Config;
34
import backtype.storm.spout.ISpoutOutputCollector;
45
import backtype.storm.spout.SpoutOutputCollector;
56
import backtype.storm.task.TopologyContext;
67
import backtype.storm.topology.IRichSpout;
78
import backtype.storm.tuple.Fields;
9+
import backtype.storm.utils.RotatingMap;
810
import java.util.ArrayList;
911
import java.util.List;
1012
import java.util.Map;
11-
import java.util.SortedMap;
12-
import java.util.TreeMap;
1313
import storm.trident.operation.TridentCollector;
14+
import storm.trident.topology.TransactionAttempt;
1415
import storm.trident.util.TridentUtils;
1516

16-
public class RichSpoutBatchExecutor implements IBatchSpout {
17+
public class RichSpoutBatchExecutor implements ITridentSpout {
1718
public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
1819

1920
IRichSpout _spout;
20-
int _maxBatchSize;
21-
boolean prepared = false;
22-
Map _conf;
23-
TopologyContext _context;
24-
CaptureCollector _collector;
25-
TreeMap<Long, List<Object>> idsMap = new TreeMap();
2621

2722
public RichSpoutBatchExecutor(IRichSpout spout) {
2823
_spout = spout;
2924
}
30-
25+
26+
@Override
27+
public Map getComponentConfiguration() {
28+
return _spout.getComponentConfiguration();
29+
}
30+
31+
@Override
32+
public Fields getOutputFields() {
33+
return TridentUtils.getSingleOutputStreamFields(_spout);
34+
35+
}
36+
3137
@Override
32-
public void open(Map conf, TopologyContext context) {
33-
Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
34-
if(batchSize==null) batchSize = 1000;
35-
_maxBatchSize = batchSize.intValue();
36-
_collector = new CaptureCollector();
38+
public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
39+
return new RichSpoutCoordinator();
3740
}
3841

3942
@Override
40-
public void emitBatch(long txid, TridentCollector collector) {
41-
_collector.reset(collector);
42-
SortedMap<Long, List<Object>> toFail = idsMap.tailMap(txid);
43-
for(long failTx: idsMap.keySet()) {
44-
for(Object id: idsMap.get(failTx)) {
45-
_spout.fail(id);
43+
public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
44+
return new RichSpoutEmitter(conf, context);
45+
}
46+
47+
class RichSpoutEmitter implements ITridentSpout.Emitter<Object> {
48+
int _maxBatchSize;
49+
boolean prepared = false;
50+
CaptureCollector _collector;
51+
RotatingMap<Long, List<Object>> idsMap;
52+
Map _conf;
53+
TopologyContext _context;
54+
long lastRotate = System.currentTimeMillis();
55+
long rotateTime;
56+
57+
public RichSpoutEmitter(Map conf, TopologyContext context) {
58+
_conf = conf;
59+
_context = context;
60+
Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
61+
if(batchSize==null) batchSize = 1000;
62+
_maxBatchSize = batchSize.intValue();
63+
_collector = new CaptureCollector();
64+
idsMap = new RotatingMap(3);
65+
rotateTime = 1000 * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
66+
}
67+
68+
@Override
69+
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
70+
long now = System.currentTimeMillis();
71+
if(now - lastRotate > rotateTime) {
72+
Map<Long, List<Object>> failed = idsMap.rotate();
73+
for(List<Object> failedIds: failed.values()) {
74+
for(Object id: failedIds) {
75+
_spout.fail(id);
76+
}
77+
}
78+
lastRotate = now;
79+
}
80+
81+
_collector.reset(collector);
82+
if(!prepared) {
83+
_spout.open(_conf, _context, new SpoutOutputCollector(_collector));
84+
prepared = true;
85+
}
86+
for(int i=0; i<_maxBatchSize; i++) {
87+
_spout.nextTuple();
88+
if(_collector.numEmitted < i) {
89+
break;
90+
}
4691
}
92+
idsMap.put(tx.getTransactionId(), _collector.ids);
93+
4794
}
48-
toFail.clear();
49-
if(!prepared) {
50-
_spout.open(_conf, _context, new SpoutOutputCollector(_collector));
51-
prepared = true;
95+
96+
@Override
97+
public void success(TransactionAttempt tx) {
98+
ack(tx.getTransactionId());
5299
}
53-
for(int i=0; i<_maxBatchSize; i++) {
54-
_spout.nextTuple();
55-
if(_collector.numEmitted < i) {
56-
break;
100+
101+
private void ack(long batchId) {
102+
List<Object> ids = (List<Object>) idsMap.remove(batchId);
103+
if(ids!=null) {
104+
for(Object id: ids) {
105+
_spout.ack(id);
106+
}
57107
}
58-
idsMap.put(txid, _collector.ids);
108+
}
109+
110+
@Override
111+
public void close() {
59112
}
113+
60114
}
61115

62-
@Override
63-
public void ack(long batchId) {
64-
List<Object> ids = idsMap.remove(batchId);
65-
if(ids!=null) {
66-
for(Object id: ids) {
67-
_spout.ack(id);
68-
}
116+
class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator {
117+
@Override
118+
public Object initializeTransaction(long txid, Object prevMetadata) {
119+
return null;
69120
}
70-
}
71121

72-
@Override
73-
public void close() {
74-
_spout.close();
75-
}
122+
@Override
123+
public void success(long txid) {
124+
}
76125

77-
@Override
78-
public Map getComponentConfiguration() {
79-
return _spout.getComponentConfiguration();
80-
}
126+
@Override
127+
public boolean isReady(long txid) {
128+
return true;
129+
}
81130

82-
@Override
83-
public Fields getOutputFields() {
84-
return TridentUtils.getSingleOutputStreamFields(_spout);
85-
131+
@Override
132+
public void close() {
133+
}
86134
}
87135

88136
static class CaptureCollector implements ISpoutOutputCollector {

0 commit comments

Comments
 (0)