1616 */
1717package org .apache .kafka .tiered .storage .integration ;
1818
19+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
20+ import org .apache .kafka .clients .consumer .GroupProtocol ;
21+ import org .apache .kafka .common .test .ClusterInstance ;
22+ import org .apache .kafka .common .test .api .ClusterConfig ;
23+ import org .apache .kafka .common .test .api .ClusterTemplate ;
24+ import org .apache .kafka .common .test .api .Type ;
25+ import org .apache .kafka .tiered .storage .TieredStorageTestAction ;
1926import org .apache .kafka .tiered .storage .TieredStorageTestBuilder ;
20- import org .apache .kafka .tiered .storage .TieredStorageTestHarness ;
27+ import org .apache .kafka .tiered .storage .TieredStorageTestContext ;
2128import org .apache .kafka .tiered .storage .specs .KeyValueSpec ;
2229
2330import java .util .List ;
31+ import java .util .Locale ;
2432import java .util .Map ;
33+ import java .util .Set ;
2534
2635import static org .apache .kafka .common .utils .Utils .mkEntry ;
2736import static org .apache .kafka .common .utils .Utils .mkMap ;
2837import static org .apache .kafka .server .log .remote .storage .LocalTieredStorageEvent .EventType .DELETE_SEGMENT ;
38+ import static org .apache .kafka .tiered .storage .utils .TieredStorageTestUtils .createServerPropsForRemoteStorage ;
2939
30- public final class DeleteTopicTest extends TieredStorageTestHarness {
40+ public final class DeleteTopicTest {
3141
32- @ Override
33- public int brokerCount () {
34- return 2 ;
42+ private static final int BROKER_COUNT = 3 ;
43+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5 ;
44+
45+ private static List <ClusterConfig > clusterConfig () {
46+ return List .of (ClusterConfig .defaultBuilder ()
47+ .setTypes (Set .of (Type .KRAFT ))
48+ .setBrokers (BROKER_COUNT )
49+ .setServerProperties (createServerPropsForRemoteStorage (
50+ DeleteTopicTest .class .getSimpleName ().toLowerCase (Locale .ROOT ),
51+ BROKER_COUNT ,
52+ NUM_REMOTE_LOG_METADATA_PARTITIONS ))
53+ .build ());
54+ }
55+
56+ @ ClusterTemplate ("clusterConfig" )
57+ public void testDeleteTopicWithClassicGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
58+ executeDeleteTopicTest (clusterInstance , GroupProtocol .CLASSIC );
3559 }
3660
37- @ Override
38- protected void writeTestSpecifications (TieredStorageTestBuilder builder ) {
61+ @ ClusterTemplate ("clusterConfig" )
62+ public void testDeleteTopicWithConsumerGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
63+ executeDeleteTopicTest (clusterInstance , GroupProtocol .CONSUMER );
64+ }
65+
66+ private static void executeDeleteTopicTest (ClusterInstance clusterInstance ,
67+ GroupProtocol groupProtocol ) throws Exception {
3968 final int broker0 = 0 ;
4069 final int broker1 = 1 ;
4170 final String topicA = "topicA" ;
@@ -50,6 +79,7 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
5079 mkEntry (p1 , List .of (broker1 , broker0 ))
5180 );
5281
82+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder ();
5383 builder
5484 .createTopic (topicA , partitionCount , replicationFactor , maxBatchCountPerSegment ,
5585 assignment , enableRemoteLogStorage )
@@ -71,5 +101,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
71101 .deleteTopic (List .of (topicA ))
72102 .expectEmptyRemoteStorage (topicA , p0 )
73103 .expectEmptyRemoteStorage (topicA , p1 );
104+
105+ Map <String , Object > extraConsumerProps = Map .of (
106+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ().toLowerCase (Locale .ROOT )
107+ );
108+ try (TieredStorageTestContext context = new TieredStorageTestContext (clusterInstance , extraConsumerProps )) {
109+ try {
110+ for (TieredStorageTestAction action : builder .complete ()) {
111+ action .execute (context );
112+ }
113+ } finally {
114+ context .printReport (System .out );
115+ }
116+ }
74117 }
75- }
118+ }
0 commit comments