Skip to content

Commit bfb22f9

Browse files
author
Nathan Marz
committed
changed FailedBatchException to FailedException and moved it to topology namespace so it could be used with basicbolt as well
1 parent 69bad92 commit bfb22f9

File tree

8 files changed

+39
-30
lines changed

8 files changed

+39
-30
lines changed

src/jvm/backtype/storm/coordination/BatchBoltExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
55
import backtype.storm.task.OutputCollector;
66
import backtype.storm.task.TopologyContext;
7+
import backtype.storm.topology.FailedException;
78
import backtype.storm.topology.IRichBolt;
89
import backtype.storm.topology.OutputFieldsDeclarer;
910
import backtype.storm.tuple.Tuple;
@@ -40,7 +41,7 @@ public void execute(Tuple input) {
4041
try {
4142
bolt.execute(input);
4243
_collector.ack(input);
43-
} catch(FailedBatchException e) {
44+
} catch(FailedException e) {
4445
LOG.error("Failed to process tuple in batch", e);
4546
_collector.fail(input);
4647
}

src/jvm/backtype/storm/coordination/CoordinatedBolt.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package backtype.storm.coordination;
22

3+
import backtype.storm.topology.FailedException;
34
import java.util.Map.Entry;
45
import backtype.storm.tuple.Values;
56
import backtype.storm.generated.GlobalStreamId;
@@ -238,7 +239,7 @@ private boolean checkFinishId(Tuple tup) {
238239
track.finished = true;
239240
_tracked.remove(id);
240241
}
241-
} catch(FailedBatchException e) {
242+
} catch(FailedException e) {
242243
LOG.error("Failed to finish batch", e);
243244
track.failed = true;
244245
ret = true;

src/jvm/backtype/storm/coordination/FailedBatchException.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/jvm/backtype/storm/topology/BasicBoltExecutor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
import backtype.storm.task.OutputCollector;
44
import backtype.storm.task.TopologyContext;
55
import backtype.storm.tuple.Tuple;
6-
import java.util.HashMap;
76
import java.util.Map;
7+
import org.apache.log4j.Logger;
88

99
public class BasicBoltExecutor implements IRichBolt {
10+
public static Logger LOG = Logger.getLogger(BasicBoltExecutor.class);
1011

1112
private IBasicBolt _bolt;
1213
private transient BasicOutputCollector _collector;
@@ -27,8 +28,13 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
2728

2829
public void execute(Tuple input) {
2930
_collector.setContext(input);
30-
_bolt.execute(input, _collector);
31-
_collector.getOutputter().ack(input);
31+
try {
32+
_bolt.execute(input, _collector);
33+
_collector.getOutputter().ack(input);
34+
} catch(FailedException e) {
35+
LOG.warn("Failed to process tuple", e);
36+
_collector.getOutputter().fail(input);
37+
}
3238
}
3339

3440
public void cleanup() {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package backtype.storm.topology;
2+
3+
public class FailedException extends RuntimeException {
4+
public FailedException() {
5+
super();
6+
}
7+
8+
public FailedException(String msg) {
9+
super(msg);
10+
}
11+
12+
public FailedException(String msg, Throwable cause) {
13+
super(msg, cause);
14+
}
15+
16+
public FailedException(Throwable cause) {
17+
super(cause);
18+
}
19+
}

src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package backtype.storm.transactional;
22

3-
import backtype.storm.coordination.FailedBatchException;
43
import backtype.storm.coordination.BatchOutputCollectorImpl;
54
import backtype.storm.task.OutputCollector;
65
import backtype.storm.task.TopologyContext;
6+
import backtype.storm.topology.FailedException;
77
import backtype.storm.topology.IRichBolt;
88
import backtype.storm.topology.OutputFieldsDeclarer;
99
import backtype.storm.tuple.Tuple;
@@ -34,7 +34,7 @@ public void execute(Tuple input) {
3434
try {
3535
_emitter.emitBatch(attempt, input.getValue(1), _collector);
3636
_collector.ack(input);
37-
} catch(FailedBatchException e) {
37+
} catch(FailedException e) {
3838
LOG.warn("Failed to emit batch for transaction", e);
3939
_collector.fail(input);
4040
}

src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package backtype.storm.transactional;
22

3-
import backtype.storm.coordination.FailedBatchException;
43
import backtype.storm.Config;
54
import backtype.storm.spout.SpoutOutputCollector;
65
import backtype.storm.task.TopologyContext;
6+
import backtype.storm.topology.FailedException;
77
import backtype.storm.topology.IRichSpout;
88
import backtype.storm.topology.OutputFieldsDeclarer;
99
import backtype.storm.transactional.state.RotatingTransactionalState;
@@ -137,7 +137,7 @@ private void sync() {
137137
curr = nextTransactionId(curr);
138138
}
139139
}
140-
} catch(FailedBatchException e) {
140+
} catch(FailedException e) {
141141
LOG.warn("Failed to get metadata for a transaction", e);
142142
}
143143
}

test/clj/backtype/storm/drpc_test.clj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
(:use [clojure test])
33
(:import [backtype.storm.drpc ReturnResults DRPCSpout
44
LinearDRPCTopologyBuilder])
5-
(:import [backtype.storm.coordination CoordinatedBolt$FinishedCallback FailedBatchException])
5+
(:import [backtype.storm.topology FailedException])
6+
(:import [backtype.storm.coordination CoordinatedBolt$FinishedCallback])
67
(:import [backtype.storm LocalDRPC LocalCluster])
78
(:import [backtype.storm.tuple Fields])
89
(:import [backtype.storm.generated DRPCExecutionException])
@@ -182,7 +183,7 @@
182183
(ack! collector tuple))
183184
CoordinatedBolt$FinishedCallback
184185
(finishedId [this id]
185-
(throw (FailedBatchException.))
186+
(throw (FailedException.))
186187
)))
187188

188189
(deftest test-drpc-fail-finish

0 commit comments

Comments
 (0)