Skip to content

Commit 0e4ca80

Browse files
poorbarcodelhotari
authored andcommitted
[fix][schema] Reject unsupported Avro schema types during schema registration (#24103)
(cherry picked from commit 3bdc661)
1 parent 6052e72 commit 0e4ca80

File tree

6 files changed

+144
-9
lines changed

6 files changed

+144
-9
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java

+4
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@
2222
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
2323
import org.apache.pulsar.common.protocol.schema.SchemaData;
2424
import org.apache.pulsar.common.schema.KeyValue;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
/**
2729
* A validator to validate the schema data is well formed.
2830
*/
2931
public interface SchemaDataValidator {
3032

33+
Logger LOGGER = LoggerFactory.getLogger(SchemaDataValidator.class);
34+
3135
/**
3236
* Validate if the schema data is well formed.
3337
*

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
5151
try {
5252
Schema.Parser avroSchemaParser = new Schema.Parser();
5353
avroSchemaParser.setValidateDefaults(false);
54-
avroSchemaParser.parse(new String(data, UTF_8));
54+
Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
55+
if (SchemaType.AVRO.equals(schemaData.getType())) {
56+
checkAvroSchemaTypeSupported(schema);
57+
}
5558
} catch (SchemaParseException e) {
5659
if (schemaData.getType() == SchemaType.JSON) {
5760
// we used JsonSchema for storing the definition of a JSON schema
@@ -65,11 +68,30 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
6568
} else {
6669
throwInvalidSchemaDataException(schemaData, e);
6770
}
71+
} catch (InvalidSchemaDataException invalidSchemaDataException) {
72+
throw invalidSchemaDataException;
6873
} catch (Exception e) {
6974
throwInvalidSchemaDataException(schemaData, e);
7075
}
7176
}
7277

78+
static void checkAvroSchemaTypeSupported(Schema schema) throws InvalidSchemaDataException {
79+
switch (schema.getType()) {
80+
case RECORD: {
81+
break;
82+
}
83+
case UNION: {
84+
throw new InvalidSchemaDataException(
85+
"Avro schema typed [UNION] is not supported");
86+
}
87+
default: {
88+
// INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES.
89+
// ARRAY, MAP, FIXED, NULL.
90+
LOGGER.info("Registering a special avro schema typed [{}]", schema.getType());
91+
}
92+
}
93+
}
94+
7395
private static void throwInvalidSchemaDataException(SchemaData schemaData,
7496
Throwable cause) throws InvalidSchemaDataException {
7597
throw new InvalidSchemaDataException("Invalid schema definition data for "

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.avro.Schema.Parser;
5555
import org.apache.bookkeeper.client.BKException;
5656
import org.apache.bookkeeper.client.BookKeeper;
57+
import org.apache.pulsar.broker.BrokerTestUtil;
5758
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
5859
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
5960
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
@@ -412,7 +413,35 @@ public void testSendAvroAndJsonPrimitiveSchema() throws Exception {
412413
// JSON schema with primitive class can consume
413414
assertEquals(consumer.receive().getValue().getNativeObject(), producerJsonIntegerValue);
414415
assertArrayEquals((byte[]) consumer.receive().getValue().getNativeObject(), producerJsonBytesValue);
415-
}
416+
}
417+
418+
@Test
419+
public void testAvroIntSchema() throws Exception {
420+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + PUBLIC_TENANT + "/my-ns/tp");
421+
422+
Producer<Integer> producer = pulsarClient.newProducer(Schema.AVRO(Integer.class)).topic(topicName).create();
423+
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.AVRO(Integer.class)).topic(topicName)
424+
.subscriptionName("sub").subscribe();
425+
426+
producer.send(1);
427+
producer.send(2);
428+
producer.send(3);
429+
430+
Message<Integer> msg1 = consumer.receive(2, TimeUnit.SECONDS);
431+
assertNotNull(msg1);
432+
assertEquals(msg1.getValue(), 1);
433+
Message<Integer> msg2 = consumer.receive(2, TimeUnit.SECONDS);
434+
assertNotNull(msg2);
435+
assertEquals(msg2.getValue(), 2);
436+
Message<Integer> msg3 = consumer.receive(2, TimeUnit.SECONDS);
437+
assertNotNull(msg3);
438+
assertEquals(msg3.getValue(), 3);
439+
440+
// cleanup.
441+
consumer.close();
442+
producer.close();
443+
admin.topics().delete(topicName, false);
444+
}
416445

417446
@Test
418447
public void testJSONSchemaDeserialize() throws Exception {

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java

+49-3
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919
package org.apache.pulsar.schema.compatibility;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
2223
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
2324
import static org.testng.Assert.assertEquals;
25+
import static org.testng.Assert.assertTrue;
2426
import static org.testng.Assert.fail;
2527
import com.google.common.collect.Sets;
2628
import java.util.Collections;
2729
import java.util.concurrent.ThreadLocalRandom;
2830
import java.util.stream.Collectors;
2931
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.pulsar.broker.BrokerTestUtil;
3033
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
34+
import org.apache.pulsar.client.admin.PulsarAdminException;
3135
import org.apache.pulsar.client.api.Consumer;
3236
import org.apache.pulsar.client.api.ConsumerBuilder;
3337
import org.apache.pulsar.client.api.Message;
@@ -36,6 +40,7 @@
3640
import org.apache.pulsar.client.api.Schema;
3741
import org.apache.pulsar.client.api.SchemaSerializationException;
3842
import org.apache.pulsar.client.api.schema.SchemaDefinition;
43+
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
3944
import org.apache.pulsar.common.naming.NamespaceName;
4045
import org.apache.pulsar.common.naming.TopicDomain;
4146
import org.apache.pulsar.common.naming.TopicName;
@@ -68,6 +73,8 @@ public void setup() throws Exception {
6873
.allowedClusters(Collections.singleton(CLUSTER_NAME))
6974
.build();
7075
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
76+
String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
77+
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
7178
}
7279

7380
@AfterMethod(alwaysRun = true)
@@ -483,9 +490,8 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
483490

484491
@Test
485492
public void testSchemaLedgerAutoRelease() throws Exception {
486-
String namespaceName = PUBLIC_TENANT + "/default";
487-
String topicName = "persistent://" + namespaceName + "/tp";
488-
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
493+
String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
494+
String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp");
489495
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
490496
// Update schema 100 times.
491497
for (int i = 0; i < 100; i++){
@@ -516,6 +522,46 @@ public void testSchemaLedgerAutoRelease() throws Exception {
516522
admin.topics().delete(topicName, true);
517523
}
518524

525+
@Test
526+
public void testAddUnionAvroSchema() throws Exception {
527+
String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
528+
String topicName = BrokerTestUtil.newUniqueName(namespaceName + "/tp");
529+
admin.topics().createNonPartitionedTopic(topicName);
530+
531+
// Create a union type schema.
532+
SchemaInfoImpl schemaInfo = new SchemaInfoImpl();
533+
schemaInfo.setType(SchemaType.AVRO);
534+
schemaInfo.setSchema(
535+
"""
536+
[{
537+
"namespace": "org.apache.pulsar.schema.compatibility.TestA",
538+
"type": "enum",
539+
"name": "EventSource",
540+
"symbols": ["AUTO_EVENTING", "HOODLUM", "OPTA", "ISD", "LIVE_STATS", "NGSS", "UNIFIED"]
541+
}, {
542+
"namespace": "org.apache.pulsar.schema.compatibility.TestB",
543+
"type": "enum",
544+
"name": "PeriodType",
545+
"symbols": ["REGULAR", "EXTRA_TIME"]
546+
}]
547+
""".getBytes(UTF_8));
548+
schemaInfo.setName(topicName);
549+
schemaInfo.setTimestamp(System.currentTimeMillis());
550+
try {
551+
admin.schemas().createSchema(topicName, schemaInfo);
552+
fail("avro-union schema is not supported");
553+
} catch (PulsarAdminException e) {
554+
assertTrue(e.getMessage().contains("Avro schema typed [UNION] is not supported"));
555+
}
556+
557+
// Create a producer with auto_produce schema.
558+
Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
559+
560+
// Cleanup.
561+
producer.close();
562+
admin.topics().delete(topicName, false);
563+
}
564+
519565
@Test
520566
public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
521567
final String tenant = PUBLIC_TENANT;

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.List;
2222
import java.util.stream.Collectors;
23+
import org.apache.avro.AvroRuntimeException;
2324
import org.apache.pulsar.client.api.schema.Field;
2425
import org.apache.pulsar.client.api.schema.GenericRecord;
2526
import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -40,10 +41,16 @@ public abstract class GenericSchemaImpl extends AvroBaseStructSchema<GenericReco
4041
protected GenericSchemaImpl(SchemaInfo schemaInfo) {
4142
super(schemaInfo);
4243

43-
this.fields = schema.getFields()
44-
.stream()
45-
.map(f -> new Field(f.name(), f.pos()))
46-
.collect(Collectors.toList());
44+
try {
45+
this.fields = schema.getFields()
46+
.stream()
47+
.map(f -> new Field(f.name(), f.pos()))
48+
.collect(Collectors.toList());
49+
} catch (AvroRuntimeException avroRuntimeException) {
50+
// Rewrite error log.
51+
throw new AvroRuntimeException("Schema typed [" + schema.getClass().getName() + "], simple-type:["
52+
+ schema.getType() + "] is not supported. schema-content: " + schema);
53+
}
4754
}
4855

4956
@Override

Diff for: pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java

+27
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020

2121
import com.google.common.collect.Lists;
2222
import lombok.extern.slf4j.Slf4j;
23+
import org.apache.avro.AvroRuntimeException;
2324
import org.apache.pulsar.client.api.Schema;
2425
import org.apache.pulsar.client.api.schema.GenericRecord;
2526
import org.apache.pulsar.client.api.schema.GenericSchema;
2627
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
2728
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
2829
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
30+
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
2931
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
3032
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
3133
import org.apache.pulsar.common.schema.KeyValue;
3234
import org.apache.pulsar.common.schema.KeyValueEncodingType;
3335
import org.apache.pulsar.common.schema.LongSchemaVersion;
36+
import org.apache.pulsar.common.schema.SchemaType;
3437
import org.testng.annotations.Test;
3538

3639
import java.util.List;
@@ -42,6 +45,7 @@
4245
import static org.mockito.Mockito.when;
4346
import static org.testng.Assert.assertEquals;
4447
import static org.testng.Assert.assertTrue;
48+
import static org.testng.Assert.fail;
4549

4650
/**
4751
* Unit testing generic schemas.
@@ -64,6 +68,29 @@ public void testGenericJsonSchema() {
6468
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
6569
}
6670

71+
@Test
72+
public void testUnionSchema() {
73+
SchemaInfoImpl schemaInfo = new SchemaInfoImpl();
74+
schemaInfo.setType(SchemaType.AVRO);
75+
schemaInfo.setSchema(("[{\n"
76+
+ "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestA\",\n"
77+
+ "\"type\": \"enum\",\n"
78+
+ "\"name\": \"EventSource\",\n"
79+
+ "\"symbols\": [\"AUTO_EVENTING\", \"HOODLUM\", \"OPTA\", \"ISD\", \"LIVE_STATS\", \"NGSS\", \"UNIFIED\"]\n"
80+
+ "}, {\n"
81+
+ "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestB\",\n"
82+
+ "\"type\": \"enum\",\n"
83+
+ "\"name\": \"PeriodType\",\n"
84+
+ "\"symbols\": [\"REGULAR\", \"EXTRA_TIME\"]\n"
85+
+ "}]").getBytes(UTF_8));
86+
try {
87+
GenericJsonSchema.of(schemaInfo);
88+
fail("expected an not-supported exception");
89+
} catch (AvroRuntimeException e) {
90+
assertTrue(e.getMessage().contains("simple-type:[UNION] is not supported"));
91+
}
92+
}
93+
6794
@Test
6895
public void testAutoAvroSchema() {
6996
// configure encode schema

0 commit comments

Comments
 (0)