|
1 | 1 | package storm.trident.spout;
|
2 | 2 |
|
| 3 | +import backtype.storm.Config; |
3 | 4 | import backtype.storm.spout.ISpoutOutputCollector;
|
4 | 5 | import backtype.storm.spout.SpoutOutputCollector;
|
5 | 6 | import backtype.storm.task.TopologyContext;
|
6 | 7 | import backtype.storm.topology.IRichSpout;
|
7 | 8 | import backtype.storm.tuple.Fields;
|
| 9 | +import backtype.storm.utils.RotatingMap; |
8 | 10 | import java.util.ArrayList;
|
9 | 11 | import java.util.List;
|
10 | 12 | import java.util.Map;
|
11 |
| -import java.util.SortedMap; |
12 |
| -import java.util.TreeMap; |
13 | 13 | import storm.trident.operation.TridentCollector;
|
| 14 | +import storm.trident.topology.TransactionAttempt; |
14 | 15 | import storm.trident.util.TridentUtils;
|
15 | 16 |
|
16 |
| -public class RichSpoutBatchExecutor implements IBatchSpout { |
| 17 | +public class RichSpoutBatchExecutor implements ITridentSpout { |
17 | 18 | public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
|
18 | 19 |
|
19 | 20 | IRichSpout _spout;
|
20 |
| - int _maxBatchSize; |
21 |
| - boolean prepared = false; |
22 |
| - Map _conf; |
23 |
| - TopologyContext _context; |
24 |
| - CaptureCollector _collector; |
25 |
| - TreeMap<Long, List<Object>> idsMap = new TreeMap(); |
26 | 21 |
|
27 | 22 | public RichSpoutBatchExecutor(IRichSpout spout) {
|
28 | 23 | _spout = spout;
|
29 | 24 | }
|
30 |
| - |
| 25 | + |
| 26 | + @Override |
| 27 | + public Map getComponentConfiguration() { |
| 28 | + return _spout.getComponentConfiguration(); |
| 29 | + } |
| 30 | + |
| 31 | + @Override |
| 32 | + public Fields getOutputFields() { |
| 33 | + return TridentUtils.getSingleOutputStreamFields(_spout); |
| 34 | + |
| 35 | + } |
| 36 | + |
31 | 37 | @Override
|
32 |
| - public void open(Map conf, TopologyContext context) { |
33 |
| - Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); |
34 |
| - if(batchSize==null) batchSize = 1000; |
35 |
| - _maxBatchSize = batchSize.intValue(); |
36 |
| - _collector = new CaptureCollector(); |
| 38 | + public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) { |
| 39 | + return new RichSpoutCoordinator(); |
37 | 40 | }
|
38 | 41 |
|
39 | 42 | @Override
|
40 |
| - public void emitBatch(long txid, TridentCollector collector) { |
41 |
| - _collector.reset(collector); |
42 |
| - SortedMap<Long, List<Object>> toFail = idsMap.tailMap(txid); |
43 |
| - for(long failTx: idsMap.keySet()) { |
44 |
| - for(Object id: idsMap.get(failTx)) { |
45 |
| - _spout.fail(id); |
| 43 | + public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) { |
| 44 | + return new RichSpoutEmitter(conf, context); |
| 45 | + } |
| 46 | + |
| 47 | + class RichSpoutEmitter implements ITridentSpout.Emitter<Object> { |
| 48 | + int _maxBatchSize; |
| 49 | + boolean prepared = false; |
| 50 | + CaptureCollector _collector; |
| 51 | + RotatingMap<Long, List<Object>> idsMap; |
| 52 | + Map _conf; |
| 53 | + TopologyContext _context; |
| 54 | + long lastRotate = System.currentTimeMillis(); |
| 55 | + long rotateTime; |
| 56 | + |
| 57 | + public RichSpoutEmitter(Map conf, TopologyContext context) { |
| 58 | + _conf = conf; |
| 59 | + _context = context; |
| 60 | + Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); |
| 61 | + if(batchSize==null) batchSize = 1000; |
| 62 | + _maxBatchSize = batchSize.intValue(); |
| 63 | + _collector = new CaptureCollector(); |
| 64 | + idsMap = new RotatingMap(3); |
| 65 | + rotateTime = 1000 * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); |
| 66 | + } |
| 67 | + |
| 68 | + @Override |
| 69 | + public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) { |
| 70 | + long now = System.currentTimeMillis(); |
| 71 | + if(now - lastRotate > rotateTime) { |
| 72 | + Map<Long, List<Object>> failed = idsMap.rotate(); |
| 73 | + for(List<Object> failedIds: failed.values()) { |
| 74 | + for(Object id: failedIds) { |
| 75 | + _spout.fail(id); |
| 76 | + } |
| 77 | + } |
| 78 | + lastRotate = now; |
| 79 | + } |
| 80 | + |
| 81 | + _collector.reset(collector); |
| 82 | + if(!prepared) { |
| 83 | + _spout.open(_conf, _context, new SpoutOutputCollector(_collector)); |
| 84 | + prepared = true; |
| 85 | + } |
| 86 | + for(int i=0; i<_maxBatchSize; i++) { |
| 87 | + _spout.nextTuple(); |
| 88 | + if(_collector.numEmitted < i) { |
| 89 | + break; |
| 90 | + } |
46 | 91 | }
|
| 92 | + idsMap.put(tx.getTransactionId(), _collector.ids); |
| 93 | + |
47 | 94 | }
|
48 |
| - toFail.clear(); |
49 |
| - if(!prepared) { |
50 |
| - _spout.open(_conf, _context, new SpoutOutputCollector(_collector)); |
51 |
| - prepared = true; |
| 95 | + |
| 96 | + @Override |
| 97 | + public void success(TransactionAttempt tx) { |
| 98 | + ack(tx.getTransactionId()); |
52 | 99 | }
|
53 |
| - for(int i=0; i<_maxBatchSize; i++) { |
54 |
| - _spout.nextTuple(); |
55 |
| - if(_collector.numEmitted < i) { |
56 |
| - break; |
| 100 | + |
| 101 | + private void ack(long batchId) { |
| 102 | + List<Object> ids = (List<Object>) idsMap.remove(batchId); |
| 103 | + if(ids!=null) { |
| 104 | + for(Object id: ids) { |
| 105 | + _spout.ack(id); |
| 106 | + } |
57 | 107 | }
|
58 |
| - idsMap.put(txid, _collector.ids); |
| 108 | + } |
| 109 | + |
| 110 | + @Override |
| 111 | + public void close() { |
59 | 112 | }
|
| 113 | + |
60 | 114 | }
|
61 | 115 |
|
62 |
| - @Override |
63 |
| - public void ack(long batchId) { |
64 |
| - List<Object> ids = idsMap.remove(batchId); |
65 |
| - if(ids!=null) { |
66 |
| - for(Object id: ids) { |
67 |
| - _spout.ack(id); |
68 |
| - } |
| 116 | + class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator { |
| 117 | + @Override |
| 118 | + public Object initializeTransaction(long txid, Object prevMetadata) { |
| 119 | + return null; |
69 | 120 | }
|
70 |
| - } |
71 | 121 |
|
72 |
| - @Override |
73 |
| - public void close() { |
74 |
| - _spout.close(); |
75 |
| - } |
| 122 | + @Override |
| 123 | + public void success(long txid) { |
| 124 | + } |
76 | 125 |
|
77 |
| - @Override |
78 |
| - public Map getComponentConfiguration() { |
79 |
| - return _spout.getComponentConfiguration(); |
80 |
| - } |
| 126 | + @Override |
| 127 | + public boolean isReady(long txid) { |
| 128 | + return true; |
| 129 | + } |
81 | 130 |
|
82 |
| - @Override |
83 |
| - public Fields getOutputFields() { |
84 |
| - return TridentUtils.getSingleOutputStreamFields(_spout); |
85 |
| - |
| 131 | + @Override |
| 132 | + public void close() { |
| 133 | + } |
86 | 134 | }
|
87 | 135 |
|
88 | 136 | static class CaptureCollector implements ISpoutOutputCollector {
|
|
0 commit comments