1616 */
1717package org .apache .kafka .tiered .storage .integration ;
1818
19+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
20+ import org .apache .kafka .clients .consumer .GroupProtocol ;
1921import org .apache .kafka .common .config .TopicConfig ;
22+ import org .apache .kafka .common .test .ClusterInstance ;
23+ import org .apache .kafka .common .test .api .ClusterConfig ;
24+ import org .apache .kafka .common .test .api .ClusterTemplate ;
25+ import org .apache .kafka .common .test .api .Type ;
26+ import org .apache .kafka .tiered .storage .TieredStorageTestAction ;
2027import org .apache .kafka .tiered .storage .TieredStorageTestBuilder ;
21- import org .apache .kafka .tiered .storage .TieredStorageTestHarness ;
28+ import org .apache .kafka .tiered .storage .TieredStorageTestContext ;
2229import org .apache .kafka .tiered .storage .specs .KeyValueSpec ;
2330
2431import java .util .HashMap ;
2532import java .util .List ;
33+ import java .util .Locale ;
2634import java .util .Map ;
35+ import java .util .Set ;
36+
37+ import static org .apache .kafka .tiered .storage .utils .TieredStorageTestUtils .createServerPropsForRemoteStorage ;
2738
2839/**
2940 * Test to verify that the active segment is rolled and uploaded to remote storage when the segment breaches the
3041 * local log retention policy.
3142 */
32- public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
43+ public final class RollAndOffloadActiveSegmentTest {
44+ private static final int BROKER_COUNT = 3 ;
45+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5 ;
46+
47+ @ SuppressWarnings ("unused" )
48+ private static List <ClusterConfig > clusterConfig () {
49+ return List .of (ClusterConfig .defaultBuilder ()
50+ .setTypes (Set .of (Type .KRAFT ))
51+ .setBrokers (BROKER_COUNT )
52+ .setServerProperties (createServerPropsForRemoteStorage (
53+ RollAndOffloadActiveSegmentTest .class .getSimpleName ().toLowerCase (Locale .ROOT ),
54+ BROKER_COUNT ,
55+ NUM_REMOTE_LOG_METADATA_PARTITIONS ))
56+ .build ());
57+ }
3358
34- @ Override
35- public int brokerCount () {
36- return 1 ;
59+ @ ClusterTemplate ( "clusterConfig" )
60+ public void testRollAndOffloadActiveSegmentWithClassicGroupProtocol ( ClusterInstance clusterInstance ) throws Exception {
61+ executeRollAndOffloadActiveSegmentTest ( clusterInstance , GroupProtocol . CLASSIC ) ;
3762 }
3863
39- @ Override
40- protected void writeTestSpecifications (TieredStorageTestBuilder builder ) {
64+ @ ClusterTemplate ("clusterConfig" )
65+ public void testRollAndOffloadActiveSegmentWithConsumerGroupProtocol (ClusterInstance clusterInstance ) throws Exception {
66+ executeRollAndOffloadActiveSegmentTest (clusterInstance , GroupProtocol .CONSUMER );
67+ }
68+
69+ private void executeRollAndOffloadActiveSegmentTest (ClusterInstance clusterInstance , GroupProtocol groupProtocol ) throws Exception {
4170 final int broker0 = 0 ;
4271 final String topicA = "topicA" ;
4372 final int p0 = 0 ;
4473 final int partitionCount = 1 ;
4574 final int replicationFactor = 1 ;
4675 final int maxBatchCountPerSegment = 1 ;
47- final Map <Integer , List <Integer >> replicaAssignment = null ;
76+ // Pin the partition to broker 0 so the broker0-based expectations are deterministic
77+ // regardless of how many brokers the cluster has.
78+ final Map <Integer , List <Integer >> replicaAssignment = Map .of (p0 , List .of (broker0 ));
4879 final boolean enableRemoteLogStorage = true ;
4980
50- // Create topicA with 1 partition, 1 RF and enabled with remote storage.
51- builder .createTopic (topicA , partitionCount , replicationFactor , maxBatchCountPerSegment , replicaAssignment ,
81+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder ()
82+ // Create topicA with 1 partition, 1 RF and enabled with remote storage.
83+ .createTopic (topicA , partitionCount , replicationFactor , maxBatchCountPerSegment , replicaAssignment ,
5284 enableRemoteLogStorage )
5385 // update the topic config such that it triggers the rolling of the active segment
5486 .updateTopicConfig (topicA , configsToBeAdded (), List .of ())
@@ -63,9 +95,22 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
6395 // consume from the beginning of the topic to read data from local and remote storage
6496 .expectFetchFromTieredStorage (broker0 , topicA , p0 , 4 )
6597 .consume (topicA , p0 , 0L , 4 , 4 );
98+
99+ final Map <String , Object > extraConsumerProps = Map .of (
100+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ().toLowerCase (Locale .ROOT )
101+ );
102+ try (TieredStorageTestContext context = new TieredStorageTestContext (clusterInstance , extraConsumerProps )) {
103+ try {
104+ for (TieredStorageTestAction action : builder .complete ()) {
105+ action .execute (context );
106+ }
107+ } finally {
108+ context .printReport (System .out );
109+ }
110+ }
66111 }
67112
68- private Map <String , String > configsToBeAdded () {
113+ private static Map <String , String > configsToBeAdded () {
69114 // Update localLog retentionMs to 1 ms and segment roll-time to 10 ms
70115 Map <String , String > topicConfigs = new HashMap <>();
71116 topicConfigs .put (TopicConfig .LOCAL_LOG_RETENTION_MS_CONFIG , "1" );
0 commit comments