Skip to content

Commit 4847648

Browse files
coderzclhotari
authored andcommitted
[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
(cherry picked from commit d8903da)
1 parent 6255b12 commit 4847648

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -2496,9 +2496,10 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
24962496
schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
24972497
}
24982498

2499+
final String topic = commandGetSchema.getTopic();
24992500
String schemaName;
25002501
try {
2501-
schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
2502+
schemaName = TopicName.get(topic).getSchemaName();
25022503
} catch (Throwable t) {
25032504
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
25042505
return;
@@ -2507,7 +2508,7 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
25072508
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
25082509
if (schemaAndMetadata == null) {
25092510
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
2510-
String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
2511+
String.format("Topic not found or no-schema %s", topic));
25112512
} else {
25122513
commandSender.sendGetSchemaResponse(requestId,
25132514
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.Executors;
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.regex.Pattern;
4950
import java.util.stream.Collectors;
5051
import lombok.Cleanup;
5152
import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@
6970
import org.apache.pulsar.client.api.TypedMessageBuilder;
7071
import org.apache.pulsar.client.api.schema.GenericRecord;
7172
import org.apache.pulsar.client.api.schema.SchemaDefinition;
73+
import org.apache.pulsar.client.impl.ConsumerImpl;
74+
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
7275
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
7376
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
7477
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
98101
@BeforeMethod
99102
@Override
100103
public void setup() throws Exception {
104+
isTcpLookup = true;
101105
super.internalSetup();
102106

103107
// Setup namespaces
@@ -106,6 +110,7 @@ public void setup() throws Exception {
106110
.allowedClusters(Collections.singleton(CLUSTER_NAME))
107111
.build();
108112
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
113+
admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
109114
}
110115

111116
@AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
130135
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
131136
}
132137

138+
@Test
139+
public void testGetSchemaWithPatternTopic() throws Exception {
140+
final String topicPrefix = "persistent://public/my-ns/test-getSchema";
141+
142+
int topicNums = 10;
143+
for (int i = 0; i < topicNums; i++) {
144+
String topic = topicPrefix + "-" + i;
145+
admin.topics().createNonPartitionedTopic(topic);
146+
}
147+
148+
Pattern pattern = Pattern.compile(topicPrefix + "-.*");
149+
@Cleanup
150+
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
151+
.topicsPattern(pattern)
152+
.subscriptionName("sub")
153+
.subscriptionType(SubscriptionType.Shared)
154+
.subscribe();
155+
156+
List<ConsumerImpl<GenericRecord>> consumers =
157+
((MultiTopicsConsumerImpl<GenericRecord>) consumer).getConsumers();
158+
Assert.assertEquals(topicNums, consumers.size());
159+
160+
for (int i = 0; i < topicNums; i++) {
161+
String topic = topicPrefix + "-" + i;
162+
admin.topics().delete(topic, true);
163+
}
164+
}
165+
133166
@Test
134167
public void testMultiTopicSetSchemaProvider() throws Exception {
135168
final String tenant = PUBLIC_TENANT;

0 commit comments

Comments
 (0)