Skip to content

Commit 89c4bda

Browse files
committed
Add tests to show new behavior
1 parent ac213fa commit 89c4bda

File tree

1 file changed

+308
-0
lines changed

1 file changed

+308
-0
lines changed

tests/rptest/tests/schema_registry_test.py

+308
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import random
2020
import socket
2121

22+
from confluent_kafka import Producer
2223
from confluent_kafka.schema_registry import SchemaRegistryClient, topic_subject_name_strategy, record_subject_name_strategy, topic_record_subject_name_strategy, Schema, SchemaRegistryError, SchemaReference
2324
from confluent_kafka.serialization import (MessageField, SerializationContext)
2425
from ducktape.mark import parametrize, matrix
@@ -103,6 +104,47 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
103104
Simple id = 1;
104105
}"""
105106

107+
schema_a_proto_def = """
108+
syntax = "proto3";
109+
110+
message AType {
111+
float f = 1;
112+
}"""
113+
114+
schema_b_proto_def = """
115+
syntax = "proto3";
116+
117+
import "schema_a.proto";
118+
119+
message BType {
120+
AType at = 1;
121+
}"""
122+
123+
schema_c_proto_def = """
124+
syntax = "proto3";
125+
126+
import "schema_b.proto";
127+
128+
message CType {
129+
BType bt = 1;
130+
}"""
131+
132+
schema_d_proto_def = """
133+
syntax = "proto3";
134+
135+
import "schema_e.proto";
136+
137+
message DType {
138+
EType bt = 1;
139+
}"""
140+
141+
schema_e_proto_def = """
142+
syntax = "proto3";
143+
144+
message EType {
145+
string et = 1;
146+
}"""
147+
106148
well_known_proto_def = """
107149
syntax = "proto3";
108150
@@ -205,6 +247,53 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
205247
json_incompat=r'{"type": "number", "multipleOf": 20}',
206248
)
207249

250+
import_schemas = {
251+
"schema_a": {
252+
"subject": "schema_a",
253+
"version": 1,
254+
"schema": schema_a_proto_def,
255+
},
256+
"schema_b": {
257+
"subject":
258+
"schema_b",
259+
"version":
260+
1,
261+
"references": [{
262+
"name": "schema_a.proto",
263+
"subject": "schema_a",
264+
"version": 1
265+
}],
266+
"schema":
267+
schema_b_proto_def,
268+
},
269+
"schema_c": {
270+
"subject":
271+
"schema_c",
272+
"version":
273+
1,
274+
"references": [{
275+
"name": "schema_b.proto",
276+
"subject": "schema_b",
277+
"version": 1
278+
}],
279+
"schema":
280+
schema_c_proto_def,
281+
},
282+
"schema_d": {
283+
"subject":
284+
"schema_d",
285+
"version":
286+
1,
287+
"references": [{
288+
"name": "schema_e.proto",
289+
"subject": "schema_e",
290+
"version": 1
291+
}],
292+
"schema":
293+
schema_d_proto_def,
294+
},
295+
}
296+
208297
log_config = LoggingConfig('info',
209298
logger_levels={
210299
'security': 'trace',
@@ -802,6 +891,40 @@ class SchemaRegistryTestMethods(SchemaRegistryEndpoints):
802891
def __init__(self, context, **kwargs):
803892
super(SchemaRegistryTestMethods, self).__init__(context, **kwargs)
804893

894+
def _push_to_schemas_topic(self, schemas):
895+
schema_topic = TopicSpec(name="_schemas",
896+
partition_count=1,
897+
replication_factor=1)
898+
self.client().create_topic(schema_topic)
899+
900+
producer = Producer({
901+
"bootstrap.servers": self.redpanda.brokers(),
902+
})
903+
904+
for i_schema, schema in enumerate(schemas):
905+
key = {
906+
"keytype": "SCHEMA",
907+
"subject": schema["subject"],
908+
"version": schema["version"],
909+
"magic": 1,
910+
"seq": i_schema,
911+
"node": 0
912+
}
913+
value = {
914+
"subject": schema["subject"],
915+
"version": schema["version"],
916+
"id": i_schema + 1,
917+
"schemaType": "PROTOBUF",
918+
"schema": schema["schema"],
919+
"deleted": False,
920+
}
921+
if "references" in schema:
922+
value["references"] = schema["references"]
923+
producer.produce(topic="_schemas",
924+
key=json.dumps(key),
925+
value=json.dumps(value))
926+
producer.flush()
927+
805928
@cluster(num_nodes=3)
806929
def test_schemas_types(self):
807930
"""
@@ -2020,6 +2143,191 @@ def test_protobuf(self):
20202143
assert result_raw.status_code == requests.codes.not_found
20212144
assert result_raw.json()["error_code"] == 40402
20222145

2146+
@cluster(num_nodes=3)
2147+
def test_imported_schemas(self):
2148+
"""
2149+
Verify basic protobuf functionality
2150+
"""
2151+
2152+
#Test setup: Simulate import of schemas by writting directly into the _schemas topic
2153+
#Note that trying to commit these schemas in this order, using POST subjects/{subject}/version
2154+
#would fail as schemas 'schema_b' and 'schema_d' have unmet dependencies
2155+
schemas = [
2156+
#schema_a has no dependencies
2157+
import_schemas["schema_a"],
2158+
#schema_c is dependent on schema_b, which is loaded later on
2159+
import_schemas["schema_c"],
2160+
#schema_b is dependent on schema_a, which is already loaded
2161+
import_schemas["schema_b"],
2162+
#schema_d is dependent on schema_e, which is not currently present
2163+
import_schemas["schema_d"],
2164+
]
2165+
2166+
self._push_to_schemas_topic(schemas)
2167+
2168+
#Test /schemas/ids/{id}
2169+
#All ids should return the stored schema, regardless if the schema is valid or not
2170+
for id in range(1, 5):
2171+
result_raw = self._request("GET",
2172+
f"schemas/ids/{id}",
2173+
headers=HTTP_GET_HEADERS)
2174+
assert result_raw.status_code == requests.codes.ok, \
2175+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET schemas/ids/{id}'"
2176+
result = result_raw.json()["schema"]
2177+
expected_result = schemas[id - 1]["schema"]
2178+
#Currently, schemas are not sanitized through this endpoint
2179+
assert result == expected_result, \
2180+
f"Expected:\n{result}\nGot:\n{expected_result}\nfor request 'GET schemas/ids/{id}"
2181+
2182+
#Test /schemas/ids/{id}/versions
2183+
for id in range(1, 5):
2184+
result_raw = self._request("GET",
2185+
f"schemas/ids/{id}/versions",
2186+
headers=HTTP_GET_HEADERS)
2187+
assert result_raw.status_code == requests.codes.ok, \
2188+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET schemas/ids/{id}/versions'"
2189+
2190+
#Test /schemas/ids/{id}/subjects
2191+
for id in range(1, 5):
2192+
result_raw = self._request("GET",
2193+
f"schemas/ids/{id}/subjects",
2194+
headers=HTTP_GET_HEADERS)
2195+
assert result_raw.status_code == requests.codes.ok, \
2196+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET schemas/ids/{id}/subjects"
2197+
2198+
#Test /subjects
2199+
result_raw = self._request("GET",
2200+
f"subjects",
2201+
headers=HTTP_GET_HEADERS)
2202+
assert result_raw.status_code == requests.codes.ok, \
2203+
f"Expected {requests.codes.ok} but got {result_raw.status_code}"
2204+
2205+
#All subjects should be present, regardless if their schemas are valid or not
2206+
expected_subjects = set(
2207+
["schema_a", "schema_b", "schema_c", "schema_d"])
2208+
subjects = set(result_raw.json())
2209+
assert subjects == expected_subjects, f"Expected {expected_subjects} but got {subjects}"
2210+
2211+
def test_subjects_subject(sub, expected_successful):
2212+
lookup_schema = import_schemas[sub]
2213+
schema_def = lookup_schema["schema"]
2214+
references = lookup_schema[
2215+
"references"] if "references" in lookup_schema else []
2216+
result_raw = self._post_subjects_subject(subject=sub,
2217+
data=json.dumps({
2218+
"schema":
2219+
schema_def,
2220+
"schemaType":
2221+
"PROTOBUF",
2222+
"references":
2223+
references
2224+
}))
2225+
if expected_successful:
2226+
assert result_raw.status_code == requests.codes.ok, \
2227+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'POST subjects/{sub}'"
2228+
result = result_raw.json()
2229+
assert result["version"] == 1, \
2230+
f"Expected version 1 but got {result['version']}, for request 'POST subjects/{sub}'"
2231+
else:
2232+
assert result_raw.status_code == 422, \
2233+
f"Expected 422 but got {result_raw.status_code}, for request 'POST subjects/{sub}'"
2234+
2235+
#Test /subjects/{subject}
2236+
for s in ["schema_a", "schema_b", "schema_c"]:
2237+
test_subjects_subject(s, expected_successful=True)
2238+
2239+
#schema_d should fail, as the *input* schema has an unsatisfied dependency on schema_e
2240+
test_subjects_subject("schema_d", expected_successful=False)
2241+
2242+
#Leave this for later... conflicting behaviors
2243+
##Test /subjects/{subject}/versions
2244+
#for s in ["schema_a", "schema_a", "schema_a"]:
2245+
# lookup_schema = import_schemas[s]
2246+
# schema_def = lookup_schema["schema"]
2247+
# references = lookup_schema["references"] if "references" in lookup_schema else []
2248+
# result_raw = self._post_subjects_subject_versions(subject=s,
2249+
# data=json.dumps({
2250+
# "schema":
2251+
# schema_def,
2252+
# "schemaType": "PROTOBUF",
2253+
# "references": references
2254+
# }))
2255+
# assert result_raw.status_code == requests.codes.ok, \
2256+
# f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'POST subjects/{s}/versions'"
2257+
# result = result_raw.json()
2258+
# print(f"Result: {result}")
2259+
# #These calls should find the existing version and not create a new one
2260+
# #assert result["version"] == 1, \
2261+
# # f"Expected version 1 but got {result['version']}, for request 'POST subjects/{s}/versions'"
2262+
2263+
##schema_d should fail, as the *input* schema has an unsatisfied dependency on schema_e
2264+
#for s in ["schema_d"]:
2265+
# lookup_schema = import_schemas[s]
2266+
# schema_def = lookup_schema["schema"]
2267+
# references = lookup_schema["references"] if "references" in lookup_schema else []
2268+
# result_raw = self._post_subjects_subject_versions(subject=s,
2269+
# data=json.dumps({
2270+
# "schema":
2271+
# schema_def,
2272+
# "schemaType": "PROTOBUF",
2273+
# "references": references
2274+
# }))
2275+
# assert result_raw.status_code == 422, \
2276+
# f"Expected 422 but got {result_raw.status_code}, for request 'POST subjects/{s}/versions'"
2277+
2278+
#Test /subjects/{subject}/versions/{version}
2279+
for s in ["schema_a", "schema_b", "schema_c", "schema_d"]:
2280+
lookup_schema = import_schemas[s]
2281+
schema_def = lookup_schema["schema"]
2282+
#references = lookup_schema["references"] if "references" in lookup_schema else []
2283+
result_raw = self._get_subjects_subject_versions_version(subject=s,
2284+
version=1)
2285+
assert result_raw.status_code == requests.codes.ok, \
2286+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET subjects/{s}/versions/1'"
2287+
result = result_raw.json()["schema"]
2288+
assert result == schema_def, \
2289+
f"Expected:\n{result}\nGot:\n{schema_def}\nfor request 'GET subjects/{s}/versions/1'"
2290+
2291+
#Test /subjects/{subject}/versions/{version}/schema
2292+
for s in ["schema_a", "schema_b", "schema_c", "schema_d"]:
2293+
lookup_schema = import_schemas[s]
2294+
schema_def = lookup_schema["schema"]
2295+
result_raw = self._request("GET",
2296+
f"subjects/{s}/versions/1/schema",
2297+
headers=HTTP_GET_HEADERS)
2298+
assert result_raw.status_code == requests.codes.ok, \
2299+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET subjects/{s}/versions/1/schema'"
2300+
result = result_raw.content.decode()
2301+
assert result == schema_def, \
2302+
f"Expected:\n{result}\nGot:\n{schema_def}\nfor request 'GET subjects/{s}/versions/1/schema'"
2303+
2304+
#Test /subjects/{subject}/versions/{version}/referencedby
2305+
def test_referenced_by(sub, expected_result):
2306+
result_raw = self._get_subjects_subject_versions_version_referenced_by(
2307+
sub, 1)
2308+
assert result_raw.status_code == requests.codes.ok, \
2309+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'GET subjects/{sub}/versions/1/referencedby'"
2310+
result = result_raw.json()
2311+
assert result == expected_result, \
2312+
f"Expected {expected_result} but got {result}, for request 'GET subjects/{sub}/versions/1/referencedby'"
2313+
2314+
test_referenced_by("schema_a", [3])
2315+
test_referenced_by("schema_b", [2])
2316+
test_referenced_by("schema_c", [])
2317+
test_referenced_by("schema_d", [])
2318+
2319+
#Insert missing dependency, schema_e, and retry the failed schema_d requests
2320+
result_raw = self._post_subjects_subject_versions(
2321+
subject="schema_e",
2322+
data=json.dumps({
2323+
"schema": schema_e_proto_def,
2324+
"schemaType": "PROTOBUF"
2325+
}))
2326+
assert result_raw.status_code == requests.codes.ok, \
2327+
f"Expected {requests.codes.ok} but got {result_raw.status_code}, for request 'POST subjects/schema_e/versions'"
2328+
test_referenced_by("schema_e", [4])
2329+
test_subjects_subject("schema_d", expected_successful=True)
2330+
20232331
@cluster(num_nodes=3)
20242332
def test_json(self):
20252333
"""

0 commit comments

Comments
 (0)