Skip to content

Commit 4f63ad5

Browse files
author
Nathan Marz
committed
added localOrShuffleGrouping to java apis
1 parent 438a858 commit 4f63ad5

File tree

5 files changed

+68
-0
lines changed

5 files changed

+68
-0
lines changed

src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<
1313
public LinearDRPCInputDeclarer shuffleGrouping();
1414
public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
1515

16+
public LinearDRPCInputDeclarer localOrShuffleGrouping();
17+
public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
18+
1619
public LinearDRPCInputDeclarer noneGrouping();
1720
public LinearDRPCInputDeclarer noneGrouping(String streamId);
1821

src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,28 @@ public void declare(String prevComponent, InputDeclarer declarer) {
242242
return this;
243243
}
244244

245+
@Override
246+
public LinearDRPCInputDeclarer localOrShuffleGrouping() {
247+
addDeclaration(new InputDeclaration() {
248+
@Override
249+
public void declare(String prevComponent, InputDeclarer declarer) {
250+
declarer.localOrShuffleGrouping(prevComponent);
251+
}
252+
});
253+
return this;
254+
}
255+
256+
@Override
257+
public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) {
258+
addDeclaration(new InputDeclaration() {
259+
@Override
260+
public void declare(String prevComponent, InputDeclarer declarer) {
261+
declarer.localOrShuffleGrouping(prevComponent, streamId);
262+
}
263+
});
264+
return this;
265+
}
266+
245267
@Override
246268
public LinearDRPCInputDeclarer noneGrouping() {
247269
addDeclaration(new InputDeclaration() {

src/jvm/backtype/storm/topology/InputDeclarer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
1616
public T shuffleGrouping(String componentId);
1717
public T shuffleGrouping(String componentId, String streamId);
1818

19+
public T localOrShuffleGrouping(String componentId);
20+
public T localOrShuffleGrouping(String componentId, String streamId);
21+
1922
public T noneGrouping(String componentId);
2023
public T noneGrouping(String componentId, String streamId);
2124

src/jvm/backtype/storm/topology/TopologyBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
276276
return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
277277
}
278278

279+
public BoltDeclarer localOrShuffleGrouping(String componentId) {
280+
return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
281+
}
282+
283+
public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) {
284+
return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct()));
285+
}
286+
279287
public BoltDeclarer noneGrouping(String componentId) {
280288
return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
281289
}

src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,38 @@ public String getComponent() {
303303
return this;
304304
}
305305

306+
@Override
307+
public BoltDeclarer localOrShuffleGrouping(final String component) {
308+
addDeclaration(new InputDeclaration() {
309+
@Override
310+
public void declare(InputDeclarer declarer) {
311+
declarer.localOrShuffleGrouping(component);
312+
}
313+
314+
@Override
315+
public String getComponent() {
316+
return component;
317+
}
318+
});
319+
return this;
320+
}
321+
322+
@Override
323+
public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
324+
addDeclaration(new InputDeclaration() {
325+
@Override
326+
public void declare(InputDeclarer declarer) {
327+
declarer.localOrShuffleGrouping(component, streamId);
328+
}
329+
330+
@Override
331+
public String getComponent() {
332+
return component;
333+
}
334+
});
335+
return this;
336+
}
337+
306338
@Override
307339
public BoltDeclarer noneGrouping(final String component) {
308340
addDeclaration(new InputDeclaration() {

0 commit comments

Comments
 (0)