Skip to content

Commit 9be8ca1

Browse files
authored
KAFKA-17689 Migrate ReassignReplicaMoveTest and ReassignReplicaExpandTest to new test infra (#22343)
Migrate `ReassignReplicaMoveTest` and `ReassignReplicaExpandTest` to new test infra, and integrated into a single test file. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent edcada2 commit 9be8ca1

4 files changed

Lines changed: 133 additions & 163 deletions

File tree

storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java

Lines changed: 0 additions & 101 deletions
This file was deleted.

storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java

Lines changed: 0 additions & 31 deletions
This file was deleted.
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.tiered.storage.integration;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.clients.consumer.GroupProtocol;
21+
import org.apache.kafka.common.test.ClusterInstance;
22+
import org.apache.kafka.common.test.api.ClusterConfig;
23+
import org.apache.kafka.common.test.api.ClusterTemplate;
24+
import org.apache.kafka.common.test.api.Type;
25+
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
26+
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
27+
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
28+
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.Locale;
33+
import java.util.Map;
34+
import java.util.Set;
35+
36+
import static org.apache.kafka.common.utils.Utils.mkEntry;
37+
import static org.apache.kafka.common.utils.Utils.mkMap;
38+
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
39+
40+
public final class ReassignReplicaMoveAndExpandTest {
41+
private static final int BROKER_COUNT = 3;
42+
private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
43+
44+
private static final Integer BROKER_0 = 0;
45+
private static final Integer BROKER_1 = 1;
46+
47+
private static List<ClusterConfig> clusterConfig() {
48+
return List.of(ClusterConfig.defaultBuilder()
49+
.setTypes(Set.of(Type.KRAFT))
50+
.setBrokers(BROKER_COUNT)
51+
.setServerProperties(createServerPropsForRemoteStorage(
52+
ReassignReplicaMoveAndExpandTest.class.getSimpleName().toLowerCase(Locale.ROOT),
53+
BROKER_COUNT,
54+
NUM_REMOTE_LOG_METADATA_PARTITIONS))
55+
.build());
56+
}
57+
58+
@ClusterTemplate("clusterConfig")
59+
public void testReassignReplicaExpandWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception {
60+
executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC, List.of(BROKER_0, BROKER_1));
61+
}
62+
63+
@ClusterTemplate("clusterConfig")
64+
public void testReassignReplicaExpandWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception {
65+
executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER, List.of(BROKER_0, BROKER_1));
66+
}
67+
68+
@ClusterTemplate("clusterConfig")
69+
public void testReassignReplicaMoveWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception {
70+
executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC, List.of(BROKER_1));
71+
}
72+
73+
@ClusterTemplate("clusterConfig")
74+
public void testReassignReplicaMoveWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception {
75+
executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER, List.of(BROKER_1));
76+
}
77+
78+
private void executeReassignReplicaTest(ClusterInstance clusterInstance, GroupProtocol groupProtocol, List<Integer> replicaIds) throws Exception {
79+
final String topicA = "topicA";
80+
final String topicB = "topicB";
81+
final int p0 = 0;
82+
final int partitionCount = 1;
83+
final int replicationFactor = 1;
84+
final int maxBatchCountPerSegment = 1;
85+
final boolean enableRemoteLogStorage = true;
86+
final List<Integer> metadataPartitions = new ArrayList<>();
87+
for (int i = 0; i < NUM_REMOTE_LOG_METADATA_PARTITIONS; i++) {
88+
metadataPartitions.add(i);
89+
}
90+
91+
TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
92+
builder
93+
// create topicA with 50 partitions and 2 RF. Using 50 partitions to ensure that the user-partitions
94+
// are mapped to all the __remote_log_metadata partitions. This is required to ensure that
95+
// TBRLMM able to handle the assignment of the newly created replica to one of the already assigned
96+
// metadata partition
97+
.createTopic(topicA, 50, 2, maxBatchCountPerSegment,
98+
null, enableRemoteLogStorage)
99+
.expectUserTopicMappedToMetadataPartitions(topicA, metadataPartitions)
100+
// create topicB with 1 partition and 1 RF
101+
.createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
102+
mkMap(mkEntry(p0, List.of(BROKER_0))), enableRemoteLogStorage)
103+
// send records to partition 0
104+
.expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
105+
.expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
106+
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
107+
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
108+
new KeyValueSpec("k2", "v2"))
109+
// The newly created replica gets mapped to one of the metadata partition which is being actively
110+
// consumed by both the brokers
111+
.reassignReplica(topicB, p0, replicaIds)
112+
.expectLeader(topicB, p0, BROKER_1, true)
113+
// produce some more events and verify the earliest local offset
114+
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
115+
.produce(topicB, p0, new KeyValueSpec("k3", "v3"))
116+
// consume from the beginning of the topic to read data from local and remote storage
117+
.expectFetchFromTieredStorage(BROKER_1, topicB, p0, 3)
118+
.consume(topicB, p0, 0L, 4, 3);
119+
120+
Map<String, Object> extraConsumerProps = Map.of(
121+
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
122+
);
123+
try (TieredStorageTestContext context = new TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
124+
try {
125+
for (TieredStorageTestAction action : builder.complete()) {
126+
action.execute(context);
127+
}
128+
} finally {
129+
context.printReport(System.out);
130+
}
131+
}
132+
}
133+
}

storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)