Skip to content

Commit fdeabe5

Browse files
authored
KAFKA-20229: Batch offset translation in RemoteClusterUtils (apache#21591)
Implements [KIP-1239](https://cwiki.apache.org/confluence/x/h4HMFw) Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
1 parent 23e2af1 commit fdeabe5

3 files changed

Lines changed: 162 additions & 13 deletions

File tree

connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Map;
4040
import java.util.Set;
4141
import java.util.concurrent.ExecutionException;
42+
import java.util.regex.Pattern;
4243
import java.util.stream.Collectors;
4344

4445
/**
@@ -70,6 +71,11 @@ public MirrorClient(MirrorClientConfig config) {
7071
this.consumerConfig = consumerConfig;
7172
}
7273

74+
// for testing
75+
Consumer<byte[], byte[]> consumer() {
76+
return new KafkaConsumer<>(consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
77+
}
78+
7379
/**
7480
* Closes internal clients.
7581
*/
@@ -147,45 +153,68 @@ public Set<String> remoteTopics(String source) throws InterruptedException {
147153
}
148154

149155
/**
150-
* Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
156+
* Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
151157
* renamed according to the ReplicationPolicy.
152-
* @param consumerGroupId The group ID of remote consumer group
158+
* @param consumerGroupPattern The regex pattern specifying the consumer groups to translate offsets for
153159
* @param remoteClusterAlias The alias of remote cluster
154160
* @param timeout The maximum time to block when consuming from the checkpoints topic
161+
* @throws IllegalArgumentException If any of the arguments are null
155162
*/
156-
public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
157-
String remoteClusterAlias, Duration timeout) {
163+
public Map<String, Map<TopicPartition, OffsetAndMetadata>> remoteConsumerOffsets(Pattern consumerGroupPattern,
164+
String remoteClusterAlias, Duration timeout) {
165+
if (consumerGroupPattern == null) {
166+
throw new IllegalArgumentException("`consumerGroupPattern` must not be null");
167+
}
168+
if (remoteClusterAlias == null) {
169+
throw new IllegalArgumentException("`remoteClusterAlias` must not be null");
170+
}
171+
if (timeout == null) {
172+
throw new IllegalArgumentException("`timeout` must not be null");
173+
}
158174
long deadline = System.currentTimeMillis() + timeout.toMillis();
159-
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
175+
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>();
160176

161-
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig,
162-
new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
177+
try (Consumer<byte[], byte[]> consumer = consumer()) {
163178
// checkpoint topics are not "remote topics", as they are not replicated. So we don't need
164179
// to use ReplicationPolicy to create the checkpoint topic here.
165180
String checkpointTopic = replicationPolicy.checkpointsTopic(remoteClusterAlias);
166-
List<TopicPartition> checkpointAssignment =
167-
List.of(new TopicPartition(checkpointTopic, 0));
181+
List<TopicPartition> checkpointAssignment = List.of(new TopicPartition(checkpointTopic, 0));
168182
consumer.assign(checkpointAssignment);
169183
consumer.seekToBeginning(checkpointAssignment);
170184
while (System.currentTimeMillis() < deadline && !endOfStream(consumer, checkpointAssignment)) {
171185
ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
172186
for (ConsumerRecord<byte[], byte[]> record : records) {
173187
try {
174188
Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
175-
if (checkpoint.consumerGroupId().equals(consumerGroupId)) {
176-
offsets.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
189+
String consumerGroupId = checkpoint.consumerGroupId();
190+
if (consumerGroupPattern.matcher(consumerGroupId).matches()) {
191+
offsets.computeIfAbsent(consumerGroupId, k -> new HashMap<>())
192+
.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
177193
}
178194
} catch (SchemaException e) {
179195
log.info("Could not deserialize record. Skipping.", e);
180196
}
181197
}
182198
}
183-
log.info("Consumed {} checkpoint records for {} from {}.", offsets.size(),
184-
consumerGroupId, checkpointTopic);
199+
log.info("Consumed {} checkpoint records from {}.", offsets.size(), checkpointTopic);
185200
}
186201
return offsets;
187202
}
188203

204+
/**
205+
* Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
206+
* renamed according to the ReplicationPolicy.
207+
* @param consumerGroupId The group ID of remote consumer group
208+
* @param remoteClusterAlias The alias of remote cluster
209+
* @param timeout The maximum time to block when consuming from the checkpoints topic
210+
*/
211+
public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
212+
String remoteClusterAlias, Duration timeout) {
213+
Pattern consumerGroupPattern = Pattern.compile(Pattern.quote(consumerGroupId));
214+
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = remoteConsumerOffsets(consumerGroupPattern, remoteClusterAlias, timeout);
215+
return offsets.getOrDefault(consumerGroupId, new HashMap<>());
216+
}
217+
189218
Set<String> listTopics() throws InterruptedException {
190219
try {
191220
return adminClient.listTopics().names().get();

connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.concurrent.TimeoutException;
29+
import java.util.regex.Pattern;
2930

3031

3132
/**
@@ -100,4 +101,20 @@ public static Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String
100101
return client.remoteConsumerOffsets(consumerGroupId, remoteClusterAlias, timeout);
101102
}
102103
}
104+
105+
/**
106+
* Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
107+
* renamed according to the configured {@link ReplicationPolicy}.
108+
* @param properties Map of properties to instantiate a {@link MirrorClient}
109+
* @param remoteClusterAlias The alias of the remote cluster
110+
* @param consumerGroupPattern The regex pattern specifying the consumer groups to translate offsets for
111+
* @param timeout The maximum time to block when consuming from the checkpoints topic
112+
* @throws IllegalArgumentException If any of the arguments are null
113+
*/
114+
public static Map<String, Map<TopicPartition, OffsetAndMetadata>> translateOffsets(Map<String, Object> properties,
115+
String remoteClusterAlias, Pattern consumerGroupPattern, Duration timeout) {
116+
try (MirrorClient client = new MirrorClient(properties)) {
117+
return client.remoteConsumerOffsets(consumerGroupPattern, remoteClusterAlias, timeout);
118+
}
119+
}
103120
}

connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,36 @@
1616
*/
1717
package org.apache.kafka.connect.mirror;
1818

19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.apache.kafka.clients.consumer.MockConsumer;
22+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
23+
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
1924
import org.apache.kafka.common.Configurable;
25+
import org.apache.kafka.common.TopicPartition;
2026

2127
import org.junit.jupiter.api.Test;
2228

29+
import java.time.Duration;
2330
import java.util.HashSet;
2431
import java.util.List;
2532
import java.util.Map;
2633
import java.util.Set;
34+
import java.util.regex.Pattern;
2735

2836
import static org.junit.jupiter.api.Assertions.assertEquals;
2937
import static org.junit.jupiter.api.Assertions.assertFalse;
38+
import static org.junit.jupiter.api.Assertions.assertThrows;
3039
import static org.junit.jupiter.api.Assertions.assertTrue;
3140

3241
public class MirrorClientTest {
3342

43+
private static final String SOURCE = "source";
44+
3445
private static class FakeMirrorClient extends MirrorClient {
3546

3647
List<String> topics;
48+
public MockConsumer<byte[], byte[]> consumer;
3749

3850
FakeMirrorClient(List<String> topics) {
3951
this(new DefaultReplicationPolicy(), topics);
@@ -52,6 +64,15 @@ private static class FakeMirrorClient extends MirrorClient {
5264
protected Set<String> listTopics() {
5365
return new HashSet<>(topics);
5466
}
67+
68+
@Override
69+
Consumer<byte[], byte[]> consumer() {
70+
if (consumer == null) {
71+
return super.consumer();
72+
} else {
73+
return consumer;
74+
}
75+
}
5576
}
5677

5778
@Test
@@ -208,9 +229,91 @@ public void testIdentityReplicationTopicSource() {
208229
.topicSource("backup.heartbeats"));
209230
}
210231

232+
@Test
233+
public void testRemoteConsumerOffsetsIllegalArgs() {
234+
FakeMirrorClient client = new FakeMirrorClient();
235+
assertThrows(IllegalArgumentException.class, () -> client.remoteConsumerOffsets((Pattern) null, "", Duration.ofSeconds(1L)));
236+
assertThrows(IllegalArgumentException.class, () -> client.remoteConsumerOffsets(Pattern.compile(""), null, Duration.ofSeconds(1L)));
237+
assertThrows(IllegalArgumentException.class, () -> client.remoteConsumerOffsets(Pattern.compile(""), "", null));
238+
}
239+
240+
@Test
241+
public void testRemoteConsumerOffsets() {
242+
String grp0 = "mygroup0";
243+
String grp1 = "mygroup1";
244+
FakeMirrorClient client = new FakeMirrorClient();
245+
String checkpointTopic = client.replicationPolicy().checkpointsTopic(SOURCE);
246+
TopicPartition checkpointTp = new TopicPartition(checkpointTopic, 0);
247+
248+
TopicPartition t0p0 = new TopicPartition("topic0", 0);
249+
TopicPartition t0p1 = new TopicPartition("topic0", 1);
250+
251+
Checkpoint cp0 = new Checkpoint(grp0, t0p0, 1L, 1L, "cp0");
252+
Checkpoint cp1 = new Checkpoint(grp0, t0p0, 2L, 2L, "cp1");
253+
Checkpoint cp2 = new Checkpoint(grp0, t0p1, 3L, 3L, "cp2");
254+
Checkpoint cp3 = new Checkpoint(grp1, t0p1, 4L, 4L, "cp3");
255+
256+
// Batch translation matches only mygroup0
257+
client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
258+
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = client.remoteConsumerOffsets(
259+
Pattern.compile(grp0), SOURCE, Duration.ofSeconds(10L));
260+
Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedOffsets = Map.of(
261+
grp0, Map.of(
262+
t0p0, cp1.offsetAndMetadata(),
263+
t0p1, cp2.offsetAndMetadata()
264+
)
265+
);
266+
assertEquals(expectedOffsets, offsets);
267+
268+
// Batch translation matches all groups
269+
client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
270+
offsets = client.remoteConsumerOffsets(Pattern.compile(".*"), SOURCE, Duration.ofSeconds(10L));
271+
expectedOffsets = Map.of(
272+
grp0, Map.of(
273+
t0p0, cp1.offsetAndMetadata(),
274+
t0p1, cp2.offsetAndMetadata()
275+
),
276+
grp1, Map.of(
277+
t0p1, cp3.offsetAndMetadata()
278+
)
279+
);
280+
assertEquals(expectedOffsets, offsets);
281+
282+
// Batch translation matches nothing
283+
client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
284+
offsets = client.remoteConsumerOffsets(Pattern.compile("unknown-group"), SOURCE, Duration.ofSeconds(10L));
285+
assertTrue(offsets.isEmpty());
286+
287+
// Translation for mygroup0
288+
client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
289+
Map<TopicPartition, OffsetAndMetadata> offsets2 = client.remoteConsumerOffsets(grp0, SOURCE, Duration.ofSeconds(10L));
290+
Map<TopicPartition, OffsetAndMetadata> expectedOffsets2 = Map.of(
291+
t0p0, cp1.offsetAndMetadata(),
292+
t0p1, cp2.offsetAndMetadata()
293+
);
294+
assertEquals(expectedOffsets2, offsets2);
295+
296+
// Translation for unknown group
297+
client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
298+
offsets2 = client.remoteConsumerOffsets("unknown-group", SOURCE, Duration.ofSeconds(10L));
299+
assertTrue(offsets2.isEmpty());
300+
}
301+
211302
private ReplicationPolicy identityReplicationPolicy(String source) {
212303
IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
213304
policy.configure(Map.of(IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, source));
214305
return policy;
215306
}
307+
308+
private MockConsumer<byte[], byte[]> buildConsumer(TopicPartition checkpointTp, Checkpoint... checkpoints) {
309+
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name());
310+
consumer.updateBeginningOffsets(Map.of(checkpointTp, 0L));
311+
consumer.assign(Set.of(checkpointTp));
312+
for (int i = 0; i < checkpoints.length; i++) {
313+
Checkpoint checkpoint = checkpoints[i];
314+
consumer.addRecord(new ConsumerRecord<>(checkpointTp.topic(), 0, i, checkpoint.recordKey(), checkpoint.recordValue()));
315+
}
316+
consumer.updateEndOffsets(Map.of(checkpointTp, checkpoints.length - 1L));
317+
return consumer;
318+
}
216319
}

0 commit comments

Comments
 (0)