1616 */
1717package org .apache .kafka .tiered .storage .integration ;
1818
19+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
20+ import org .apache .kafka .clients .consumer .GroupProtocol ;
21+ import org .apache .kafka .common .test .ClusterInstance ;
22+ import org .apache .kafka .common .test .api .ClusterConfig ;
23+ import org .apache .kafka .common .test .api .ClusterTemplate ;
24+ import org .apache .kafka .common .test .api .Type ;
25+ import org .apache .kafka .tiered .storage .TieredStorageTestAction ;
1926import org .apache .kafka .tiered .storage .TieredStorageTestBuilder ;
20- import org .apache .kafka .tiered .storage .TieredStorageTestHarness ;
27+ import org .apache .kafka .tiered .storage .TieredStorageTestContext ;
2128import org .apache .kafka .tiered .storage .specs .KeyValueSpec ;
2229
2330import java .util .List ;
31+ import java .util .Locale ;
32+ import java .util .Map ;
33+ import java .util .Set ;
2434
2535import static org .apache .kafka .common .utils .Utils .mkEntry ;
2636import static org .apache .kafka .common .utils .Utils .mkMap ;
37+ import static org .apache .kafka .tiered .storage .utils .TieredStorageTestUtils .createServerPropsForRemoteStorage ;
2738
28- public final class AlterLogDirTest extends TieredStorageTestHarness {
39+ public final class AlterLogDirTest {
2940
30- @ Override
31- public int brokerCount () {
32- return 2 ;
41+ private static final int BROKER_COUNT = 3 ;
42+
43+ private static List <ClusterConfig > clusterConfig () {
44+ return List .of (ClusterConfig .defaultBuilder ()
45+ .setTypes (Set .of (Type .KRAFT ))
46+ .setBrokers (BROKER_COUNT )
47+ .setDisksPerBroker (2 )
48+ .setServerProperties (createServerPropsForRemoteStorage (
49+ AlterLogDirTest .class .getSimpleName ().toLowerCase (Locale .ROOT ),
50+ BROKER_COUNT ,
51+ 5 ))
52+ .build ());
53+ }
54+
55+ @ ClusterTemplate ("clusterConfig" )
56+ public void testAlterLogDirWithClassicGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
57+ executeAlterLogDirTest (clusterInstance , GroupProtocol .CLASSIC );
3358 }
3459
35- @ Override
36- protected void writeTestSpecifications (TieredStorageTestBuilder builder ) {
60+ @ ClusterTemplate ("clusterConfig" )
61+ public void testAlterLogDirWithConsumerGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
62+ executeAlterLogDirTest (clusterInstance , GroupProtocol .CONSUMER );
63+ }
64+
65+ private void executeAlterLogDirTest (ClusterInstance clusterInstance , GroupProtocol groupProtocol ) throws Exception {
3766 final String topicB = "topicB" ;
3867 final int p0 = 0 ;
3968 final int partitionCount = 1 ;
@@ -43,8 +72,9 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
4372 final int broker0 = 0 ;
4473 final int broker1 = 1 ;
4574
75+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder ();
4676 builder
47- // create topicB with 1 partition and 1 RF
77+ // create topicB with 1 partition and 2 RF
4878 .createTopic (topicB , partitionCount , replicationFactor , maxBatchCountPerSegment ,
4979 mkMap (mkEntry (p0 , List .of (broker1 , broker0 ))), enableRemoteLogStorage )
5080 // send records to partition 0
@@ -63,5 +93,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
6393 // consume from the beginning of the topic to read data from local and remote storage
6494 .expectFetchFromTieredStorage (broker0 , topicB , p0 , 3 )
6595 .consume (topicB , p0 , 0L , 4 , 3 );
96+
97+ Map <String , Object > extraConsumerProps = Map .of (
98+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ().toLowerCase (Locale .ROOT )
99+ );
100+ try (TieredStorageTestContext context = new TieredStorageTestContext (clusterInstance , extraConsumerProps )) {
101+ try {
102+ for (TieredStorageTestAction action : builder .complete ()) {
103+ action .execute (context );
104+ }
105+ } finally {
106+ context .printReport (System .out );
107+ }
108+ }
66109 }
67110}
0 commit comments