Skip to content

Commit c67deb1

Browse files
committed
-
1 parent ba895f0 commit c67deb1

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMet
258258
response.resume(Response.status(
259259
Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
260260
} else {
261-
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
261+
response.resume(Response.ok()
262262
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
263263
}
264264
} else {
@@ -275,7 +275,7 @@ private static void handleGetAllSchemasResponse(AsyncResponse response, List<Sch
275275
response.resume(Response.status(
276276
Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
277277
} else {
278-
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
278+
response.resume(Response.ok()
279279
.entity(GetAllVersionsSchemaResponse.builder()
280280
.getSchemaResponses(schemas.stream()
281281
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.common.collect.Sets;
3232
import java.io.ByteArrayInputStream;
3333
import java.io.Serializable;
34+
import java.lang.reflect.Method;
3435
import java.nio.charset.StandardCharsets;
3536
import java.util.ArrayList;
3637
import java.util.Collections;
@@ -44,17 +45,23 @@
4445
import java.util.concurrent.TimeUnit;
4546
import java.util.concurrent.atomic.AtomicInteger;
4647
import java.util.stream.Collectors;
48+
import javax.ws.rs.client.InvocationCallback;
49+
import javax.ws.rs.client.WebTarget;
50+
import javax.ws.rs.core.HttpHeaders;
51+
import javax.ws.rs.core.MultivaluedMap;
4752
import lombok.Cleanup;
4853
import lombok.EqualsAndHashCode;
4954
import lombok.extern.slf4j.Slf4j;
5055
import org.apache.avro.Schema.Parser;
5156
import org.apache.bookkeeper.client.BKException;
5257
import org.apache.bookkeeper.client.BookKeeper;
58+
import org.apache.pulsar.broker.BrokerTestUtil;
5359
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
5460
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
5561
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
5662
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
5763
import org.apache.pulsar.client.admin.PulsarAdminException;
64+
import org.apache.pulsar.client.admin.internal.SchemasImpl;
5865
import org.apache.pulsar.client.api.Consumer;
5966
import org.apache.pulsar.client.api.Message;
6067
import org.apache.pulsar.client.api.Producer;
@@ -1257,6 +1264,40 @@ public void testCreateSchemaInParallel() throws Exception {
12571264
executor.shutdownNow();
12581265
}
12591266

1267+
@Test
1268+
public void testHTTPGetSchema() throws Exception {
1269+
final String namespace = "test-namespace-" + randomName(16);
1270+
String ns = PUBLIC_TENANT + "/" + namespace;
1271+
admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
1272+
final String topic = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
1273+
// create a schema.
1274+
pulsarClient.newProducer(Schema.STRING).topic(topic).create().close();
1275+
1276+
// call get schemas.
1277+
SchemasImpl schemas = (SchemasImpl) admin.schemas();
1278+
Method methodSchemasPath = SchemasImpl.class.getDeclaredMethod("schemasPath", new Class[]{TopicName.class});
1279+
methodSchemasPath.setAccessible(true);
1280+
WebTarget path = (WebTarget) methodSchemasPath.invoke(schemas, TopicName.get(topic));
1281+
CompletableFuture<javax.ws.rs.core.Response> response = new CompletableFuture();
1282+
schemas.asyncGetRequest(path, new InvocationCallback<javax.ws.rs.core.Response>() {
1283+
1284+
@Override
1285+
public void completed(javax.ws.rs.core.Response getSchemaResponse) {
1286+
response.complete(getSchemaResponse);
1287+
}
1288+
1289+
@Override
1290+
public void failed(Throwable throwable) {
1291+
response.completeExceptionally(throwable);
1292+
}
1293+
});
1294+
MultivaluedMap<String, Object> responseHeaders = response.join().getHeaders();
1295+
assertTrue(!responseHeaders.containsKey(HttpHeaders.CONTENT_ENCODING)
1296+
|| !responseHeaders.get(HttpHeaders.CONTENT_ENCODING).toString().contains("application/json"));
1297+
assertTrue(responseHeaders.containsKey(HttpHeaders.CONTENT_TYPE)
1298+
&& responseHeaders.get(HttpHeaders.CONTENT_TYPE).toString().contains("application/json"));
1299+
}
1300+
12601301
@EqualsAndHashCode
12611302
static class User implements Serializable {
12621303
private String name;

0 commit comments

Comments
 (0)