Skip to content

Commit 9834b51

Browse files
authored
KAFKA-20134: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods (apache#21705)
Fixes a bug where `MeteredTimestampedWindowStoreWithHeaders` iterator methods fail when key deserializers require headers. The class inherits iterator-returning methods from `MeteredWindowStore`: - `fetchAll(long, long)` / `backwardFetchAll(long, long)` - `all()` / `backwardAll()` - `fetch(K, K, long, long)` / `backwardFetch(K, K, long, long)` These methods use the deprecated `serdes::keyFrom(rawKey)` which provides empty headers, causing deserialization failures when headers are required. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent d7fed9d commit 9834b51

2 files changed

Lines changed: 449 additions & 0 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.serialization.Serde;
2222
import org.apache.kafka.common.utils.Bytes;
2323
import org.apache.kafka.common.utils.Time;
24+
import org.apache.kafka.streams.KeyValue;
2425
import org.apache.kafka.streams.errors.ProcessorStateException;
2526
import org.apache.kafka.streams.kstream.Windowed;
2627
import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -282,6 +283,132 @@ private <R> QueryResult<R> runWindowRangeQuery(final WindowRangeQuery<K, ValueTi
282283
return result;
283284
}
284285

286+
@Override
287+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final K keyFrom,
288+
final K keyTo,
289+
final long timeFrom,
290+
final long timeTo) {
291+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
292+
wrapped().fetch(
293+
keyBytes(keyFrom, new RecordHeaders()),
294+
keyBytes(keyTo, new RecordHeaders()),
295+
timeFrom,
296+
timeTo)
297+
);
298+
}
299+
300+
@Override
301+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetch(final K keyFrom,
302+
final K keyTo,
303+
final long timeFrom,
304+
final long timeTo) {
305+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
306+
wrapped().backwardFetch(
307+
keyBytes(keyFrom, new RecordHeaders()),
308+
keyBytes(keyTo, new RecordHeaders()),
309+
timeFrom,
310+
timeTo)
311+
);
312+
}
313+
314+
@Override
315+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetchAll(final long timeFrom, final long timeTo) {
316+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
317+
wrapped().fetchAll(timeFrom, timeTo)
318+
);
319+
}
320+
321+
@Override
322+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetchAll(final long timeFrom, final long timeTo) {
323+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
324+
wrapped().backwardFetchAll(timeFrom, timeTo)
325+
);
326+
}
327+
328+
@Override
329+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> all() {
330+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
331+
wrapped().all()
332+
);
333+
}
334+
335+
@Override
336+
public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardAll() {
337+
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
338+
wrapped().backwardAll()
339+
);
340+
}
341+
342+
private class MeteredTimestampedWindowStoreWithHeadersKeyValueIterator
343+
implements KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>, MeteredIterator {
344+
345+
private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
346+
private final long startNs;
347+
private final long startTimestampMs;
348+
private KeyValue<Windowed<K>, ValueTimestampHeaders<V>> cachedNext;
349+
350+
private MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
351+
final KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
352+
this.iter = iter;
353+
this.startNs = time.nanoseconds();
354+
this.startTimestampMs = time.milliseconds();
355+
numOpenIterators.increment();
356+
openIterators.add(this);
357+
}
358+
359+
@Override
360+
public long startTimestamp() {
361+
return this.startTimestampMs;
362+
}
363+
364+
@Override
365+
public boolean hasNext() {
366+
return cachedNext != null || iter.hasNext();
367+
}
368+
369+
@Override
370+
public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
371+
if (cachedNext != null) {
372+
final KeyValue<Windowed<K>, ValueTimestampHeaders<V>> result = cachedNext;
373+
cachedNext = null;
374+
return result;
375+
}
376+
377+
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
378+
379+
if (next == null) {
380+
return null;
381+
}
382+
383+
final ValueTimestampHeaders<V> valueTimestampHeaders = serdes.valueFrom(next.value, new RecordHeaders());
384+
final Headers headers = valueTimestampHeaders != null ? valueTimestampHeaders.headers() : new RecordHeaders();
385+
final K key = serdes.keyFrom(next.key.key().get(), headers);
386+
final Windowed<K> windowedKey = new Windowed<>(key, next.key.window());
387+
return KeyValue.pair(windowedKey, valueTimestampHeaders);
388+
}
389+
390+
@Override
391+
public void close() {
392+
try {
393+
iter.close();
394+
} finally {
395+
final long duration = time.nanoseconds() - startNs;
396+
fetchSensor.record(duration);
397+
iteratorDurationSensor.record(duration);
398+
numOpenIterators.decrement();
399+
openIterators.remove(this);
400+
}
401+
}
402+
403+
@Override
404+
public Windowed<K> peekNextKey() {
405+
if (cachedNext == null) {
406+
cachedNext = next();
407+
}
408+
return cachedNext == null ? null : cachedNext.key;
409+
}
410+
}
411+
285412
private boolean isUnderlyingStoreTimestamped() {
286413
Object store = wrapped();
287414
do {

0 commit comments

Comments
 (0)