Skip to content

Commit ee8e934

Browse files
committed
KAFKA-5689; Add MeteredWindowStore and refactor store hierarchy
Add MeteredWindowStore and ChangeLoggingWindowBytesStore. Refactor the store hierarchy such that the Metered store is always the outermost store. Do serialization in MeteredWindowStore. Author: Damian Guy <damian.guy@gmail.com>. Reviewers: Guozhang Wang <wangguoz@gmail.com>. Closes apache#3692 from dguy/kafka-5689.
1 parent 0b0819b commit ee8e934

14 files changed

+769
-175
lines changed

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
import java.util.List;
3535

36-
class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
36+
class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
3737

3838

3939
private final WindowStore<Bytes, byte[]> underlying;
@@ -47,6 +47,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
4747
private ThreadCache cache;
4848
private InternalProcessorContext context;
4949
private StateSerdes<K, V> serdes;
50+
private StateSerdes<Bytes, byte[]> bytesSerdes;
5051
private CacheFlushListener<Windowed<K>, V> flushListener;
5152
private final SegmentedCacheFunction cacheFunction;
5253

@@ -73,10 +74,14 @@ public void init(final ProcessorContext context, final StateStore root) {
7374
@SuppressWarnings("unchecked")
7475
private void initInternal(final ProcessorContext context) {
7576
this.context = (InternalProcessorContext) context;
76-
serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
77+
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name());
78+
serdes = new StateSerdes<>(topic,
7779
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
7880
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
7981

82+
bytesSerdes = new StateSerdes<>(topic,
83+
Serdes.Bytes(),
84+
Serdes.ByteArray());
8085
name = context.taskId() + "-" + underlying.name();
8186
cache = this.context.getCache();
8287

@@ -131,72 +136,67 @@ public void close() {
131136
}
132137

133138
@Override
134-
public synchronized void put(final K key, final V value) {
139+
public synchronized void put(final Bytes key, final byte[] value) {
135140
put(key, value, context.timestamp());
136141
}
137142

138143
@Override
139-
public synchronized void put(final K key, final V value, final long timestamp) {
144+
public synchronized void put(final Bytes key, final byte[] value, final long timestamp) {
140145
// since this function may not access the underlying inner store, we need to validate
141146
// if store is open outside as well.
142147
validateStoreOpen();
143148

144-
final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
145-
final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
149+
final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes);
150+
final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
146151
timestamp, context.partition(), context.topic());
147152
cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
148153
}
149154

150155
@Override
151-
public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
156+
public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
152157
// since this function may not access the underlying inner store, we need to validate
153158
// if store is open outside as well.
154159
validateStoreOpen();
155160

156-
final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
157-
final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);
161+
final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(key, timeFrom, timeTo);
158162

159-
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom));
160-
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo));
163+
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
164+
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
161165
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
162166

163-
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
164-
keyBytes,
167+
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key,
168+
key,
165169
timeFrom,
166170
timeTo);
167171
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(
168172
cacheIterator, hasNextCondition, cacheFunction
169173
);
170174

171-
return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
172-
underlyingIterator,
173-
new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
175+
return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator);
174176
}
175177

176178
@Override
177-
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
179+
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
178180
// since this function may not access the underlying inner store, we need to validate
179181
// if store is open outside as well.
180182
validateStoreOpen();
181183

182-
final Bytes keyFromBytes = Bytes.wrap(serdes.rawKey(from));
183-
final Bytes keyToBytes = Bytes.wrap(serdes.rawKey(to));
184-
final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(keyFromBytes, keyToBytes, timeFrom, timeTo);
184+
final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(from, to, timeFrom, timeTo);
185185

186-
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFromBytes, timeFrom));
187-
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyToBytes, timeTo));
186+
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
187+
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
188188
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
189189

190-
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFromBytes,
191-
keyToBytes,
190+
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from,
191+
to,
192192
timeFrom,
193193
timeTo);
194194
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
195195

196-
return new MergedSortedCacheWindowStoreKeyValueIterator<>(
196+
return new MergedSortedCacheWindowStoreKeyValueIterator(
197197
filteredCacheIterator,
198198
underlyingIterator,
199-
serdes,
199+
bytesSerdes,
200200
windowSize,
201201
cacheFunction
202202
);
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.utils.Bytes;
20+
import org.apache.kafka.streams.kstream.Windowed;
21+
import org.apache.kafka.streams.processor.ProcessorContext;
22+
import org.apache.kafka.streams.processor.StateStore;
23+
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
24+
import org.apache.kafka.streams.state.KeyValueIterator;
25+
import org.apache.kafka.streams.state.StateSerdes;
26+
import org.apache.kafka.streams.state.WindowStore;
27+
import org.apache.kafka.streams.state.WindowStoreIterator;
28+
29+
/**
30+
* Simple wrapper around a {@link WindowStore} of raw bytes to support writing
31+
* updates to a changelog
32+
*/
33+
class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]> {
34+
35+
private final WindowStore<Bytes, byte[]> bytesStore;
36+
private StoreChangeLogger<Bytes, byte[]> changeLogger;
37+
private ProcessorContext context;
38+
private StateSerdes<Bytes, byte[]> innerStateSerde;
39+
40+
ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore) {
41+
super(bytesStore);
42+
this.bytesStore = bytesStore;
43+
}
44+
45+
@Override
46+
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) {
47+
return bytesStore.fetch(key, from, to);
48+
}
49+
50+
@Override
51+
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
52+
return bytesStore.fetch(keyFrom, keyTo, from, to);
53+
}
54+
55+
56+
@Override
57+
public void put(final Bytes key, final byte[] value) {
58+
put(key, value, context.timestamp());
59+
}
60+
61+
@Override
62+
public void put(final Bytes key, final byte[] value, final long timestamp) {
63+
if (key != null) {
64+
bytesStore.put(key, value, timestamp);
65+
changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, 0, innerStateSerde), value);
66+
}
67+
}
68+
69+
@Override
70+
public void init(final ProcessorContext context, final StateStore root) {
71+
this.context = context;
72+
bytesStore.init(context, root);
73+
innerStateSerde = WindowStoreUtils.getInnerStateSerde(
74+
ProcessorStateManager.storeChangelogTopic(
75+
context.applicationId(),
76+
bytesStore.name()));
77+
changeLogger = new StoreChangeLogger<>(
78+
name(),
79+
context,
80+
innerStateSerde);
81+
}
82+
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,36 @@
1919
import org.apache.kafka.common.utils.Bytes;
2020
import org.apache.kafka.streams.KeyValue;
2121
import org.apache.kafka.streams.state.KeyValueIterator;
22-
import org.apache.kafka.streams.state.StateSerdes;
2322
import org.apache.kafka.streams.state.WindowStoreIterator;
2423

2524
import static org.apache.kafka.streams.state.internals.SegmentedCacheFunction.bytesFromCacheKey;
2625

2726
/**
2827
* Merges two iterators. Assumes each of them is sorted by key
2928
*
30-
* @param <V>
3129
*/
32-
class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V, byte[]> implements WindowStoreIterator<V> {
30+
class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStoreIterator<Long, Long, byte[], byte[]> implements WindowStoreIterator<byte[]> {
3331

34-
private final StateSerdes<Long, V> serdes;
3532

3633
MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
37-
final KeyValueIterator<Long, byte[]> storeIterator,
38-
final StateSerdes<Long, V> serdes) {
34+
final KeyValueIterator<Long, byte[]> storeIterator) {
3935
super(cacheIterator, storeIterator);
40-
this.serdes = serdes;
4136
}
4237

4338
@Override
44-
public KeyValue<Long, V> deserializeStorePair(final KeyValue<Long, byte[]> pair) {
45-
return KeyValue.pair(pair.key, serdes.valueFrom(pair.value));
39+
public KeyValue<Long, byte[]> deserializeStorePair(final KeyValue<Long, byte[]> pair) {
40+
return pair;
4641
}
4742

4843
@Override
4944
Long deserializeCacheKey(final Bytes cacheKey) {
5045
byte[] binaryKey = bytesFromCacheKey(cacheKey);
51-
5246
return WindowStoreUtils.timestampFromBinaryKey(binaryKey);
5347
}
5448

5549
@Override
56-
V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
57-
return serdes.valueFrom(cacheEntry.value);
50+
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
51+
return cacheEntry.value;
5852
}
5953

6054
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@
2323
import org.apache.kafka.streams.state.KeyValueIterator;
2424
import org.apache.kafka.streams.state.StateSerdes;
2525

26-
class MergedSortedCacheWindowStoreKeyValueIterator<K, V>
27-
extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, V, byte[]> {
26+
class MergedSortedCacheWindowStoreKeyValueIterator
27+
extends AbstractMergedSortedCacheStoreIterator<Windowed<Bytes>, Windowed<Bytes>, byte[], byte[]> {
2828

29-
private final StateSerdes<K, V> serdes;
29+
private final StateSerdes<Bytes, byte[]> serdes;
3030
private final long windowSize;
3131
private final SegmentedCacheFunction cacheFunction;
3232

3333
MergedSortedCacheWindowStoreKeyValueIterator(
3434
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator,
3535
final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator,
36-
final StateSerdes<K, V> serdes,
36+
final StateSerdes<Bytes, byte[]> serdes,
3737
final long windowSize,
3838
final SegmentedCacheFunction cacheFunction
3939
) {
@@ -44,27 +44,27 @@ class MergedSortedCacheWindowStoreKeyValueIterator<K, V>
4444
}
4545

4646
@Override
47-
Windowed<K> deserializeStoreKey(final Windowed<Bytes> key) {
48-
return new Windowed<>(serdes.keyFrom(key.key().get()), key.window());
47+
Windowed<Bytes> deserializeStoreKey(final Windowed<Bytes> key) {
48+
return key;
4949
}
5050

5151
@Override
52-
KeyValue<Windowed<K>, V> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) {
53-
return KeyValue.pair(deserializeStoreKey(pair.key), serdes.valueFrom(pair.value));
52+
KeyValue<Windowed<Bytes>, byte[]> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) {
53+
return pair;
5454
}
5555

5656
@Override
57-
Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
57+
Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
5858
byte[] binaryKey = cacheFunction.key(cacheKey).get();
5959

6060
final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
61-
final K key = WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes);
61+
final Bytes key = WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes);
6262
return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
6363
}
6464

6565
@Override
66-
V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
67-
return serdes.valueFrom(cacheEntry.value);
66+
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
67+
return cacheEntry.value;
6868
}
6969

7070
@Override

0 commit comments

Comments
 (0)