Skip to content

Commit dc8a3d1

Browse files
committed
fix memorymapstate and lrumemorymapstate to avoid concurrent access to same map, update trident integration test of getting all tuples to properly do broadcast
1 parent 45d0fae commit dc8a3d1

File tree

3 files changed

+3
-2
lines changed

3 files changed

+3
-2
lines changed

src/jvm/storm/trident/testing/LRUMemoryMapState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public Factory(int maxSize) {
7272

7373
@Override
7474
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
75-
return new LRUMemoryMapState(_maxSize, _id);
75+
return new LRUMemoryMapState(_maxSize, _id + partitionIndex);
7676
}
7777
}
7878

src/jvm/storm/trident/testing/MemoryMapState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Factory() {
6969

7070
@Override
7171
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
72-
return new MemoryMapState(_id);
72+
return new MemoryMapState(_id + partitionIndex);
7373
}
7474
}
7575

test/clj/storm/trident/integration_test.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
))
2727
(-> topo
2828
(.newDRPCStream "all-tuples" drpc)
29+
(.broadcast)
2930
(.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
3031
(.project (fields "word" "count")))
3132
(with-topology [cluster topo]

0 commit comments

Comments
 (0)