1717package org .apache .kafka .tiered .storage .integration ;
1818
1919import org .apache .kafka .clients .admin .OffsetSpec ;
20+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
21+ import org .apache .kafka .clients .consumer .GroupProtocol ;
22+ import org .apache .kafka .common .test .ClusterInstance ;
23+ import org .apache .kafka .common .test .api .ClusterConfig ;
24+ import org .apache .kafka .common .test .api .ClusterTemplate ;
25+ import org .apache .kafka .common .test .api .Type ;
2026import org .apache .kafka .common .utils .MockTime ;
2127import org .apache .kafka .common .utils .Time ;
2228import org .apache .kafka .storage .internals .log .EpochEntry ;
29+ import org .apache .kafka .tiered .storage .TieredStorageTestAction ;
2330import org .apache .kafka .tiered .storage .TieredStorageTestBuilder ;
24- import org .apache .kafka .tiered .storage .TieredStorageTestHarness ;
31+ import org .apache .kafka .tiered .storage .TieredStorageTestContext ;
2532import org .apache .kafka .tiered .storage .specs .KeyValueSpec ;
2633
27- import org .junit .jupiter .params .ParameterizedTest ;
28- import org .junit .jupiter .params .provider .MethodSource ;
29-
3034import java .util .List ;
35+ import java .util .Locale ;
3136import java .util .Map ;
37+ import java .util .Set ;
3238
3339import static org .apache .kafka .common .record .internal .RecordBatch .NO_PARTITION_LEADER_EPOCH ;
3440import static org .apache .kafka .common .utils .Utils .mkEntry ;
3541import static org .apache .kafka .common .utils .Utils .mkMap ;
3642import static org .apache .kafka .server .log .remote .storage .LocalTieredStorageEvent .EventType .DELETE_SEGMENT ;
43+ import static org .apache .kafka .tiered .storage .utils .TieredStorageTestUtils .createServerPropsForRemoteStorage ;
44+
45+ public final class ListOffsetsTest {
46+
47+ private static final int BROKER_COUNT = 3 ;
48+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5 ;
3749
38- public class ListOffsetsTest extends TieredStorageTestHarness {
39- @ Override
40- public int brokerCount () {
41- return 2 ;
50+ private static List <ClusterConfig > clusterConfig () {
51+ return List .of (ClusterConfig .defaultBuilder ()
52+ .setTypes (Set .of (Type .KRAFT ))
53+ .setBrokers (BROKER_COUNT )
54+ .setServerProperties (createServerPropsForRemoteStorage (
55+ ListOffsetsTest .class .getSimpleName ().toLowerCase (Locale .ROOT ),
56+ BROKER_COUNT ,
57+ NUM_REMOTE_LOG_METADATA_PARTITIONS ))
58+ .build ());
4259 }
4360
44- /**
45- * We are running this test only for the Kraft mode, since ZK mode is deprecated now. Note that:
46- * 1. In Kraft mode, the leader-epoch gets bumped only for leader-election (0 -> 1) and not for reassignment.
47- */
48- @ ParameterizedTest (name = "{displayName}.groupProtocol={0}" )
49- @ MethodSource ("getTestGroupProtocolParametersAll" )
50- @ Override
51- public void executeTieredStorageTest (String groupProtocol ) {
52- super .executeTieredStorageTest (groupProtocol );
61+ @ ClusterTemplate ("clusterConfig" )
62+ public void testListOffsetsWithClassicGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
63+ executeListOffsetsTest (clusterInstance , GroupProtocol .CLASSIC );
5364 }
5465
55- @ Override
56- protected void writeTestSpecifications (TieredStorageTestBuilder builder ) {
66+ @ ClusterTemplate ("clusterConfig" )
67+ public void testListOffsetsWithConsumerGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
68+ executeListOffsetsTest (clusterInstance , GroupProtocol .CONSUMER );
69+ }
70+
71+ private void executeListOffsetsTest (ClusterInstance clusterInstance , GroupProtocol groupProtocol ) throws Exception {
5772 final int broker0 = 0 ;
5873 final int broker1 = 1 ;
5974 final String topicA = "topicA" ;
@@ -62,6 +77,7 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
6277 final long timestamp = time .milliseconds ();
6378 final Map <Integer , List <Integer >> assignment = mkMap (mkEntry (p0 , List .of (broker0 , broker1 )));
6479
80+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder ();
6581 builder
6682 .createTopic (topicA , 1 , 2 , 2 , assignment , true )
6783 // send records to partition 0 and expect the first segment to be offloaded
@@ -120,5 +136,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
120136 .expectListOffsets (topicA , p0 , OffsetSpec .earliestLocal (), new EpochEntry (1 , 4 ))
121137 .expectListOffsets (topicA , p0 , OffsetSpec .latestTiered (), new EpochEntry (NO_PARTITION_LEADER_EPOCH , 3 ))
122138 .expectListOffsets (topicA , p0 , OffsetSpec .latest (), new EpochEntry (1 , 6 ));
139+
140+ Map <String , Object > extraConsumerProps = Map .of (
141+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ().toLowerCase (Locale .ROOT )
142+ );
143+ try (TieredStorageTestContext context = new TieredStorageTestContext (clusterInstance , extraConsumerProps )) {
144+ try {
145+ for (TieredStorageTestAction action : builder .complete ()) {
146+ action .execute (context );
147+ }
148+ } finally {
149+ context .printReport (System .out );
150+ }
151+ }
123152 }
124153}
0 commit comments