Skip to content

Comments

KAFKA-20158: Adding MeteredSessionStore and tests (3/N)#21513

Open
bbejeck wants to merge 11 commits intoapache:trunkfrom
bbejeck:KAFKA-20158_session_store_w_headers_part_3
Open

KAFKA-20158: Adding MeteredSessionStore and tests (3/N)#21513
bbejeck wants to merge 11 commits intoapache:trunkfrom
bbejeck:KAFKA-20158_session_store_w_headers_part_3

Conversation

@bbejeck
Copy link
Member

@bbejeck bbejeck commented Feb 18, 2026

This PR adds MeteredSessionStoreWithHeaders and a unit test for
supporting KIP-1271. This is a stacked PR only review
c266f45

@bbejeck bbejeck changed the title KAFKA-20158: Adding MeteredSessionStore and tests KAFKA-20158: Adding MeteredSessionStore and tests (3/N) Feb 18, 2026
Copy link
Contributor

@aliehsaeedii aliehsaeedii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bbejeck. I made a pass. If inheritance and therefore refactoring is possible, then reviewing would be easier.

@bbejeck bbejeck force-pushed the KAFKA-20158_session_store_w_headers_part_3 branch from f948830 to 85a1bb6 Compare February 19, 2026 20:56
final Time time) {
super(inner, metricsScope, keySerde, createAggregationWithHeadersSerde(aggSerde), time);
this.rawAggSerde = aggSerde;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if it could be

MeteredSessionStoreWithHeaders(final SessionStore<Bytes, byte[]> inner,
                                   final String metricsScope,
                                   final Serde<K> keySerde,
                                   final Serde<AggregationWithHeaders<AGG>> aggSerde,
                                   final Time time) {
        super(inner, metricsScope, keySerde, aggSerde, time);
        this.rawAggSerde = aggSerde;
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's doable, then we dont need the properties and most of the methods except the write ones. Of course we must change MeteredSessionStore to send empty headers to keyBytes for read methods as well.

final Time time) {
super(inner, metricsScope, keySerde, createAggregationWithHeadersSerde(aggSerde), time);
this.rawAggSerde = aggSerde;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's doable, then we dont need the properties and most of the methods except the write ones. Of course we must change MeteredSessionStore to send empty headers to keyBytes for read methods as well.

protected Sensor restoreSensor;

protected final LongAdder numOpenIterators = new LongAdder();
protected final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's send empty headers to keyBytes or any de/serializer caller of this class as we are depcrating the old ones with no headers an input. Thanks

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR


@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, AggregationWithHeaders<AGG>> listener,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, AggregationWithHeaders<AGG>> listener,
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, final AggregationWithHeaders<AGG>> listener,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky the CacheFlushListener is decalred final the other part there is within the generics definition.

@bbejeck bbejeck force-pushed the KAFKA-20158_session_store_w_headers_part_3 branch from 71c27e9 to c266f45 Compare February 20, 2026 20:38
@bbejeck
Copy link
Member Author

bbejeck commented Feb 20, 2026

@aliehsaeedii @frankvicky all comments addressed - I took another pass and was able to clean things up quite a bit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants