Skip to content

Commit 45aebfe

Browse files
committed
add shell bolt pending limit in fresh branch
1 parent 6efadbe commit 45aebfe

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

storm-core/src/jvm/backtype/storm/Config.java

+6
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,12 @@ public class Config extends HashMap<String, Object> {
726726
public static final String TOPOLOGY_NAME="topology.name";
727727
public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
728728

729+
/**
730+
* Max pending tuples in one ShellBolt
731+
*/
732+
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
733+
public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number.class;
734+
729735
/**
730736
* The root directory in ZooKeeper for metadata about TransactionalSpouts.
731737
*/

storm-core/src/jvm/backtype/storm/task/ShellBolt.java

+5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package backtype.storm.task;
22

3+
import backtype.storm.Config;
34
import backtype.storm.generated.ShellComponent;
45
import backtype.storm.tuple.MessageId;
56
import backtype.storm.tuple.Tuple;
@@ -71,6 +72,10 @@ public ShellBolt(String... command) {
7172

7273
public void prepare(Map stormConf, TopologyContext context,
7374
final OutputCollector collector) {
75+
Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
76+
if (maxPending != null && maxPending instanceof Number) {
77+
this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
78+
}
7479
_rand = new Random();
7580
_process = new ShellProcess(_command);
7681
_collector = collector;

0 commit comments

Comments
 (0)