33
33
34
34
import java .util .List ;
35
35
36
- class CachingWindowStore <K , V > extends WrappedStateStore .AbstractStateStore implements WindowStore <K , V >, CachedStateStore <Windowed <K >, V > {
36
+ class CachingWindowStore <K , V > extends WrappedStateStore .AbstractStateStore implements WindowStore <Bytes , byte [] >, CachedStateStore <Windowed <K >, V > {
37
37
38
38
39
39
private final WindowStore <Bytes , byte []> underlying ;
@@ -47,6 +47,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
47
47
private ThreadCache cache ;
48
48
private InternalProcessorContext context ;
49
49
private StateSerdes <K , V > serdes ;
50
+ private StateSerdes <Bytes , byte []> bytesSerdes ;
50
51
private CacheFlushListener <Windowed <K >, V > flushListener ;
51
52
private final SegmentedCacheFunction cacheFunction ;
52
53
@@ -73,10 +74,14 @@ public void init(final ProcessorContext context, final StateStore root) {
73
74
@ SuppressWarnings ("unchecked" )
74
75
private void initInternal (final ProcessorContext context ) {
75
76
this .context = (InternalProcessorContext ) context ;
76
- serdes = new StateSerdes <>(ProcessorStateManager .storeChangelogTopic (context .applicationId (), underlying .name ()),
77
+ final String topic = ProcessorStateManager .storeChangelogTopic (context .applicationId (), underlying .name ());
78
+ serdes = new StateSerdes <>(topic ,
77
79
keySerde == null ? (Serde <K >) context .keySerde () : keySerde ,
78
80
valueSerde == null ? (Serde <V >) context .valueSerde () : valueSerde );
79
81
82
+ bytesSerdes = new StateSerdes <>(topic ,
83
+ Serdes .Bytes (),
84
+ Serdes .ByteArray ());
80
85
name = context .taskId () + "-" + underlying .name ();
81
86
cache = this .context .getCache ();
82
87
@@ -131,72 +136,67 @@ public void close() {
131
136
}
132
137
133
138
@ Override
134
- public synchronized void put (final K key , final V value ) {
139
+ public synchronized void put (final Bytes key , final byte [] value ) {
135
140
put (key , value , context .timestamp ());
136
141
}
137
142
138
143
@ Override
139
- public synchronized void put (final K key , final V value , final long timestamp ) {
144
+ public synchronized void put (final Bytes key , final byte [] value , final long timestamp ) {
140
145
// since this function may not access the underlying inner store, we need to validate
141
146
// if store is open outside as well.
142
147
validateStoreOpen ();
143
148
144
- final Bytes keyBytes = WindowStoreUtils .toBinaryKey (key , timestamp , 0 , serdes );
145
- final LRUCacheEntry entry = new LRUCacheEntry (serdes . rawValue ( value ) , true , context .offset (),
149
+ final Bytes keyBytes = WindowStoreUtils .toBinaryKey (key , timestamp , 0 , bytesSerdes );
150
+ final LRUCacheEntry entry = new LRUCacheEntry (value , true , context .offset (),
146
151
timestamp , context .partition (), context .topic ());
147
152
cache .put (name , cacheFunction .cacheKey (keyBytes ), entry );
148
153
}
149
154
150
155
@ Override
151
- public synchronized WindowStoreIterator <V > fetch (final K key , final long timeFrom , final long timeTo ) {
156
+ public synchronized WindowStoreIterator <byte [] > fetch (final Bytes key , final long timeFrom , final long timeTo ) {
152
157
// since this function may not access the underlying inner store, we need to validate
153
158
// if store is open outside as well.
154
159
validateStoreOpen ();
155
160
156
- final Bytes keyBytes = Bytes .wrap (serdes .rawKey (key ));
157
- final WindowStoreIterator <byte []> underlyingIterator = underlying .fetch (keyBytes , timeFrom , timeTo );
161
+ final WindowStoreIterator <byte []> underlyingIterator = underlying .fetch (key , timeFrom , timeTo );
158
162
159
- final Bytes cacheKeyFrom = cacheFunction .cacheKey (keySchema .lowerRangeFixedSize (keyBytes , timeFrom ));
160
- final Bytes cacheKeyTo = cacheFunction .cacheKey (keySchema .upperRangeFixedSize (keyBytes , timeTo ));
163
+ final Bytes cacheKeyFrom = cacheFunction .cacheKey (keySchema .lowerRangeFixedSize (key , timeFrom ));
164
+ final Bytes cacheKeyTo = cacheFunction .cacheKey (keySchema .upperRangeFixedSize (key , timeTo ));
161
165
final ThreadCache .MemoryLRUCacheBytesIterator cacheIterator = cache .range (name , cacheKeyFrom , cacheKeyTo );
162
166
163
- final HasNextCondition hasNextCondition = keySchema .hasNextCondition (keyBytes ,
164
- keyBytes ,
167
+ final HasNextCondition hasNextCondition = keySchema .hasNextCondition (key ,
168
+ key ,
165
169
timeFrom ,
166
170
timeTo );
167
171
final PeekingKeyValueIterator <Bytes , LRUCacheEntry > filteredCacheIterator = new FilteredCacheIterator (
168
172
cacheIterator , hasNextCondition , cacheFunction
169
173
);
170
174
171
- return new MergedSortedCacheWindowStoreIterator <>(filteredCacheIterator ,
172
- underlyingIterator ,
173
- new StateSerdes <>(serdes .topic (), Serdes .Long (), serdes .valueSerde ()));
175
+ return new MergedSortedCacheWindowStoreIterator (filteredCacheIterator , underlyingIterator );
174
176
}
175
177
176
178
@ Override
177
- public KeyValueIterator <Windowed <K >, V > fetch (final K from , final K to , final long timeFrom , final long timeTo ) {
179
+ public KeyValueIterator <Windowed <Bytes >, byte [] > fetch (final Bytes from , final Bytes to , final long timeFrom , final long timeTo ) {
178
180
// since this function may not access the underlying inner store, we need to validate
179
181
// if store is open outside as well.
180
182
validateStoreOpen ();
181
183
182
- final Bytes keyFromBytes = Bytes .wrap (serdes .rawKey (from ));
183
- final Bytes keyToBytes = Bytes .wrap (serdes .rawKey (to ));
184
- final KeyValueIterator <Windowed <Bytes >, byte []> underlyingIterator = underlying .fetch (keyFromBytes , keyToBytes , timeFrom , timeTo );
184
+ final KeyValueIterator <Windowed <Bytes >, byte []> underlyingIterator = underlying .fetch (from , to , timeFrom , timeTo );
185
185
186
- final Bytes cacheKeyFrom = cacheFunction .cacheKey (keySchema .lowerRange (keyFromBytes , timeFrom ));
187
- final Bytes cacheKeyTo = cacheFunction .cacheKey (keySchema .upperRange (keyToBytes , timeTo ));
186
+ final Bytes cacheKeyFrom = cacheFunction .cacheKey (keySchema .lowerRange (from , timeFrom ));
187
+ final Bytes cacheKeyTo = cacheFunction .cacheKey (keySchema .upperRange (to , timeTo ));
188
188
final ThreadCache .MemoryLRUCacheBytesIterator cacheIterator = cache .range (name , cacheKeyFrom , cacheKeyTo );
189
189
190
- final HasNextCondition hasNextCondition = keySchema .hasNextCondition (keyFromBytes ,
191
- keyToBytes ,
190
+ final HasNextCondition hasNextCondition = keySchema .hasNextCondition (from ,
191
+ to ,
192
192
timeFrom ,
193
193
timeTo );
194
194
final PeekingKeyValueIterator <Bytes , LRUCacheEntry > filteredCacheIterator = new FilteredCacheIterator (cacheIterator , hasNextCondition , cacheFunction );
195
195
196
- return new MergedSortedCacheWindowStoreKeyValueIterator <> (
196
+ return new MergedSortedCacheWindowStoreKeyValueIterator (
197
197
filteredCacheIterator ,
198
198
underlyingIterator ,
199
- serdes ,
199
+ bytesSerdes ,
200
200
windowSize ,
201
201
cacheFunction
202
202
);
0 commit comments