Skip to content

Commit 69f1d2a

Browse files
author
Jason Jackson
committed
MicroBatchIBackingMap: avoid store timeouts on multiput and multiget
1 parent 4cdfc45 commit 69f1d2a

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package storm.trident.state.map;
2+
3+
import java.io.Serializable;
4+
import java.util.ArrayList;
5+
import java.util.LinkedList;
6+
import java.util.List;
7+
8+
public class MicroBatchIBackingMap<T> implements IBackingMap<T> {
9+
IBackingMap<T> _delegate;
10+
Options _options;
11+
12+
13+
public static class Options implements Serializable {
14+
public int maxMultiGetBatchSize = 0; // 0 means delegate batch size = trident batch size.
15+
public int maxMultiPutBatchSize = 0;
16+
}
17+
18+
public MicroBatchIBackingMap(final Options options, final IBackingMap<T> delegate) {
19+
_options = options;
20+
_delegate = delegate;
21+
assert options.maxMultiPutBatchSize >= 0;
22+
assert options.maxMultiGetBatchSize >= 0;
23+
}
24+
25+
@Override
26+
public void multiPut(final List<List<Object>> keys, final List<T> values) {
27+
int thisBatchSize;
28+
if(_options.maxMultiPutBatchSize == 0) { thisBatchSize = keys.size(); }
29+
else { thisBatchSize = _options.maxMultiPutBatchSize; }
30+
31+
LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
32+
LinkedList<T> valuesTodo = new LinkedList<T>(values);
33+
34+
while(!keysTodo.isEmpty()) {
35+
List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
36+
List<T> valuesBatch = new ArrayList<T>(thisBatchSize);
37+
for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
38+
keysBatch.add(keysTodo.removeFirst());
39+
valuesBatch.add(valuesTodo.removeFirst());
40+
}
41+
42+
_delegate.multiPut(keysBatch, valuesBatch);
43+
}
44+
}
45+
46+
@Override
47+
public List<T> multiGet(final List<List<Object>> keys) {
48+
int thisBatchSize;
49+
if(_options.maxMultiGetBatchSize == 0) { thisBatchSize = keys.size(); }
50+
else { thisBatchSize = _options.maxMultiGetBatchSize; }
51+
52+
LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
53+
54+
List<T> ret = new ArrayList<T>(keys.size());
55+
56+
while(!keysTodo.isEmpty()) {
57+
List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
58+
for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
59+
keysBatch.add(keysTodo.removeFirst());
60+
}
61+
62+
List<T> retSubset = _delegate.multiGet(keysBatch);
63+
ret.addAll(retSubset);
64+
}
65+
66+
return ret;
67+
}
68+
}

0 commit comments

Comments
 (0)