Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit b9ca312

Browse files
committed
[cleanup] Create only one instance of KafkaTopicLookupService
1 parent 57f910c commit b9ca312

File tree

5 files changed

+37
-21
lines changed

5 files changed

+37
-21
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
5050
private final KopBrokerLookupManager kopBrokerLookupManager;
5151
@Getter
5252
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
53+
private final KafkaTopicLookupService kafkaTopicLookupService;
5354
private final LookupClient lookupClient;
5455

5556
private final AdminManager adminManager;
@@ -82,6 +83,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
8283
RequestStats requestStats,
8384
OrderedScheduler sendResponseScheduler,
8485
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
86+
KafkaTopicLookupService kafkaTopicLookupService,
8587
LookupClient lookupClient) {
8688
super();
8789
this.pulsarService = pulsarService;
@@ -104,6 +106,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
104106
}
105107
this.sendResponseScheduler = sendResponseScheduler;
106108
this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
109+
this.kafkaTopicLookupService = kafkaTopicLookupService;
107110
this.lengthFieldPrepender = new LengthFieldPrepender(4);
108111
}
109112

@@ -130,7 +133,7 @@ public KafkaRequestHandler newCnx() throws Exception {
130133
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
131134
producePurgatory, fetchPurgatory,
132135
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
133-
kafkaTopicManagerSharedState, lookupClient);
136+
kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
134137
}
135138

136139
@VisibleForTesting
@@ -141,6 +144,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
141144
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
142145
requestStats,
143146
sendResponseScheduler,
144-
kafkaTopicManagerSharedState, lookupClient);
147+
kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
145148
}
146149
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+4
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8080
private SystemTopicClient txnTopicClient;
8181
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
8282
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
83+
private KafkaTopicLookupService kafkaTopicLookupService;
8384
private LookupClient lookupClient;
8485
@VisibleForTesting
8586
@Getter
@@ -400,6 +401,7 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
400401
requestStats,
401402
sendResponseScheduler,
402403
kafkaTopicManagerSharedState,
404+
kafkaTopicLookupService,
403405
lookupClient);
404406
}
405407

@@ -418,6 +420,8 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
418420
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
419421
.build();
420422

423+
kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);
424+
421425
replicaManager = new ReplicaManager(
422426
kafkaConfig,
423427
requestStats,

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
317317
RequestStats requestStats,
318318
OrderedScheduler sendResponseScheduler,
319319
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
320+
KafkaTopicLookupService kafkaTopicLookupService,
320321
LookupClient lookupClient) throws Exception {
321322
super(requestStats, kafkaConfig, sendResponseScheduler);
322323
this.pulsarService = pulsarService;
@@ -343,7 +344,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
343344
this.tlsEnabled = tlsEnabled;
344345
this.advertisedEndPoint = advertisedEndPoint;
345346
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
346-
this.topicManager = new KafkaTopicManager(this);
347+
this.topicManager = new KafkaTopicManager(this, kafkaTopicLookupService);
347348
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
348349
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
349350
this.currentConnectedGroup = new ConcurrentHashMap<>();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java

+24-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.net.SocketAddress;
1717
import java.util.Optional;
1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ConcurrentHashMap;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021
import lombok.extern.slf4j.Slf4j;
2122
import org.apache.pulsar.broker.PulsarService;
@@ -41,14 +42,14 @@ public class KafkaTopicManager {
4142

4243
private final AtomicBoolean closed = new AtomicBoolean(false);
4344

44-
KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
45+
KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler, KafkaTopicLookupService kafkaTopicLookupService) {
4546
this.requestHandler = kafkaRequestHandler;
4647
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
4748
this.brokerService = pulsarService.getBrokerService();
4849
this.internalServerCnx = new InternalServerCnx(requestHandler);
4950
this.lookupClient = kafkaRequestHandler.getLookupClient();
50-
this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
51-
}
51+
this.kafkaTopicLookupService = kafkaTopicLookupService;
52+
}
5253

5354
// update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
5455
public void setRemoteAddress(SocketAddress remoteAddress) {
@@ -101,12 +102,12 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
101102

102103
private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
103104
Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
104-
lookupClient.getPulsarClient().newRequestId(),
105-
brokerService.generateUniqueProducerName());
105+
lookupClient.getPulsarClient().newRequestId(),
106+
brokerService.generateUniqueProducerName());
106107

107108
if (log.isDebugEnabled()) {
108109
log.debug("[{}] Register Mock Producer {} into PersistentTopic {}",
109-
requestHandler.ctx.channel(), producer, persistentTopic.getName());
110+
requestHandler.ctx.channel(), producer, persistentTopic.getName());
110111
}
111112

112113
// this will register and add USAGE_COUNT_UPDATER.
@@ -122,8 +123,9 @@ public Optional<Producer> registerProducerInPersistentTopic(String topicName, Pe
122123
}
123124
return Optional.empty();
124125
}
125-
return Optional.of(requestHandler.getKafkaTopicManagerSharedState()
126-
.getReferences().computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
126+
ConcurrentHashMap<String, Producer> references = requestHandler
127+
.getKafkaTopicManagerSharedState().getReferences();
128+
return Optional.of(references.computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
127129
}
128130

129131
// when channel close, release all the topics reference in persistentTopic
@@ -141,18 +143,23 @@ public void close() {
141143
}
142144

143145
public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
144-
if (closed.get()) {
145-
if (log.isDebugEnabled()) {
146-
log.debug("[{}] Return null for getTopic({}) since channel is closing",
147-
requestHandler.ctx.channel(), topicName);
146+
try {
147+
if (closed.get()) {
148+
if (log.isDebugEnabled()) {
149+
log.debug("[{}] Return null for getTopic({}) since channel is closing",
150+
requestHandler.ctx.channel(), topicName);
151+
}
152+
return CompletableFuture.completedFuture(Optional.empty());
148153
}
154+
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
155+
kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
156+
// cache for removing producer
157+
requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
158+
return topicCompletableFuture;
159+
} catch (Throwable error) {
160+
log.error("Unhandled error for {}", topicName, error);
149161
return CompletableFuture.completedFuture(Optional.empty());
150162
}
151-
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
152-
kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
153-
// cache for removing producer
154-
requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
155-
return topicCompletableFuture;
156163
}
157164

158165
public void invalidateCacheForFencedManagerLedgerOnTopic(String fullTopicName) {

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ protected void setup() throws Exception {
8787
doReturn(mockChannel).when(mockCtx).channel();
8888
kafkaRequestHandler.ctx = mockCtx;
8989

90-
kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler);
90+
kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler,
91+
new KafkaTopicLookupService(pulsar.getBrokerService()));
9192
kafkaTopicManager.setRemoteAddress(InternalServerCnx.MOCKED_REMOTE_ADDRESS);
9293
}
9394

0 commit comments

Comments
 (0)