Skip to content

Commit facc876

Browse files
author
Nathan Marz
committed
fix max transaction active parsing bug
1 parent b8b4f22 commit facc876

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
5656
_collector = collector;
5757
_coordinator = _spout.getCoordinator(conf, context);
5858
_currTransaction = getStoredCurrTransaction(_state);
59-
if(!conf.containsKey(Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
59+
Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
60+
if(active==null) {
6061
_maxTransactionActive = 1;
6162
} else {
62-
_maxTransactionActive = Utils.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
63+
_maxTransactionActive = Utils.getInt(active);
6364
}
6465
_initializer = new StateInitializer();
6566
}
@@ -113,8 +114,10 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
113114
}
114115

115116
private void sync() {
116-
// TODO: this code might be redundant. can just find the next transaction that needs a batch or commit tuple
117-
// and emit that, instead of iterating through (MAX_SPOUT_PENDING should take care of things)
117+
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
118+
// max_spout_pending = 3
119+
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
120+
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
118121
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
119122
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
120123
maybeCommit.status = AttemptStatus.COMMITTING;

0 commit comments

Comments
 (0)