|
31 | 31 | import com.google.common.collect.Sets;
|
32 | 32 | import java.io.ByteArrayInputStream;
|
33 | 33 | import java.io.Serializable;
|
| 34 | +import java.lang.reflect.Method; |
34 | 35 | import java.nio.charset.StandardCharsets;
|
35 | 36 | import java.util.ArrayList;
|
36 | 37 | import java.util.Collections;
|
|
44 | 45 | import java.util.concurrent.TimeUnit;
|
45 | 46 | import java.util.concurrent.atomic.AtomicInteger;
|
46 | 47 | import java.util.stream.Collectors;
|
| 48 | +import javax.ws.rs.client.InvocationCallback; |
| 49 | +import javax.ws.rs.client.WebTarget; |
| 50 | +import javax.ws.rs.core.HttpHeaders; |
| 51 | +import javax.ws.rs.core.MultivaluedMap; |
47 | 52 | import lombok.Cleanup;
|
48 | 53 | import lombok.EqualsAndHashCode;
|
49 | 54 | import lombok.extern.slf4j.Slf4j;
|
50 | 55 | import org.apache.avro.Schema.Parser;
|
51 | 56 | import org.apache.bookkeeper.client.BKException;
|
52 | 57 | import org.apache.bookkeeper.client.BookKeeper;
|
| 58 | +import org.apache.pulsar.broker.BrokerTestUtil; |
53 | 59 | import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
|
54 | 60 | import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
|
55 | 61 | import org.apache.pulsar.broker.service.schema.SchemaRegistry;
|
56 | 62 | import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
|
57 | 63 | import org.apache.pulsar.client.admin.PulsarAdminException;
|
| 64 | +import org.apache.pulsar.client.admin.internal.SchemasImpl; |
58 | 65 | import org.apache.pulsar.client.api.Consumer;
|
59 | 66 | import org.apache.pulsar.client.api.Message;
|
60 | 67 | import org.apache.pulsar.client.api.Producer;
|
@@ -1257,6 +1264,40 @@ public void testCreateSchemaInParallel() throws Exception {
|
1257 | 1264 | executor.shutdownNow();
|
1258 | 1265 | }
|
1259 | 1266 |
|
| 1267 | + @Test |
| 1268 | + public void testHTTPGetSchema() throws Exception { |
| 1269 | + final String namespace = "test-namespace-" + randomName(16); |
| 1270 | + String ns = PUBLIC_TENANT + "/" + namespace; |
| 1271 | + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); |
| 1272 | + final String topic = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); |
| 1273 | + // create a schema. |
| 1274 | + pulsarClient.newProducer(Schema.STRING).topic(topic).create().close(); |
| 1275 | + |
| 1276 | + // call get schemas. |
| 1277 | + SchemasImpl schemas = (SchemasImpl) admin.schemas(); |
| 1278 | + Method methodSchemasPath = SchemasImpl.class.getDeclaredMethod("schemasPath", new Class[]{TopicName.class}); |
| 1279 | + methodSchemasPath.setAccessible(true); |
| 1280 | + WebTarget path = (WebTarget) methodSchemasPath.invoke(schemas, TopicName.get(topic)); |
| 1281 | + CompletableFuture<javax.ws.rs.core.Response> response = new CompletableFuture(); |
| 1282 | + schemas.asyncGetRequest(path, new InvocationCallback<javax.ws.rs.core.Response>() { |
| 1283 | + |
| 1284 | + @Override |
| 1285 | + public void completed(javax.ws.rs.core.Response getSchemaResponse) { |
| 1286 | + response.complete(getSchemaResponse); |
| 1287 | + } |
| 1288 | + |
| 1289 | + @Override |
| 1290 | + public void failed(Throwable throwable) { |
| 1291 | + response.completeExceptionally(throwable); |
| 1292 | + } |
| 1293 | + }); |
| 1294 | + MultivaluedMap<String, Object> responseHeaders = response.join().getHeaders(); |
| 1295 | + assertTrue(!responseHeaders.containsKey(HttpHeaders.CONTENT_ENCODING) |
| 1296 | + || !responseHeaders.get(HttpHeaders.CONTENT_ENCODING).toString().contains("application/json")); |
| 1297 | + assertTrue(responseHeaders.containsKey(HttpHeaders.CONTENT_TYPE) |
| 1298 | + && responseHeaders.get(HttpHeaders.CONTENT_TYPE).toString().contains("application/json")); |
| 1299 | + } |
| 1300 | + |
1260 | 1301 | @EqualsAndHashCode
|
1261 | 1302 | static class User implements Serializable {
|
1262 | 1303 | private String name;
|
|
0 commit comments