Skip to content

[fix][schema] Reject unsupported Avro schema types during schema registration #24103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A validator that checks whether the schema data is well formed.
*/
public interface SchemaDataValidator {

Logger LOGGER = LoggerFactory.getLogger(SchemaDataValidator.class);

/**
* Validate if the schema data is well formed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
try {
Schema.Parser avroSchemaParser = new Schema.Parser();
avroSchemaParser.setValidateDefaults(false);
avroSchemaParser.parse(new String(data, UTF_8));
Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
if (SchemaType.AVRO.equals(schemaData.getType())) {
checkAvroSchemaTypeSupported(schema);
}
} catch (SchemaParseException e) {
if (schemaData.getType() == SchemaType.JSON) {
// we used JsonSchema for storing the definition of a JSON schema
Expand All @@ -65,11 +68,40 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
} else {
throwInvalidSchemaDataException(schemaData, e);
}
} catch (InvalidSchemaDataException invalidSchemaDataException) {
throw invalidSchemaDataException;
} catch (Exception e) {
throwInvalidSchemaDataException(schemaData, e);
}
}

/**
 * Check whether the top-level type of a parsed Avro schema may be registered as an AVRO schema.
 *
 * <p>{@code RECORD} is accepted silently. {@code UNION} and the Avro primitive types are rejected; for
 * primitives the error message names the Pulsar built-in schema constant to use instead. Every remaining
 * Avro type (ENUM, ARRAY, MAP, FIXED, NULL) is accepted and an info-level line is logged.
 *
 * @param schema the parsed Avro schema whose top-level type is inspected
 * @throws InvalidSchemaDataException if the top-level type is UNION or an Avro primitive
 */
static void checkAvroSchemaTypeSupported(Schema schema) throws InvalidSchemaDataException {
    // Name of the constant on org.apache.pulsar.client.api.Schema that replaces the Avro primitive.
    final String pulsarSchemaName;
    switch (schema.getType()) {
        case RECORD:
            return;
        case UNION:
            throw new InvalidSchemaDataException(
                    "Avro schema typed [UNION] is not supported");
        // The Avro type name differs from the Pulsar constant name for these three.
        case INT:
            pulsarSchemaName = "INT32";
            break;
        case LONG:
            pulsarSchemaName = "INT64";
            break;
        case BOOLEAN:
            pulsarSchemaName = "BOOL";
            break;
        // Same name on both sides.
        case FLOAT:
        case DOUBLE:
        case STRING:
        case BYTES:
            pulsarSchemaName = schema.getType().name();
            break;
        default:
            // ENUM, ARRAY, MAP, FIXED, NULL.
            LOGGER.info("Registering a special avro schema typed [{}]", schema.getType());
            return;
    }
    throw new InvalidSchemaDataException("Please call"
            + " org.apache.pulsar.client.api.Schema." + pulsarSchemaName
            + " when using a simple type schema");
}

private static void throwInvalidSchemaDataException(SchemaData schemaData,
Throwable cause) throws InvalidSchemaDataException {
throw new InvalidSchemaDataException("Invalid schema definition data for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.pulsar.schema.compatibility;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
Expand All @@ -36,6 +40,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -68,6 +73,8 @@ public void setup() throws Exception {
.allowedClusters(Collections.singleton(CLUSTER_NAME))
.build();
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -483,9 +490,8 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS

@Test
public void testSchemaLedgerAutoRelease() throws Exception {
String namespaceName = PUBLIC_TENANT + "/default";
String topicName = "persistent://" + namespaceName + "/tp";
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp");
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
// Update schema 100 times.
for (int i = 0; i < 100; i++){
Expand Down Expand Up @@ -516,6 +522,68 @@ public void testSchemaLedgerAutoRelease() throws Exception {
admin.topics().delete(topicName, true);
}

/**
 * Registering a top-level Avro UNION schema through the admin API must be rejected, and an
 * AUTO_PRODUCE_BYTES producer must still be able to connect to the topic afterwards.
 */
@Test
public void testAddUnionAvroSchema() throws Exception {
    String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
    String topicName = BrokerTestUtil.newUniqueName(namespaceName + "/tp");
    admin.topics().createNonPartitionedTopic(topicName);

    // Create a union type schema.
    SchemaInfoImpl schemaInfo = new SchemaInfoImpl();
    schemaInfo.setType(SchemaType.AVRO);
    schemaInfo.setSchema(
            """
                [{
                "namespace": "org.apache.pulsar.schema.compatibility.TestA",
                "type": "enum",
                "name": "EventSource",
                "symbols": ["AUTO_EVENTING", "HOODLUM", "OPTA", "ISD", "LIVE_STATS", "NGSS", "UNIFIED"]
                }, {
                "namespace": "org.apache.pulsar.schema.compatibility.TestB",
                "type": "enum",
                "name": "PeriodType",
                "symbols": ["REGULAR", "EXTRA_TIME"]
                }]
                """.getBytes(UTF_8));
    schemaInfo.setName(topicName);
    schemaInfo.setTimestamp(System.currentTimeMillis());
    try {
        admin.schemas().createSchema(topicName, schemaInfo);
        fail("avro-union schema is not supported");
    } catch (PulsarAdminException e) {
        assertTrue(e.getMessage().contains("Avro schema typed [UNION] is not supported"));
    }

    // The rejected registration must not prevent an auto_produce producer from connecting.
    // try-with-resources closes the producer even if the assertion fails.
    try (Producer<byte[]> producer =
                 pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create()) {
        assertTrue(producer.isConnected());
    }

    // Cleanup.
    admin.topics().delete(topicName, false);
}

/**
 * Registering an Avro primitive (here: string) schema as an AVRO schema through the admin API must be
 * rejected, and an AUTO_PRODUCE_BYTES producer must still be able to connect to the topic afterwards.
 */
@Test
public void testAddSimpleSchema() throws Exception {
    String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
    String topicName = BrokerTestUtil.newUniqueName(namespaceName + "/tp");
    admin.topics().createNonPartitionedTopic(topicName);

    // Create a simple type schema.
    try {
        admin.schemas().createSchema(topicName, Schema.AVRO(String.class).getSchemaInfo());
        fail("avro simple schema is not supported");
    } catch (PulsarAdminException e) {
        assertTrue(e.getMessage().contains("org.apache.pulsar.client.api.Schema"));
    }

    // The rejected registration must not prevent an auto_produce producer from connecting.
    // try-with-resources closes the producer even if the assertion fails.
    try (Producer<byte[]> producer =
                 pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create()) {
        assertTrue(producer.isConnected());
    }

    // Cleanup.
    admin.topics().delete(topicName, false);
}

@Test
public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
Expand All @@ -40,10 +41,16 @@ public abstract class GenericSchemaImpl extends AvroBaseStructSchema<GenericReco
/**
 * Create a generic schema and build the list of {@link Field}s from the fields of the Avro schema.
 *
 * @param schemaInfo the schema info passed on to the superclass constructor
 * @throws AvroRuntimeException if reading the fields of the Avro schema fails; the message names the
 *         schema class, its Avro type and its content, and the original exception is kept as the cause
 */
protected GenericSchemaImpl(SchemaInfo schemaInfo) {
    super(schemaInfo);

    try {
        this.fields = schema.getFields()
                .stream()
                .map(f -> new Field(f.name(), f.pos()))
                .collect(Collectors.toList());
    } catch (AvroRuntimeException avroRuntimeException) {
        // Re-throw with a clearer message, chaining the original so its stack trace is not lost.
        throw new AvroRuntimeException("Schema typed [" + schema.getClass().getName() + "], simple-type:["
                + schema.getType() + "] is not supported. schema-content: " + schema, avroRuntimeException);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.AvroRuntimeException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;

import java.util.List;
Expand All @@ -42,6 +45,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Unit testing generic schemas.
Expand All @@ -64,6 +68,29 @@ public void testGenericJsonSchema() {
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}

/**
 * Building a generic JSON schema from a top-level Avro union definition must fail with an
 * {@link AvroRuntimeException} whose message reports the UNION type as unsupported.
 */
@Test
public void testUnionSchema() {
    // Top-level union with two enum branches.
    final String unionDefinition = "[{\n"
            + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestA\",\n"
            + "\"type\": \"enum\",\n"
            + "\"name\": \"EventSource\",\n"
            + "\"symbols\": [\"AUTO_EVENTING\", \"HOODLUM\", \"OPTA\", \"ISD\", \"LIVE_STATS\", \"NGSS\", \"UNIFIED\"]\n"
            + "}, {\n"
            + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestB\",\n"
            + "\"type\": \"enum\",\n"
            + "\"name\": \"PeriodType\",\n"
            + "\"symbols\": [\"REGULAR\", \"EXTRA_TIME\"]\n"
            + "}]";

    final SchemaInfoImpl unionSchemaInfo = new SchemaInfoImpl();
    unionSchemaInfo.setType(SchemaType.AVRO);
    unionSchemaInfo.setSchema(unionDefinition.getBytes(UTF_8));

    try {
        GenericJsonSchema.of(unionSchemaInfo);
        fail("expected an not-supported exception");
    } catch (AvroRuntimeException unsupported) {
        final String errorMessage = unsupported.getMessage();
        assertTrue(errorMessage.contains("simple-type:[UNION] is not supported"));
    }
}

@Test
public void testAutoAvroSchema() {
// configure encode schema
Expand Down
Loading