Skip to content

Commit cfdb01b

Browse files
author
Nathan Marz
committed
Merge branch 'master' into 0.8.2
2 parents c405490 + 7bfe363 commit cfdb01b

File tree

3 files changed

+14
-4
lines changed

3 files changed

+14
-4
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
* Added MultiScheme interface (thanks sritchie)
3030
* Added MockTridentTuple for testing (thanks emblem)
3131
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
32+
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
3233
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
3334
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
3435
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
36+
* Bug fix: fixed NPE when emitting during emit method of Aggregator
3537

3638
## 0.8.1
3739

src/jvm/storm/trident/operation/impl/GroupedAggregator.java

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collect
5656
} else {
5757
curr = val.get(group);
5858
}
59+
groupColl.currGroup = group;
5960
_agg.aggregate(curr, input, groupColl);
6061

6162
}

src/jvm/storm/trident/tuple/ComboList.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
import java.io.Serializable;
44
import java.util.AbstractList;
55
import java.util.List;
6+
import org.apache.commons.lang.builder.ToStringBuilder;
67

78

89
public class ComboList extends AbstractList<Object> {
910
public static class Factory implements Serializable {
1011
Pointer[] index;
11-
int numLists;
12+
int[] sizes;
1213

1314
public Factory(int... sizes) {
14-
numLists = sizes.length;
15+
this.sizes = sizes;
1516
int total = 0;
1617
for(int size: sizes) {
1718
total+=size;
@@ -29,8 +30,14 @@ public Factory(int... sizes) {
2930
}
3031

3132
public ComboList create(List[] delegates) {
32-
if(delegates.length!=numLists) {
33-
throw new RuntimeException("Expected " + numLists + " lists, but instead got " + delegates.length + " lists");
33+
if(delegates.length!=sizes.length) {
34+
throw new RuntimeException("Expected " + sizes.length + " lists, but instead got " + delegates.length + " lists");
35+
}
36+
for(int i=0; i<delegates.length; i++) {
37+
List l = delegates[i];
38+
if(l==null || l.size() != sizes[i]) {
39+
throw new RuntimeException("Got unexpected delegates to ComboList: " + ToStringBuilder.reflectionToString(delegates));
40+
}
3441
}
3542
return new ComboList(delegates, index);
3643
}

0 commit comments

Comments
 (0)