Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 0c72c46

Browse files
committed
Add MigrationWorkflowManager
1 parent 50f1693 commit 0c72c46

File tree

8 files changed

+628
-8
lines changed

8 files changed

+628
-8
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@
5656
import org.apache.pulsar.client.admin.PulsarAdminException;
5757

5858
@Slf4j
59-
class AdminManager {
59+
public class AdminManager {
6060

6161
private final DelayedOperationPurgatory<DelayedOperation> topicPurgatory =
62-
DelayedOperationPurgatory.<DelayedOperation>builder()
62+
DelayedOperationPurgatory.builder()
6363
.purgatoryName("topic")
6464
.timeoutTimer(SystemTimer.builder().executorName("topic").build())
6565
.build();
@@ -83,9 +83,9 @@ public void shutdown() {
8383
topicPurgatory.shutdown();
8484
}
8585

86-
CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, TopicDetails> createInfo,
87-
int timeoutMs,
88-
String namespacePrefix) {
86+
public CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, TopicDetails> createInfo,
87+
int timeoutMs,
88+
String namespacePrefix) {
8989
final Map<String, CompletableFuture<ApiError>> futureMap = new ConcurrentHashMap<>();
9090
final AtomicInteger numTopics = new AtomicInteger(createInfo.size());
9191
final CompletableFuture<Map<String, ApiError>> resultFuture = new CompletableFuture<>();
@@ -245,11 +245,10 @@ CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeC
245245
}
246246

247247
private DescribeConfigsResponse.ConfigEntry buildDummyEntryConfig(String configName, String configValue) {
248-
DescribeConfigsResponse.ConfigEntry configEntry = new DescribeConfigsResponse.ConfigEntry(
248+
return new DescribeConfigsResponse.ConfigEntry(
249249
configName, configValue,
250250
DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, true, true,
251251
Collections.emptyList());
252-
return configEntry;
253252
}
254253

255254
public void deleteTopic(String topicToDelete,

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
public class KafkaTopicLookupService {
3333
private final BrokerService brokerService;
3434

35-
KafkaTopicLookupService(BrokerService brokerService) {
35+
public KafkaTopicLookupService(BrokerService brokerService) {
3636
this.brokerService = brokerService;
3737
}
3838

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.kop.migration.metadata;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.collect.ImmutableMap;
18+
import io.netty.channel.Channel;
19+
import io.streamnative.pulsar.handlers.kop.AdminManager;
20+
import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
21+
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
22+
import java.nio.ByteBuffer;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.Map;
26+
import java.util.Properties;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentSkipListSet;
31+
import java.util.concurrent.Future;
32+
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.kafka.clients.admin.AdminClient;
34+
import org.apache.kafka.clients.admin.AdminClientConfig;
35+
import org.apache.kafka.clients.admin.TopicDescription;
36+
import org.apache.kafka.clients.consumer.Consumer;
37+
import org.apache.kafka.clients.consumer.KafkaConsumer;
38+
import org.apache.kafka.clients.producer.KafkaProducer;
39+
import org.apache.kafka.clients.producer.Producer;
40+
import org.apache.kafka.common.KafkaFuture;
41+
import org.apache.kafka.common.requests.ApiError;
42+
import org.apache.kafka.common.requests.CreateTopicsRequest;
43+
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
44+
import org.apache.kafka.common.serialization.ByteBufferSerializer;
45+
import org.apache.kafka.common.serialization.StringDeserializer;
46+
import org.apache.kafka.common.serialization.StringSerializer;
47+
48+
/**
49+
* A MigrationMetadata Manager that uses Managed Ledger properties for metadata storage.
50+
*/
51+
@Slf4j
52+
public class ManagedLedgerPropertiesMigrationMetadataManager implements MigrationMetadataManager {
53+
@VisibleForTesting
54+
static final String KAFKA_CLUSTER_ADDRESS = "migrationKafkaClusterAddress";
55+
@VisibleForTesting
56+
static final String TOPIC_MIGRATION_STATUS = "migrationTopicMigrationStatus";
57+
58+
@VisibleForTesting
59+
final Map<String, Integer> numOutstandingRequests = new ConcurrentHashMap<>();
60+
@VisibleForTesting
61+
final Map<String, KafkaProducer<String, ByteBuffer>> kafkaProducers = new ConcurrentHashMap<>();
62+
@VisibleForTesting
63+
final Map<String, KafkaConsumer<String, ByteBuffer>> kafkaConsumers = new ConcurrentHashMap<>();
64+
@VisibleForTesting
65+
final Map<String, AdminClient> adminClients = new ConcurrentHashMap<>();
66+
67+
// Cache topics that are not configured with migration so save lookup costs
68+
private final Set<String> nonMigratoryTopics = new ConcurrentSkipListSet<>();
69+
70+
private final AdminManager adminManager;
71+
private final KafkaTopicLookupService topicLookupService;
72+
73+
public ManagedLedgerPropertiesMigrationMetadataManager(KafkaTopicLookupService topicLookupService,
74+
AdminManager adminManager) {
75+
this.adminManager = adminManager;
76+
this.topicLookupService = topicLookupService;
77+
}
78+
79+
@Override
80+
public KafkaProducer<String, ByteBuffer> getKafkaProducerForTopic(String topic, String namespacePrefix,
81+
String kafkaClusterAddress) {
82+
String fullTopicName = new KopTopic(topic, namespacePrefix).getFullName();
83+
return kafkaProducers.computeIfAbsent(fullTopicName, key -> {
84+
Properties props = new Properties();
85+
props.put("bootstrap.servers", kafkaClusterAddress);
86+
props.put("key.serializer", StringSerializer.class);
87+
props.put("value.serializer", ByteBufferSerializer.class);
88+
return new KafkaProducer<>(props);
89+
});
90+
}
91+
92+
@Override
93+
public KafkaConsumer<String, ByteBuffer> getKafkaConsumerForTopic(String topic, String namespacePrefix,
94+
String kafkaClusterAddress) {
95+
String fullTopicName = new KopTopic(topic, namespacePrefix).getFullName();
96+
return kafkaConsumers.computeIfAbsent(fullTopicName, key -> {
97+
Properties props = new Properties();
98+
props.put("bootstrap.servers", kafkaClusterAddress);
99+
props.put("key.deserializer", StringDeserializer.class);
100+
props.put("value.deserializer", ByteBufferDeserializer.class);
101+
return new KafkaConsumer<>(props);
102+
});
103+
}
104+
105+
@Override
106+
public AdminClient getAdminClientForKafka(String kafkaClusterAddress) {
107+
return adminClients.computeIfAbsent(kafkaClusterAddress, key -> {
108+
Properties props = new Properties();
109+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterAddress);
110+
return AdminClient.create(props);
111+
});
112+
}
113+
114+
private CompletableFuture<Map<String, String>> getManagedLedgerProperties(String topic, String namespacePrefix,
115+
Channel channel) {
116+
String fullPartitionName = KopTopic.toString(topic, 0, namespacePrefix);
117+
return topicLookupService.getTopic(fullPartitionName, channel).thenApply(persistentTopic -> {
118+
if (!persistentTopic.isPresent()) {
119+
throw new IllegalArgumentException("Cannot get topic " + fullPartitionName);
120+
}
121+
return persistentTopic.get().getManagedLedger().getProperties();
122+
});
123+
}
124+
125+
@Override
126+
public CompletableFuture<MigrationMetadata> getMigrationMetadata(String topic, String namespacePrefix,
127+
Channel channel) {
128+
if (nonMigratoryTopics.contains(topic)) {
129+
return null;
130+
}
131+
return getManagedLedgerProperties(topic, namespacePrefix, channel).thenApply(properties -> {
132+
String status = properties.get(TOPIC_MIGRATION_STATUS);
133+
if (status == null) {
134+
nonMigratoryTopics.add(topic);
135+
return null;
136+
}
137+
138+
String kafkaClusterAddress = properties.get(KAFKA_CLUSTER_ADDRESS);
139+
if (kafkaClusterAddress == null) {
140+
log.error("Topic {} migration misconfigured", topic);
141+
nonMigratoryTopics.add(topic);
142+
return null;
143+
}
144+
145+
return new MigrationMetadata(kafkaClusterAddress, MigrationStatus.valueOf(status));
146+
});
147+
}
148+
149+
@Override
150+
public CompletableFuture<String> getKafkaClusterAddress(String topic, String namespacePrefix, Channel channel) {
151+
return getMigrationMetadata(topic, namespacePrefix, channel).thenApply(migrationMetadata -> {
152+
if (migrationMetadata == null) {
153+
return null;
154+
}
155+
return migrationMetadata.getKafkaClusterAddress();
156+
});
157+
}
158+
159+
@Override
160+
public CompletableFuture<MigrationStatus> getMigrationStatus(String topic, String namespacePrefix,
161+
Channel channel) {
162+
return getMigrationMetadata(topic, namespacePrefix, channel).thenApply(migrationMetadata -> {
163+
if (migrationMetadata == null) {
164+
return null;
165+
}
166+
return migrationMetadata.getMigrationStatus();
167+
});
168+
}
169+
170+
@Override
171+
public void startProxyRequest(String topic, String namespacePrefix) {
172+
String fullTopicName = new KopTopic(topic, namespacePrefix).getFullName();
173+
numOutstandingRequests.put(fullTopicName, numOutstandingRequests.getOrDefault(topic, 0) + 1);
174+
}
175+
176+
@Override
177+
public void finishProxyRequest(String topic, String namespacePrefix) {
178+
String fullTopicName = new KopTopic(topic, namespacePrefix).getFullName();
179+
numOutstandingRequests.compute(fullTopicName, (key, value) -> {
180+
if (value == null) {
181+
log.error("Cannot finish request for topic {}; no request has been proxied", topic);
182+
return null;
183+
}
184+
if (value == 0) {
185+
log.error("Cannot finish more requests than started for topic {}", topic);
186+
return 0;
187+
}
188+
return value - 1;
189+
});
190+
}
191+
192+
private CompletableFuture<Void> setMigrationMetadata(String topic, String namespacePrefix, String key, String value,
193+
Channel channel) {
194+
return getManagedLedgerProperties(topic, namespacePrefix, channel).thenAccept(
195+
properties -> properties.put(key, value));
196+
}
197+
198+
private CompletableFuture<Void> setMigrationStatus(String topic, String namespacePrefix, MigrationStatus status,
199+
Channel channel) {
200+
return setMigrationMetadata(topic, namespacePrefix, TOPIC_MIGRATION_STATUS, status.name(), channel);
201+
}
202+
203+
@Override
204+
public CompletableFuture<Void> createWithMigration(String topic, String namespacePrefix, String kafkaClusterAddress,
205+
Channel channel) {
206+
// TODO: Check authorization
207+
208+
AdminClient adminClient = getAdminClientForKafka(kafkaClusterAddress);
209+
KafkaFuture<TopicDescription> future =
210+
new ArrayList<>(adminClient.describeTopics(Collections.singleton(topic)).values().values()).get(0);
211+
212+
// https://gist.github.com/bmaggi/8e42a16a02f18d3bff9b0b742a75bfe7
213+
CompletableFuture<Void> wrappingFuture = new CompletableFuture<>();
214+
215+
future.thenApply(topicDescription -> {
216+
log.error(topicDescription.toString());
217+
int numPartitions = topicDescription.partitions().size();
218+
int replicationFactor = topicDescription.partitions().get(0).replicas().size();
219+
adminManager.createTopicsAsync(ImmutableMap.of(topic,
220+
new CreateTopicsRequest.TopicDetails(numPartitions, (short) replicationFactor)),
221+
1000,
222+
namespacePrefix)
223+
.thenCompose(validResult -> CompletableFuture.allOf(validResult.entrySet().stream().map(entry -> {
224+
String key = entry.getKey();
225+
ApiError value = entry.getValue();
226+
if (!value.equals(ApiError.NONE)) {
227+
throw value.exception();
228+
}
229+
log.info("Created topic partition: " + key + " with result " + value);
230+
return setMigrationMetadata(topic, namespacePrefix, KAFKA_CLUSTER_ADDRESS, kafkaClusterAddress,
231+
channel).thenCompose(
232+
ignored -> setMigrationStatus(
233+
topic,
234+
namespacePrefix,
235+
MigrationStatus.NOT_STARTED,
236+
channel));
237+
}).toArray(CompletableFuture[]::new))).join();
238+
return null;
239+
}).whenComplete((value, throwable) -> {
240+
if (throwable != null) {
241+
wrappingFuture.completeExceptionally(throwable);
242+
} else {
243+
wrappingFuture.complete(null);
244+
}
245+
});
246+
return wrappingFuture;
247+
}
248+
249+
@Override
250+
public CompletableFuture<Void> migrate(String topic, String namespacePrefix, Channel channel) {
251+
// TODO: actually start the migration
252+
setMigrationStatus(topic, namespacePrefix, MigrationStatus.STARTED, channel);
253+
return CompletableFuture.completedFuture(null);
254+
}
255+
256+
@Override
257+
public void close() {
258+
kafkaProducers.values().forEach(Producer::close);
259+
kafkaConsumers.values().forEach(Consumer::close);
260+
adminClients.values().forEach(AdminClient::close);
261+
}
262+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.kop.migration.metadata;
15+
16+
import lombok.AllArgsConstructor;
17+
import lombok.Data;
18+
19+
/**
20+
* Metadata about Kafka migration for a topic.
21+
*/
22+
@AllArgsConstructor
23+
@Data
24+
public class MigrationMetadata {
25+
/**
26+
* Address of the Kafka cluster backing this topic.
27+
*/
28+
private String kafkaClusterAddress;
29+
30+
/**
31+
* Migration status of the topic.
32+
*/
33+
private MigrationStatus migrationStatus;
34+
}

0 commit comments

Comments
 (0)