|
29 | 29 | import java.util.concurrent.CompletableFuture;
|
30 | 30 | import java.util.stream.Collectors;
|
31 | 31 | import javax.ws.rs.core.Response;
|
32 |
| -import org.apache.avro.Schema; |
33 | 32 | import org.apache.commons.lang3.tuple.Pair;
|
34 | 33 | import org.apache.pulsar.broker.admin.AdminResource;
|
35 | 34 | import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
|
36 | 35 | import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
|
37 | 36 | import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
|
38 | 37 | import org.apache.pulsar.broker.web.RestException;
|
39 | 38 | import org.apache.pulsar.client.impl.schema.SchemaUtils;
|
40 |
| -import org.apache.pulsar.client.impl.schema.util.SchemaUtil; |
41 | 39 | import org.apache.pulsar.client.internal.DefaultImplementation;
|
42 | 40 | import org.apache.pulsar.common.naming.TopicName;
|
43 | 41 | import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
|
@@ -125,46 +123,8 @@ public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative,
|
125 | 123 | });
|
126 | 124 | }
|
127 | 125 |
|
128 |
| - protected CompletableFuture<Void> checkSchemaTypeSupported(PostSchemaPayload payload) { |
129 |
| - switch (SchemaType.valueOf(payload.getType())) { |
130 |
| - case AVRO : { |
131 |
| - Schema schema = SchemaUtil.parseAvroSchema(payload.getSchema()); |
132 |
| - switch (schema.getType()) { |
133 |
| - case RECORD: { |
134 |
| - break; |
135 |
| - } |
136 |
| - case UNION: { |
137 |
| - return CompletableFuture.failedFuture(new RestException( |
138 |
| - Response.Status.BAD_REQUEST.getStatusCode(), |
139 |
| - "[" + String.valueOf(topicName) + "] Avro schema typed [UNION] is not supported")); |
140 |
| - } |
141 |
| - case INT: |
142 |
| - case LONG: |
143 |
| - case FLOAT: |
144 |
| - case DOUBLE: |
145 |
| - case BOOLEAN: |
146 |
| - case STRING: |
147 |
| - case BYTES: { |
148 |
| - return CompletableFuture.failedFuture(new RestException( |
149 |
| - Response.Status.BAD_REQUEST.getStatusCode(), |
150 |
| - "[" + String.valueOf(topicName) + "] Please call" |
151 |
| - + " org.apache.pulsar.client.api.Schema." + schema.getType() |
152 |
| - + " when using a simple type schema")); |
153 |
| - } |
154 |
| - default: { |
155 |
| - // ARRAY, MAP, FIXED, NULL. |
156 |
| - log.info("[{}] is using a special schema typed [{}]", |
157 |
| - String.valueOf(topicName), schema.getType()); |
158 |
| - } |
159 |
| - } |
160 |
| - } |
161 |
| - } |
162 |
| - return CompletableFuture.completedFuture(null); |
163 |
| - } |
164 |
| - |
165 | 126 | public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
|
166 |
| - return checkSchemaTypeSupported(payload) |
167 |
| - .thenCompose(__ -> validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)) |
| 127 | + return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE) |
168 | 128 | .thenCompose(__ -> getSchemaCompatibilityStrategyAsyncWithoutAuth())
|
169 | 129 | .thenCompose(schemaCompatibilityStrategy -> {
|
170 | 130 | byte[] data;
|
|
0 commit comments