Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 57f910c

Browse files
committed
[cleanup] Remove static LOOKUP_CLIENT_MAP
1 parent 5398f77 commit 57f910c

File tree

8 files changed

+40
-36
lines changed

8 files changed

+40
-36
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
5050
private final KopBrokerLookupManager kopBrokerLookupManager;
5151
@Getter
5252
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
53+
private final LookupClient lookupClient;
5354

5455
private final AdminManager adminManager;
5556
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
@@ -80,13 +81,15 @@ public KafkaChannelInitializer(PulsarService pulsarService,
8081
boolean skipMessagesWithoutIndex,
8182
RequestStats requestStats,
8283
OrderedScheduler sendResponseScheduler,
83-
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) {
84+
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
85+
LookupClient lookupClient) {
8486
super();
8587
this.pulsarService = pulsarService;
8688
this.kafkaConfig = kafkaConfig;
8789
this.tenantContextManager = tenantContextManager;
8890
this.replicaManager = replicaManager;
8991
this.kopBrokerLookupManager = kopBrokerLookupManager;
92+
this.lookupClient = lookupClient;
9093
this.adminManager = adminManager;
9194
this.producePurgatory = producePurgatory;
9295
this.fetchPurgatory = fetchPurgatory;
@@ -127,7 +130,7 @@ public KafkaRequestHandler newCnx() throws Exception {
127130
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
128131
producePurgatory, fetchPurgatory,
129132
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
130-
kafkaTopicManagerSharedState);
133+
kafkaTopicManagerSharedState, lookupClient);
131134
}
132135

133136
@VisibleForTesting
@@ -138,6 +141,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
138141
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
139142
requestStats,
140143
sendResponseScheduler,
141-
kafkaTopicManagerSharedState);
144+
kafkaTopicManagerSharedState, lookupClient);
142145
}
143146
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+18-21
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.concurrent.ConcurrentHashMap;
4444
import java.util.concurrent.TimeUnit;
4545
import lombok.Getter;
46-
import lombok.NonNull;
4746
import lombok.extern.slf4j.Slf4j;
4847
import org.apache.bookkeeper.common.util.OrderedScheduler;
4948
import org.apache.commons.configuration.Configuration;
@@ -52,7 +51,6 @@
5251
import org.apache.kafka.common.record.CompressionType;
5352
import org.apache.kafka.common.utils.Time;
5453
import org.apache.pulsar.broker.PulsarServerException;
55-
import org.apache.pulsar.broker.PulsarService;
5654
import org.apache.pulsar.broker.ServiceConfiguration;
5755
import org.apache.pulsar.broker.namespace.NamespaceService;
5856
import org.apache.pulsar.broker.protocol.ProtocolHandler;
@@ -71,8 +69,6 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
7169

7270
public static final String PROTOCOL_NAME = "kafka";
7371
public static final String TLS_HANDLER = "tls";
74-
private static final Map<PulsarService, LookupClient> LOOKUP_CLIENT_MAP = new ConcurrentHashMap<>();
75-
7672
@Getter
7773
private RequestStats requestStats;
7874
private PrometheusMetricsProvider statsProvider;
@@ -84,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8480
private SystemTopicClient txnTopicClient;
8581
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
8682
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
83+
private LookupClient lookupClient;
8784
@VisibleForTesting
8885
@Getter
8986
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -202,12 +199,12 @@ public void start(BrokerService service) {
202199
throw new IllegalStateException(e);
203200
}
204201

205-
LOOKUP_CLIENT_MAP.put(brokerService.pulsar(), new LookupClient(brokerService.pulsar(), kafkaConfig));
202+
lookupClient = new LookupClient(brokerService.pulsar(), kafkaConfig);
206203
offsetTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
207204
txnTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
208205

209206
try {
210-
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar());
207+
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar(), lookupClient);
211208
} catch (Exception ex) {
212209
log.error("Failed to get kopBrokerLookupManager", ex);
213210
throw new IllegalStateException(ex);
@@ -402,7 +399,8 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
402399
kafkaConfig.isSkipMessagesWithoutIndex(),
403400
requestStats,
404401
sendResponseScheduler,
405-
kafkaTopicManagerSharedState);
402+
kafkaTopicManagerSharedState,
403+
lookupClient);
406404
}
407405

408406
// this is called after initialize, and with kafkaConfig, brokerService all set.
@@ -456,16 +454,6 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
456454

457455
@Override
458456
public void close() {
459-
Optional.ofNullable(LOOKUP_CLIENT_MAP.remove(brokerService.pulsar())).ifPresent(LookupClient::close);
460-
if (offsetTopicClient != null) {
461-
offsetTopicClient.close();
462-
}
463-
if (txnTopicClient != null) {
464-
txnTopicClient.close();
465-
}
466-
if (adminManager != null) {
467-
adminManager.shutdown();
468-
}
469457
if (producePurgatory != null) {
470458
producePurgatory.shutdown();
471459
}
@@ -483,6 +471,19 @@ public void close() {
483471
kopBrokerLookupManager.close();
484472
statsProvider.stop();
485473
sendResponseScheduler.shutdown();
474+
475+
if (offsetTopicClient != null) {
476+
offsetTopicClient.close();
477+
}
478+
if (txnTopicClient != null) {
479+
txnTopicClient.close();
480+
}
481+
if (adminManager != null) {
482+
adminManager.shutdown();
483+
}
484+
if (lookupClient != null) {
485+
lookupClient.close();
486+
}
486487
}
487488

488489
@VisibleForTesting
@@ -571,8 +572,4 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd
571572

572573
return transactionCoordinator;
573574
}
574-
575-
public static @NonNull LookupClient getLookupClient(final PulsarService pulsarService) {
576-
return LOOKUP_CLIENT_MAP.computeIfAbsent(pulsarService, ignored -> new LookupClient(pulsarService));
577-
}
578575
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
213213
private final TenantContextManager tenantContextManager;
214214
private final ReplicaManager replicaManager;
215215
private final KopBrokerLookupManager kopBrokerLookupManager;
216+
217+
@Getter
218+
private final LookupClient lookupClient;
216219
@Getter
217220
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
218221

@@ -313,12 +316,14 @@ public KafkaRequestHandler(PulsarService pulsarService,
313316
boolean skipMessagesWithoutIndex,
314317
RequestStats requestStats,
315318
OrderedScheduler sendResponseScheduler,
316-
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) throws Exception {
319+
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
320+
LookupClient lookupClient) throws Exception {
317321
super(requestStats, kafkaConfig, sendResponseScheduler);
318322
this.pulsarService = pulsarService;
319323
this.tenantContextManager = tenantContextManager;
320324
this.replicaManager = replicaManager;
321325
this.kopBrokerLookupManager = kopBrokerLookupManager;
326+
this.lookupClient = lookupClient;
322327
this.clusterName = kafkaConfig.getClusterName();
323328
this.executor = pulsarService.getExecutor();
324329
this.admin = pulsarService.getAdminClient();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class KafkaTopicManager {
4646
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
4747
this.brokerService = pulsarService.getBrokerService();
4848
this.internalServerCnx = new InternalServerCnx(requestHandler);
49-
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
49+
this.lookupClient = kafkaRequestHandler.getLookupClient();
5050
this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
5151
}
5252

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ public class KopBrokerLookupManager {
4848
public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
4949
LOOKUP_CACHE = new ConcurrentHashMap<>();
5050

51-
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService) throws Exception {
51+
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService,
52+
LookupClient lookupClient) throws Exception {
5253
this.pulsar = pulsarService;
53-
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
54+
this.lookupClient = lookupClient;
5455
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarService.getPulsarResources(),
5556
conf.getBrokerLookupTimeoutMs());
5657
this.selfAdvertisedListeners = conf.getKafkaAdvertisedListeners();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java

-6
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@ public LookupClient(final PulsarService pulsarService, final KafkaServiceConfigu
3030
super(createPulsarClient(pulsarService, kafkaConfig, conf -> {}));
3131
}
3232

33-
public LookupClient(final PulsarService pulsarService) {
34-
super(createPulsarClient(pulsarService));
35-
log.warn("This constructor should not be called, it's only called "
36-
+ "when the PulsarService doesn't exist in KafkaProtocolHandlers.LOOKUP_CLIENT_UP");
37-
}
38-
3933
public CompletableFuture<InetSocketAddress> getBrokerAddress(final TopicName topicName) {
4034
return getPulsarClient().getLookup().getBroker(topicName).thenApply(Pair::getLeft);
4135
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManagerTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class KopBrokerLookupManagerTest extends KopProtocolHandlerTestBase {
3030
private static final String TENANT = "test";
3131
private static final String NAMESPACE = TENANT + "/" + "kop-broker-lookup-manager-test";
3232

33+
private LookupClient lookupClient;
3334
private KopBrokerLookupManager kopBrokerLookupManager;
3435

3536
@BeforeClass
@@ -43,13 +44,16 @@ protected void setup() throws Exception {
4344
.build());
4445
admin.namespaces().createNamespace(NAMESPACE);
4546
admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
46-
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar);
47+
lookupClient = new LookupClient(pulsar, conf);
48+
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar, lookupClient);
4749
}
4850

4951
@AfterClass
5052
@Override
5153
protected void cleanup() throws Exception {
5254
super.internalCleanup();
55+
kopBrokerLookupManager.close();
56+
lookupClient.close();
5357
}
5458

5559
@Test(timeOut = 20 * 1000)

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ protected void createAdmin() throws Exception {
247247
}
248248

249249
protected void createClient() throws Exception {
250-
this.pulsarClient = KafkaProtocolHandler.getLookupClient(pulsar).getPulsarClient();
250+
this.pulsarClient = new LookupClient(pulsar, conf).getPulsarClient();
251251
}
252252

253253
protected String getAdvertisedAddress() {

0 commit comments

Comments
 (0)