1616 */
1717package org .apache .kafka .tiered .storage .integration ;
1818
19+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
20+ import org .apache .kafka .clients .consumer .GroupProtocol ;
21+ import org .apache .kafka .common .test .ClusterInstance ;
22+ import org .apache .kafka .common .test .api .ClusterConfig ;
23+ import org .apache .kafka .common .test .api .ClusterTemplate ;
24+ import org .apache .kafka .common .test .api .Type ;
25+ import org .apache .kafka .tiered .storage .TieredStorageTestAction ;
1926import org .apache .kafka .tiered .storage .TieredStorageTestBuilder ;
20- import org .apache .kafka .tiered .storage .TieredStorageTestHarness ;
27+ import org .apache .kafka .tiered .storage .TieredStorageTestContext ;
2128import org .apache .kafka .tiered .storage .specs .KeyValueSpec ;
2229
2330import java .util .List ;
31+ import java .util .Locale ;
2432import java .util .Map ;
33+ import java .util .Set ;
2534
2635import static org .apache .kafka .common .utils .Utils .mkEntry ;
2736import static org .apache .kafka .common .utils .Utils .mkMap ;
2837import static org .apache .kafka .server .log .remote .storage .LocalTieredStorageEvent .EventType .DELETE_SEGMENT ;
38+ import static org .apache .kafka .tiered .storage .utils .TieredStorageTestUtils .createServerPropsForRemoteStorage ;
2939
30- public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends TieredStorageTestHarness {
40+ public final class DeleteSegmentsDueToLogStartOffsetBreachTest {
3141
32- @ Override
33- public int brokerCount () {
34- return 2 ;
42+ private static final int BROKER_COUNT = 3 ;
43+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5 ;
44+
45+ private static List <ClusterConfig > clusterConfig () {
46+ return List .of (ClusterConfig .defaultBuilder ()
47+ .setTypes (Set .of (Type .KRAFT ))
48+ .setBrokers (BROKER_COUNT )
49+ .setServerProperties (createServerPropsForRemoteStorage (
50+ DeleteSegmentsDueToLogStartOffsetBreachTest .class .getSimpleName ().toLowerCase (Locale .ROOT ),
51+ BROKER_COUNT ,
52+ NUM_REMOTE_LOG_METADATA_PARTITIONS ))
53+ .build ());
54+ }
55+
56+ @ ClusterTemplate ("clusterConfig" )
57+ public void testDeleteSegmentsDueToLogStartOffsetBreachWithClassicGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
58+ executeDeleteSegmentsDueToLogStartOffsetBreachTest (clusterInstance , GroupProtocol .CLASSIC );
3559 }
3660
37- @ Override
38- protected void writeTestSpecifications (TieredStorageTestBuilder builder ) {
61+ @ ClusterTemplate ("clusterConfig" )
62+ public void testDeleteSegmentsDueToLogStartOffsetBreachWithConsumerGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
63+ executeDeleteSegmentsDueToLogStartOffsetBreachTest (clusterInstance , GroupProtocol .CONSUMER );
64+ }
65+
66+ private void executeDeleteSegmentsDueToLogStartOffsetBreachTest (ClusterInstance clusterInstance ,
67+ GroupProtocol groupProtocol ) throws Exception {
3968 final int broker0 = 0 ;
4069 final int broker1 = 1 ;
4170 final String topicA = "topicA" ;
@@ -50,6 +79,8 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
5079 final long beforeOffset = 3L ;
5180 final long beforeOffset1 = 7L ;
5281
82+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder ();
83+
5384 // Create topicA with 1 partition and 2 RF
5485 builder .createTopic (topicA , partitionCount , replicationFactor , maxBatchCountPerSegment , replicaAssignment ,
5586 enableRemoteLogStorage )
@@ -87,5 +118,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
87118 // consume from the topic with fetch-offset 7 to read data from local and remote storage
88119 .expectFetchFromTieredStorage (broker1 , topicA , p0 , 1 )
89120 .consume (topicA , p0 , 7L , 3 , 1 );
121+
122+ Map <String , Object > extraConsumerProps = Map .of (
123+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ().toLowerCase (Locale .ROOT )
124+ );
125+ try (TieredStorageTestContext context = new TieredStorageTestContext (clusterInstance , extraConsumerProps )) {
126+ try {
127+ for (TieredStorageTestAction action : builder .complete ()) {
128+ action .execute (context );
129+ }
130+ } finally {
131+ context .printReport (System .out );
132+ }
133+ }
90134 }
91135}
0 commit comments