Skip to content

Commit 436a8ac

Browse files
author
Nathan Marz
committed
fix bug in LinearDRPCTopologyBuilder
1 parent 9fbc42b commit 436a8ac

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,14 @@ private StormTopology createTopology(DRPCSpout spout) {
9191
if(i==0 && component.declarations.size()==0) {
9292
declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
9393
} else {
94+
String prevId;
95+
if(i==0) {
96+
prevId = PREPARE_ID;
97+
} else {
98+
prevId = boltId(i-1);
99+
}
94100
for(InputDeclaration declaration: component.declarations) {
95-
declaration.declare(boltId(i-1), declarer);
101+
declaration.declare(prevId, declarer);
96102
}
97103
}
98104
if(i>0) {

src/jvm/backtype/storm/drpc/PrepareRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
import backtype.storm.tuple.Values;
1010
import java.util.Map;
1111
import java.util.Random;
12+
import backtype.storm.utils.Utils;
1213

1314

1415
public class PrepareRequest implements IBasicBolt {
15-
public static final String ARGS_STREAM = "args";
16+
public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
1617
public static final String RETURN_STREAM = "ret";
1718
public static final String ID_STREAM = "id";
1819

0 commit comments

Comments
 (0)