Skip to content

Commit 7ae40de

Browse files
author
Nathan Marz
committed
make isReady only apply to transactions that haven't been emitted before
1 parent b699f0c commit 7ae40de

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,11 @@ private void sync() {
122122
}
123123

124124
try {
125-
if(_coordinator.isReady() && _activeTx.size() < _maxTransactionActive) {
125+
if(_activeTx.size() < _maxTransactionActive) {
126126
BigInteger curr = _currTransaction;
127127
for(int i=0; i<_maxTransactionActive; i++) {
128-
if(!_activeTx.containsKey(curr)) {
128+
if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
129+
&& !_activeTx.containsKey(curr)) {
129130
TransactionAttempt attempt = new TransactionAttempt(curr, Utils.randomLong());
130131
Object state = _coordinatorState.getState(curr, _initializer);
131132
_activeTx.put(curr, new TransactionStatus(attempt));

src/jvm/backtype/storm/transactional/state/RotatingTransactionalState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public Object getState(BigInteger txid, StateInitializer init) {
8888
}
8989
return _curr.get(txid);
9090
}
91+
92+
public boolean hasCache(BigInteger txid) {
93+
return _curr.containsKey(txid);
94+
}
9195

9296
/**
9397
* Returns null if it was created, the value otherwise.

test/clj/backtype/storm/transactional_test.clj

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
nil)
2525
(getCoordinator [this conf context]
2626
(reify ITransactionalSpout$Coordinator
27-
(isReady [this] true)
27+
(isReady [this] (not (nil? @atom)))
2828
(initializeTransaction [this txid prevMetadata]
2929
@atom )
3030
(close [this]
@@ -115,12 +115,17 @@
115115
(.ack coordinator commit-id)
116116
(bind commit-id (get-commit emit-capture))
117117
(verify-and-reset! {COMMIT-STREAM [[2]] BATCH-STREAM [[5 12]]} emit-capture)
118+
(reset! coordinator-state nil)
118119
(.ack coordinator commit-id)
119-
(verify-and-reset! {BATCH-STREAM [[6 12]]} emit-capture)
120+
(verify-and-reset! {} emit-capture)
120121

121122
(.fail coordinator (nth attempts 1))
122123
(bind attempts (get-attempts emit-capture BATCH-STREAM))
123-
(verify-and-reset! {BATCH-STREAM [[3 10] [4 10] [5 12] [6 12]]} emit-capture)
124+
(verify-and-reset! {BATCH-STREAM [[3 10] [4 10] [5 12]]} emit-capture)
125+
126+
(reset! coordinator-state 12)
127+
(.nextTuple coordinator)
128+
(verify-and-reset! {BATCH-STREAM [[6 12]]} emit-capture)
124129

125130
(.ack coordinator (first attempts))
126131
(bind commit-id (get-commit emit-capture))

0 commit comments

Comments
 (0)