Skip to content

Commit cdd91e2

Browse files
committed
[fix][proxy]Fix incorrect client error when calling get topic metadata
1 parent 8412fda commit cdd91e2

File tree

3 files changed

+88
-2
lines changed

3 files changed

+88
-2
lines changed

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+41
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,47 @@ public static PulsarClientException getPulsarClientException(ServerError error,
13821382
}
13831383
}
13841384

1385+
public static ServerError revertClientExToErrorCode(PulsarClientException ex) {
1386+
if (ex instanceof PulsarClientException.AuthenticationException) {
1387+
return ServerError.AuthenticationError;
1388+
} else if (ex instanceof PulsarClientException.AuthorizationException) {
1389+
return ServerError.AuthorizationError;
1390+
} else if (ex instanceof PulsarClientException.ProducerBusyException) {
1391+
return ServerError.ProducerBusy;
1392+
} else if (ex instanceof PulsarClientException.ConsumerBusyException) {
1393+
return ServerError.ConsumerBusy;
1394+
} else if (ex instanceof PulsarClientException.BrokerMetadataException) {
1395+
return ServerError.MetadataError;
1396+
} else if (ex instanceof PulsarClientException.BrokerPersistenceException) {
1397+
return ServerError.PersistenceError;
1398+
} else if (ex instanceof PulsarClientException.TooManyRequestsException) {
1399+
return ServerError.TooManyRequests;
1400+
} else if (ex instanceof PulsarClientException.LookupException) {
1401+
return ServerError.ServiceNotReady;
1402+
} else if (ex instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
1403+
return ServerError.ProducerBlockedQuotaExceededError;
1404+
} else if (ex instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
1405+
return ServerError.ProducerBlockedQuotaExceededException;
1406+
} else if (ex instanceof PulsarClientException.TopicTerminatedException) {
1407+
return ServerError.TopicTerminatedError;
1408+
} else if (ex instanceof PulsarClientException.IncompatibleSchemaException) {
1409+
return ServerError.IncompatibleSchema;
1410+
} else if (ex instanceof PulsarClientException.TopicDoesNotExistException) {
1411+
return ServerError.TopicNotFound;
1412+
} else if (ex instanceof PulsarClientException.SubscriptionNotFoundException) {
1413+
return ServerError.SubscriptionNotFound;
1414+
} else if (ex instanceof PulsarClientException.ConsumerAssignException) {
1415+
return ServerError.ConsumerAssignError;
1416+
} else if (ex instanceof PulsarClientException.NotAllowedException) {
1417+
return ServerError.NotAllowedError;
1418+
} else if (ex instanceof PulsarClientException.TransactionConflictException) {
1419+
return ServerError.TransactionConflict;
1420+
} else if (ex instanceof PulsarClientException.ProducerFencedException) {
1421+
return ServerError.ProducerFenced;
1422+
}
1423+
return ServerError.UnknownError;
1424+
}
1425+
13851426
public void close() {
13861427
if (ctx != null) {
13871428
ctx.close();

Diff for: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,13 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
247247
if (t != null) {
248248
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
249249
t.getMessage(), t);
250-
writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
251-
t.getMessage(), clientRequestId));
250+
if (t instanceof PulsarClientException pce) {
251+
writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce),
252+
t.getMessage(), clientRequestId));
253+
} else {
254+
writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
255+
t.getMessage(), clientRequestId));
256+
}
252257
} else {
253258
writeAndFlush(
254259
Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));

Diff for: pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertNotNull;
27+
import static org.testng.Assert.assertTrue;
2728
import io.netty.buffer.ByteBuf;
2829
import io.netty.channel.EventLoopGroup;
2930
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -52,6 +53,7 @@
5253
import org.apache.pulsar.client.api.MessageRoutingMode;
5354
import org.apache.pulsar.client.api.Producer;
5455
import org.apache.pulsar.client.api.PulsarClient;
56+
import org.apache.pulsar.client.api.PulsarClientException;
5557
import org.apache.pulsar.client.api.Schema;
5658
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
5759
import org.apache.pulsar.client.api.SubscriptionType;
@@ -68,13 +70,15 @@
6870
import org.apache.pulsar.common.api.proto.ProtocolVersion;
6971
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
7072
import org.apache.pulsar.common.naming.TopicName;
73+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
7174
import org.apache.pulsar.common.policies.data.ClusterData;
7275
import org.apache.pulsar.common.policies.data.RetentionPolicies;
7376
import org.apache.pulsar.common.policies.data.TenantInfo;
7477
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
7578
import org.apache.pulsar.common.policies.data.TopicType;
7679
import org.apache.pulsar.common.protocol.Commands;
7780
import org.apache.pulsar.common.schema.SchemaInfo;
81+
import org.apache.pulsar.common.util.FutureUtil;
7882
import org.apache.pulsar.common.util.netty.EventLoopUtil;
7983
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
8084
import org.mockito.Mockito;
@@ -140,6 +144,13 @@ protected void initializeProxyConfig() throws Exception {
140144
proxyClientAuthentication.start();
141145
}
142146

147+
@Override
148+
protected void doInitConf() throws Exception {
149+
super.doInitConf();
150+
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
151+
conf.setDefaultNumPartitions(1);
152+
}
153+
143154
@Override
144155
@AfterClass(alwaysRun = true)
145156
protected void cleanup() throws Exception {
@@ -415,6 +426,35 @@ public void testProtocolVersionAdvertisement() throws Exception {
415426
}
416427
}
417428

429+
@Test
430+
public void testGetPartitionedMetadataErrorCode() throws Exception {
431+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
432+
// Trigger partitioned metadata creation.
433+
PulsarClientImpl brokerClient = (PulsarClientImpl) pulsarClient;
434+
PartitionedTopicMetadata brokerMetadata =
435+
brokerClient.getPartitionedTopicMetadata(topic, true, true).get();
436+
assertEquals(brokerMetadata.partitions, 1);
437+
assertEquals(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
438+
.getPartitionedTopicMetadataAsync(TopicName.get(topic)).get().get().partitions, 1);
439+
// Verify: Proxy never rewrite error code.
440+
ClientConfigurationData proxyClientConf = new ClientConfigurationData();
441+
proxyClientConf.setServiceUrl(proxyService.getServiceUrl());
442+
PulsarClientImpl proxyClient =
443+
(PulsarClientImpl) getClientActiveConsumerChangeNotSupported(proxyClientConf);
444+
PartitionedTopicMetadata proxyMetadata =
445+
proxyClient.getPartitionedTopicMetadata(topic, false, false).get();
446+
assertEquals(proxyMetadata.partitions, 1);
447+
try {
448+
proxyClient.getPartitionedTopicMetadata(topic + "-partition-0", false, false).get();
449+
} catch (Exception ex) {
450+
assertTrue(FutureUtil.unwrapCompletionException(ex)
451+
instanceof PulsarClientException.TopicDoesNotExistException);
452+
}
453+
// cleanup.
454+
proxyClient.close();
455+
admin.topics().deletePartitionedTopic(topic);
456+
}
457+
418458
@Test
419459
public void testGetClientVersion() throws Exception {
420460
@Cleanup

0 commit comments

Comments
 (0)