File tree Expand file tree Collapse file tree 2 files changed +11
-0
lines changed
storm-core/src/jvm/backtype/storm Expand file tree Collapse file tree 2 files changed +11
-0
lines changed Original file line number Diff line number Diff line change @@ -726,6 +726,12 @@ public class Config extends HashMap<String, Object> {
726
726
public static final String TOPOLOGY_NAME ="topology.name" ;
727
727
public static final Object TOPOLOGY_NAME_SCHEMA = String .class ;
728
728
729
+ /**
730
+ * Max pending tuples in one ShellBolt
731
+ */
732
+ public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING ="topology.shellbolt.max.pending" ;
733
+ public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number .class ;
734
+
729
735
/**
730
736
* The root directory in ZooKeeper for metadata about TransactionalSpouts.
731
737
*/
Original file line number Diff line number Diff line change 1
1
package backtype .storm .task ;
2
2
3
+ import backtype .storm .Config ;
3
4
import backtype .storm .generated .ShellComponent ;
4
5
import backtype .storm .tuple .MessageId ;
5
6
import backtype .storm .tuple .Tuple ;
@@ -71,6 +72,10 @@ public ShellBolt(String... command) {
71
72
72
73
public void prepare (Map stormConf , TopologyContext context ,
73
74
final OutputCollector collector ) {
75
+ Object maxPending = stormConf .get (Config .TOPOLOGY_SHELLBOLT_MAX_PENDING );
76
+ if (maxPending != null && maxPending instanceof Number ) {
77
+ this ._pendingWrites = new LinkedBlockingQueue (((Number )maxPending ).intValue ());
78
+ }
74
79
_rand = new Random ();
75
80
_process = new ShellProcess (_command );
76
81
_collector = collector ;
You can’t perform that action at this time.
0 commit comments