Skip to content

Commit ff80193

Browse files
committed
fix tests
1 parent 37bf60a commit ff80193

File tree

3 files changed

+36
-42
lines changed

3 files changed

+36
-42
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java

+1-41
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.stream.Collectors;
3131
import javax.ws.rs.core.Response;
32-
import org.apache.avro.Schema;
3332
import org.apache.commons.lang3.tuple.Pair;
3433
import org.apache.pulsar.broker.admin.AdminResource;
3534
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
3635
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
3736
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
3837
import org.apache.pulsar.broker.web.RestException;
3938
import org.apache.pulsar.client.impl.schema.SchemaUtils;
40-
import org.apache.pulsar.client.impl.schema.util.SchemaUtil;
4139
import org.apache.pulsar.client.internal.DefaultImplementation;
4240
import org.apache.pulsar.common.naming.TopicName;
4341
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -125,46 +123,8 @@ public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative,
125123
});
126124
}
127125

128-
protected CompletableFuture<Void> checkSchemaTypeSupported(PostSchemaPayload payload) {
129-
switch (SchemaType.valueOf(payload.getType())) {
130-
case AVRO : {
131-
Schema schema = SchemaUtil.parseAvroSchema(payload.getSchema());
132-
switch (schema.getType()) {
133-
case RECORD: {
134-
break;
135-
}
136-
case UNION: {
137-
return CompletableFuture.failedFuture(new RestException(
138-
Response.Status.BAD_REQUEST.getStatusCode(),
139-
"[" + String.valueOf(topicName) + "] Avro schema typed [UNION] is not supported"));
140-
}
141-
case INT:
142-
case LONG:
143-
case FLOAT:
144-
case DOUBLE:
145-
case BOOLEAN:
146-
case STRING:
147-
case BYTES: {
148-
return CompletableFuture.failedFuture(new RestException(
149-
Response.Status.BAD_REQUEST.getStatusCode(),
150-
"[" + String.valueOf(topicName) + "] Please call"
151-
+ " org.apache.pulsar.client.api.Schema." + schema.getType()
152-
+ " when using a simple type schema"));
153-
}
154-
default: {
155-
// ARRAY, MAP, FIXED, NULL.
156-
log.info("[{}] is using a special schema typed [{}]",
157-
String.valueOf(topicName), schema.getType());
158-
}
159-
}
160-
}
161-
}
162-
return CompletableFuture.completedFuture(null);
163-
}
164-
165126
public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
166-
return checkSchemaTypeSupported(payload)
167-
.thenCompose(__ -> validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE))
127+
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)
168128
.thenCompose(__ -> getSchemaCompatibilityStrategyAsyncWithoutAuth())
169129
.thenCompose(schemaCompatibilityStrategy -> {
170130
byte[] data;

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java

+4
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@
2222
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
2323
import org.apache.pulsar.common.protocol.schema.SchemaData;
2424
import org.apache.pulsar.common.schema.KeyValue;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
/**
2729
* A validator to validate the schema data is well formed.
2830
*/
2931
public interface SchemaDataValidator {
3032

33+
Logger logger = LoggerFactory.getLogger(SchemaDataValidator.class);
34+
3135
/**
3236
* Validate if the schema data is well formed.
3337
*

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
5151
try {
5252
Schema.Parser avroSchemaParser = new Schema.Parser();
5353
avroSchemaParser.setValidateDefaults(false);
54-
avroSchemaParser.parse(new String(data, UTF_8));
54+
Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
55+
if (SchemaType.AVRO.equals(schemaData.getType())) {
56+
checkAvroSchemaTypeSupported(schema);
57+
}
5558
} catch (SchemaParseException e) {
5659
if (schemaData.getType() == SchemaType.JSON) {
5760
// we used JsonSchema for storing the definition of a JSON schema
@@ -70,6 +73,33 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
7073
}
7174
}
7275

76+
static void checkAvroSchemaTypeSupported(Schema schema) throws InvalidSchemaDataException {
77+
switch (schema.getType()) {
78+
case RECORD: {
79+
break;
80+
}
81+
case UNION: {
82+
throw new InvalidSchemaDataException(
83+
"Avro schema typed [UNION] is not supported");
84+
}
85+
case INT:
86+
case LONG:
87+
case FLOAT:
88+
case DOUBLE:
89+
case BOOLEAN:
90+
case STRING:
91+
case BYTES: {
92+
throw new InvalidSchemaDataException("Please call"
93+
+ " org.apache.pulsar.client.api.Schema." + schema.getType()
94+
+ " when using a simple type schema");
95+
}
96+
default: {
97+
// ARRAY, MAP, FIXED, NULL.
98+
logger.info("Registering a special avro schema typed [{}]", schema.getType());
99+
}
100+
}
101+
}
102+
73103
private static void throwInvalidSchemaDataException(SchemaData schemaData,
74104
Throwable cause) throws InvalidSchemaDataException {
75105
throw new InvalidSchemaDataException("Invalid schema definition data for "

0 commit comments

Comments
 (0)