Skip to content

Commit 5a1fa0c

Browse files
coderzclhotari
authored andcommitted
[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
(cherry picked from commit d8903da)
1 parent 2e13fba commit 5a1fa0c

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -2389,9 +2389,10 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
23892389
schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
23902390
}
23912391

2392+
final String topic = commandGetSchema.getTopic();
23922393
String schemaName;
23932394
try {
2394-
schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
2395+
schemaName = TopicName.get(topic).getSchemaName();
23952396
} catch (Throwable t) {
23962397
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
23972398
return;
@@ -2400,7 +2401,7 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
24002401
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
24012402
if (schemaAndMetadata == null) {
24022403
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
2403-
String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
2404+
String.format("Topic not found or no-schema %s", topic));
24042405
} else {
24052406
commandSender.sendGetSchemaResponse(requestId,
24062407
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.Executors;
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.regex.Pattern;
4950
import java.util.stream.Collectors;
5051
import lombok.Cleanup;
5152
import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@
6970
import org.apache.pulsar.client.api.TypedMessageBuilder;
7071
import org.apache.pulsar.client.api.schema.GenericRecord;
7172
import org.apache.pulsar.client.api.schema.SchemaDefinition;
73+
import org.apache.pulsar.client.impl.ConsumerImpl;
74+
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
7275
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
7376
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
7477
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
98101
@BeforeMethod
99102
@Override
100103
public void setup() throws Exception {
104+
isTcpLookup = true;
101105
super.internalSetup();
102106

103107
// Setup namespaces
@@ -106,6 +110,7 @@ public void setup() throws Exception {
106110
.allowedClusters(Collections.singleton(CLUSTER_NAME))
107111
.build();
108112
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
113+
admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
109114
}
110115

111116
@AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
130135
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
131136
}
132137

138+
@Test
139+
public void testGetSchemaWithPatternTopic() throws Exception {
140+
final String topicPrefix = "persistent://public/my-ns/test-getSchema";
141+
142+
int topicNums = 10;
143+
for (int i = 0; i < topicNums; i++) {
144+
String topic = topicPrefix + "-" + i;
145+
admin.topics().createNonPartitionedTopic(topic);
146+
}
147+
148+
Pattern pattern = Pattern.compile(topicPrefix + "-.*");
149+
@Cleanup
150+
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
151+
.topicsPattern(pattern)
152+
.subscriptionName("sub")
153+
.subscriptionType(SubscriptionType.Shared)
154+
.subscribe();
155+
156+
List<ConsumerImpl<GenericRecord>> consumers =
157+
((MultiTopicsConsumerImpl<GenericRecord>) consumer).getConsumers();
158+
Assert.assertEquals(topicNums, consumers.size());
159+
160+
for (int i = 0; i < topicNums; i++) {
161+
String topic = topicPrefix + "-" + i;
162+
admin.topics().delete(topic, true);
163+
}
164+
}
165+
133166
@Test
134167
public void testMultiTopicSetSchemaProvider() throws Exception {
135168
final String tenant = PUBLIC_TENANT;

0 commit comments

Comments
 (0)