Skip to content

Commit 75c78e9

Browse files
committed
KAFKA-5668; fetch across stores in CompositeReadOnlyWindowStore & CompositeReadOnlySessionStore
Fix range queries in `CompositeReadOnlyWindowStore` and `CompositeReadOnlySessionStore` to fetch across all stores (was previously just looking in the first store) Author: Damian Guy <damian.guy@gmail.com> Reviewers: Guozhang Wang <wangguoz@gmail.com> Closes apache#3685 from dguy/kafka-5668
1 parent 3457c47 commit 75c78e9

File tree

9 files changed

+197
-121
lines changed

9 files changed

+197
-121
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.streams.KeyValue;
20+
import org.apache.kafka.streams.state.KeyValueIterator;
21+
22+
import java.util.Iterator;
23+
import java.util.NoSuchElementException;
24+
25+
class CompositeKeyValueIterator<K, V, StoreType> implements KeyValueIterator<K, V> {
26+
27+
private final Iterator<StoreType> storeIterator;
28+
private final NextIteratorFunction<K, V, StoreType> nextIteratorFunction;
29+
30+
private KeyValueIterator<K, V> current;
31+
32+
CompositeKeyValueIterator(final Iterator<StoreType> underlying,
33+
final NextIteratorFunction<K, V, StoreType> nextIteratorFunction) {
34+
this.storeIterator = underlying;
35+
this.nextIteratorFunction = nextIteratorFunction;
36+
}
37+
38+
@Override
39+
public void close() {
40+
if (current != null) {
41+
current.close();
42+
current = null;
43+
}
44+
}
45+
46+
@Override
47+
public K peekNextKey() {
48+
throw new UnsupportedOperationException("peekNextKey not supported");
49+
}
50+
51+
@Override
52+
public boolean hasNext() {
53+
while ((current == null || !current.hasNext())
54+
&& storeIterator.hasNext()) {
55+
close();
56+
current = nextIteratorFunction.apply(storeIterator.next());
57+
}
58+
return current != null && current.hasNext();
59+
}
60+
61+
62+
@Override
63+
public KeyValue<K, V> next() {
64+
if (!hasNext()) {
65+
throw new NoSuchElementException();
66+
}
67+
return current.next();
68+
}
69+
70+
@Override
71+
public void remove() {
72+
throw new UnsupportedOperationException("Remove not supported");
73+
}
74+
}

streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java

Lines changed: 4 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616
*/
1717
package org.apache.kafka.streams.state.internals;
1818

19-
import org.apache.kafka.streams.KeyValue;
2019
import org.apache.kafka.streams.errors.InvalidStateStoreException;
2120
import org.apache.kafka.streams.state.KeyValueIterator;
2221
import org.apache.kafka.streams.state.QueryableStoreType;
2322
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
2423

25-
import java.util.Iterator;
2624
import java.util.List;
27-
import java.util.NoSuchElementException;
2825
import java.util.Objects;
2926

3027
/**
@@ -71,7 +68,7 @@ public V get(final K key) {
7168
public KeyValueIterator<K, V> range(final K from, final K to) {
7269
Objects.requireNonNull(from);
7370
Objects.requireNonNull(to);
74-
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
71+
final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
7572
@Override
7673
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
7774
try {
@@ -82,12 +79,12 @@ public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
8279
}
8380
};
8481
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
85-
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
82+
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
8683
}
8784

8885
@Override
8986
public KeyValueIterator<K, V> all() {
90-
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
87+
final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
9188
@Override
9289
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
9390
try {
@@ -98,7 +95,7 @@ public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
9895
}
9996
};
10097
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
101-
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
98+
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
10299
}
103100

104101
@Override
@@ -111,61 +108,6 @@ public long approximateNumEntries() {
111108
return total < 0 ? Long.MAX_VALUE : total;
112109
}
113110

114-
interface NextIteratorFunction<K, V> {
115111

116-
KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store);
117-
}
118-
119-
120-
private class CompositeKeyValueIterator implements KeyValueIterator<K, V> {
121-
122-
private final Iterator<ReadOnlyKeyValueStore<K, V>> storeIterator;
123-
private final NextIteratorFunction<K, V> nextIteratorFunction;
124-
125-
private KeyValueIterator<K, V> current;
126-
127-
CompositeKeyValueIterator(final Iterator<ReadOnlyKeyValueStore<K, V>> underlying,
128-
final NextIteratorFunction<K, V> nextIteratorFunction) {
129-
this.storeIterator = underlying;
130-
this.nextIteratorFunction = nextIteratorFunction;
131-
}
132-
133-
@Override
134-
public void close() {
135-
if (current != null) {
136-
current.close();
137-
current = null;
138-
}
139-
}
140-
141-
@Override
142-
public K peekNextKey() {
143-
throw new UnsupportedOperationException("peekNextKey not supported");
144-
}
145-
146-
@Override
147-
public boolean hasNext() {
148-
while ((current == null || !current.hasNext())
149-
&& storeIterator.hasNext()) {
150-
close();
151-
current = nextIteratorFunction.apply(storeIterator.next());
152-
}
153-
return current != null && current.hasNext();
154-
}
155-
156-
157-
@Override
158-
public KeyValue<K, V> next() {
159-
if (!hasNext()) {
160-
throw new NoSuchElementException();
161-
}
162-
return current.next();
163-
}
164-
165-
@Override
166-
public void remove() {
167-
throw new UnsupportedOperationException("Remove not supported");
168-
}
169-
}
170112
}
171113

streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.streams.state.ReadOnlySessionStore;
2424

2525
import java.util.List;
26+
import java.util.Objects;
2627

2728
/**
2829
* Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link
@@ -41,47 +42,41 @@ public CompositeReadOnlySessionStore(final StateStoreProvider storeProvider,
4142
this.storeName = storeName;
4243
}
4344

44-
private interface Fetcher<K, V> {
45-
KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store);
46-
}
4745

48-
private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher) {
46+
@Override
47+
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
48+
Objects.requireNonNull(key, "key can't be null");
4949
final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
5050
for (final ReadOnlySessionStore<K, V> store : stores) {
5151
try {
52-
final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store);
52+
final KeyValueIterator<Windowed<K>, V> result = store.fetch(key);
5353
if (!result.hasNext()) {
5454
result.close();
5555
} else {
5656
return result;
5757
}
5858
} catch (final InvalidStateStoreException ise) {
5959
throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" +
60-
" and may have been migrated to another instance; " +
61-
"please re-discover its location from the state metadata.");
60+
" and may have been migrated to another instance; " +
61+
"please re-discover its location from the state metadata.");
6262
}
6363
}
6464
return KeyValueIterators.emptyIterator();
6565
}
6666

67-
68-
@Override
69-
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
70-
return fetch(new Fetcher<K, V>() {
71-
@Override
72-
public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store) {
73-
return store.fetch(key);
74-
}
75-
});
76-
}
77-
7867
@Override
7968
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
80-
return fetch(new Fetcher<K, V>() {
69+
Objects.requireNonNull(from, "from can't be null");
70+
Objects.requireNonNull(to, "to can't be null");
71+
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>>() {
8172
@Override
82-
public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store) {
73+
public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlySessionStore<K, V> store) {
8374
return store.fetch(from, to);
8475
}
85-
});
76+
};
77+
return new DelegatingPeekingKeyValueIterator<>(storeName,
78+
new CompositeKeyValueIterator<>(
79+
storeProvider.stores(storeName, queryableStoreType).iterator(),
80+
nextIteratorFunction));
8681
}
8782
}

streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.streams.state.WindowStoreIterator;
2525

2626
import java.util.List;
27+
import java.util.Objects;
2728

2829
/**
2930
* Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
@@ -43,58 +44,40 @@ public CompositeReadOnlyWindowStore(final StateStoreProvider provider,
4344
this.storeName = storeName;
4445
}
4546

46-
private interface Fetcher<K, V, IteratorType extends KeyValueIterator<?, V>> {
47-
IteratorType fetch(ReadOnlyWindowStore<K, V> store);
48-
IteratorType empty();
49-
}
50-
51-
public <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K, V, IteratorType> fetcher) {
47+
@Override
48+
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
49+
Objects.requireNonNull(key, "key can't be null");
5250
final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
5351
for (ReadOnlyWindowStore<K, V> windowStore : stores) {
5452
try {
55-
final IteratorType result = fetcher.fetch(windowStore);
53+
final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
5654
if (!result.hasNext()) {
5755
result.close();
5856
} else {
5957
return result;
6058
}
6159
} catch (InvalidStateStoreException e) {
6260
throw new InvalidStateStoreException(
63-
"State store is not available anymore and may have been migrated to another instance; " +
64-
"please re-discover its location from the state metadata.");
61+
"State store is not available anymore and may have been migrated to another instance; " +
62+
"please re-discover its location from the state metadata.");
6563
}
6664
}
67-
68-
return fetcher.empty();
69-
}
70-
71-
@Override
72-
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
73-
return fetch(new Fetcher<K, V, WindowStoreIterator<V>>() {
74-
@Override
75-
public WindowStoreIterator<V> fetch(ReadOnlyWindowStore<K, V> store) {
76-
return store.fetch(key, timeFrom, timeTo);
77-
}
78-
79-
@Override
80-
public WindowStoreIterator<V> empty() {
81-
return KeyValueIterators.emptyWindowStoreIterator();
82-
}
83-
});
65+
return KeyValueIterators.emptyWindowStoreIterator();
8466
}
8567

8668
@Override
8769
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
88-
return fetch(new Fetcher<K, V, KeyValueIterator<Windowed<K>, V>>() {
89-
@Override
90-
public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlyWindowStore<K, V> store) {
91-
return store.fetch(from, to, timeFrom, timeTo);
92-
}
93-
70+
Objects.requireNonNull(from, "from can't be null");
71+
Objects.requireNonNull(to, "to can't be null");
72+
final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
9473
@Override
95-
public KeyValueIterator<Windowed<K>, V> empty() {
96-
return KeyValueIterators.emptyIterator();
74+
public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
75+
return store.fetch(from, to, timeFrom, timeFrom);
9776
}
98-
});
77+
};
78+
return new DelegatingPeekingKeyValueIterator<>(storeName,
79+
new CompositeKeyValueIterator<>(
80+
provider.stores(storeName, windowStoreType).iterator(),
81+
nextIteratorFunction));
9982
}
10083
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.streams.state.KeyValueIterator;
20+
21+
interface NextIteratorFunction<K, V, StoreType> {
22+
23+
KeyValueIterator<K, V> apply(final StoreType store);
24+
}

0 commit comments

Comments
 (0)