Skip to content

Commit 4dc666e

Browse files
committed
Replace canonical schemas for unparsed schemas stored in store
1 parent 0e9dc55 commit 4dc666e

12 files changed

+98
-83
lines changed

src/v/pandaproxy/schema_registry/sharded_store.cc

+26-23
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ sharded_store::get_schema_version(subject_schema schema) {
137137
// Validate the schema (may throw)
138138
co_await validate_schema(schema.schema.share());
139139

140+
auto unparsed_def = to_unparsed(schema.schema.def().share());
140141
// Determine if the definition already exists
141-
auto map = [&schema](store& s) {
142-
return s.get_schema_id(schema.schema.def());
142+
auto map = [&unparsed_def](store& s) {
143+
return s.get_schema_id(unparsed_def);
143144
};
144145
auto reduce = [](
145146
std::optional<schema_id> acc,
@@ -246,22 +247,7 @@ ss::future<bool> sharded_store::upsert(
246247
schema_id id,
247248
schema_version version,
248249
is_deleted deleted) {
249-
auto norm = normalize{
250-
config::shard_local_cfg().schema_registry_normalize_on_startup()};
251-
co_return co_await upsert(
252-
marker,
253-
co_await make_canonical_schema(std::move(schema), norm),
254-
id,
255-
version,
256-
deleted);
257-
}
258-
259-
ss::future<bool> sharded_store::upsert(
260-
seq_marker marker,
261-
canonical_schema schema,
262-
schema_id id,
263-
schema_version version,
264-
is_deleted deleted) {
250+
// TODO(ik):Normalization on startup??
265251
auto [sub, def] = std::move(schema).destructure();
266252
co_await upsert_schema(id, std::move(def));
267253
co_return co_await upsert_subject(
@@ -324,10 +310,10 @@ ss::future<subject_schema> sharded_store::has_schema(
324310

325311
ss::future<std::optional<canonical_schema_definition>>
326312
sharded_store::maybe_get_schema_definition(schema_id id) {
327-
co_return co_await _store.invoke_on(
313+
auto unparsed = co_await _store.invoke_on(
328314
shard_for(id),
329315
_smp_opts,
330-
[id](store& s) -> std::optional<canonical_schema_definition> {
316+
[id](store& s) -> std::optional<unparsed_schema_definition> {
331317
auto s_res = s.get_schema_definition(id);
332318
if (
333319
s_res.has_error()
@@ -336,14 +322,28 @@ sharded_store::maybe_get_schema_definition(schema_id id) {
336322
}
337323
return std::move(s_res.value());
338324
});
325+
if (!unparsed.has_value()) {
326+
co_return std::nullopt;
327+
}
328+
329+
// TODO(ik): This may throw. Intercept exception
330+
auto canonical = co_await make_canonical_schema(
331+
{{}, std::move(unparsed.value())});
332+
333+
co_return std::move(canonical).def();
339334
}
340335

341336
ss::future<canonical_schema_definition>
342337
sharded_store::get_schema_definition(schema_id id) {
343-
co_return co_await _store.invoke_on(
338+
auto unparsed = co_await _store.invoke_on(
344339
shard_for(id), _smp_opts, [id](store& s) {
345340
return s.get_schema_definition(id).value();
346341
});
342+
343+
// TODO(ik): This may throw. Intercept exception
344+
auto canonical = co_await make_canonical_schema({{}, std::move(unparsed)});
345+
346+
co_return std::move(canonical).def();
347347
}
348348

349349
ss::future<chunked_vector<subject_version>>
@@ -387,8 +387,11 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
387387
return s.get_schema_definition(id).value();
388388
});
389389

390+
// TODO(ik): this may throw. Intercept exception
391+
auto canonical = co_await make_canonical_schema({sub, std::move(def)});
392+
390393
co_return subject_schema{
391-
.schema = {sub, std::move(def)},
394+
.schema = std::move(canonical),
392395
.version = v_id.version,
393396
.id = v_id.id,
394397
.deleted = v_id.deleted};
@@ -649,7 +652,7 @@ sharded_store::clear_compatibility(seq_marker marker, subject sub) {
649652
}
650653

651654
ss::future<bool>
652-
sharded_store::upsert_schema(schema_id id, canonical_schema_definition def) {
655+
sharded_store::upsert_schema(schema_id id, unparsed_schema_definition def) {
653656
co_await maybe_update_max_schema_id(id);
654657
co_return co_await _store.invoke_on(
655658
shard_for(id), _smp_opts, [id, def{std::move(def)}](store& s) mutable {

src/v/pandaproxy/schema_registry/sharded_store.h

+1-8
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,6 @@ class sharded_store final : public schema_getter {
5555
};
5656
ss::future<insert_result> project_ids(subject_schema schema);
5757

58-
ss::future<bool> upsert(
59-
seq_marker marker,
60-
canonical_schema schema,
61-
schema_id id,
62-
schema_version version,
63-
is_deleted deleted);
64-
6558
ss::future<bool> upsert(
6659
seq_marker marker,
6760
unparsed_schema schema,
@@ -212,7 +205,7 @@ class sharded_store final : public schema_getter {
212205
schema_version version, canonical_schema new_schema, verbose is_verbose);
213206

214207
ss::future<bool>
215-
upsert_schema(schema_id id, canonical_schema_definition def);
208+
upsert_schema(schema_id id, unparsed_schema_definition def);
216209
ss::future<> delete_schema(schema_id id);
217210

218211
struct insert_subject_result {

src/v/pandaproxy/schema_registry/store.h

+9-9
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ class store {
8787
/// version.
8888
///
8989
/// return the schema_version and schema_id, and whether it's new.
90-
insert_result insert(canonical_schema schema) {
90+
insert_result insert(unparsed_schema schema) {
9191
auto [sub, def] = std::move(schema).destructure();
9292
auto id = insert_schema(std::move(def)).id;
9393
auto [version, inserted] = insert_subject(std::move(sub), id);
9494
return {version, id, inserted};
9595
}
9696

9797
///\brief Return a schema definition by id.
98-
result<canonical_schema_definition>
98+
result<unparsed_schema_definition>
9999
get_schema_definition(const schema_id& id) const {
100100
auto it = _schemas.find(id);
101101
if (it == _schemas.end()) {
@@ -106,7 +106,7 @@ class store {
106106

107107
///\brief Return the id of the schema, if it already exists.
108108
std::optional<schema_id>
109-
get_schema_id(const canonical_schema_definition& def) const {
109+
get_schema_id(const unparsed_schema_definition& def) const {
110110
const auto s_it = std::find_if(
111111
_schemas.begin(), _schemas.end(), [&](const auto& s) {
112112
const auto& entry = s.second;
@@ -169,7 +169,7 @@ class store {
169169
}
170170

171171
///\brief Return a schema by subject and version.
172-
result<subject_schema> get_subject_schema(
172+
result<unparsed_subject_schema> get_subject_schema(
173173
const subject& sub,
174174
std::optional<schema_version> version,
175175
include_deleted inc_del) const {
@@ -178,7 +178,7 @@ class store {
178178

179179
auto def = BOOST_OUTCOME_TRYX(get_schema_definition(v_id.id));
180180

181-
return subject_schema{
181+
return unparsed_subject_schema{
182182
.schema = {sub, std::move(def)},
183183
.version = v_id.version,
184184
.id = v_id.id,
@@ -626,7 +626,7 @@ class store {
626626
schema_id id;
627627
bool inserted;
628628
};
629-
insert_schema_result insert_schema(canonical_schema_definition def) {
629+
insert_schema_result insert_schema(unparsed_schema_definition def) {
630630
const auto s_it = std::find_if(
631631
_schemas.begin(), _schemas.end(), [&](const auto& s) {
632632
const auto& entry = s.second;
@@ -642,7 +642,7 @@ class store {
642642
return {id, inserted};
643643
}
644644

645-
bool upsert_schema(schema_id id, canonical_schema_definition def) {
645+
bool upsert_schema(schema_id id, unparsed_schema_definition def) {
646646
return _schemas.insert_or_assign(id, schema_entry(std::move(def)))
647647
.second;
648648
}
@@ -785,10 +785,10 @@ class store {
785785

786786
private:
787787
struct schema_entry {
788-
explicit schema_entry(canonical_schema_definition definition)
788+
explicit schema_entry(unparsed_schema_definition definition)
789789
: definition{std::move(definition)} {}
790790

791-
canonical_schema_definition definition;
791+
unparsed_schema_definition definition;
792792
};
793793

794794
class subject_entry {

src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc

+14-14
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ struct simple_sharded_store {
4545
simple_sharded_store& operator=(simple_sharded_store&&) = delete;
4646

4747
pps::schema_id
48-
insert(const pps::canonical_schema& schema, pps::schema_version version) {
48+
insert(const pps::unparsed_schema& schema, pps::schema_version version) {
4949
const auto id = next_id++;
5050
store
5151
.upsert(
@@ -73,9 +73,9 @@ bool check_compatible(
7373
simple_sharded_store store;
7474
store.store.set_compatibility(lvl).get();
7575
store.insert(
76-
pandaproxy::schema_registry::canonical_schema{
76+
pandaproxy::schema_registry::unparsed_schema{
7777
pps::subject{"sub"},
78-
pps::canonical_schema_definition{writer, pps::schema_type::protobuf}},
78+
pps::unparsed_schema_definition{writer, pps::schema_type::protobuf}},
7979
pps::schema_version{1});
8080
return store.store
8181
.is_compatible(
@@ -107,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_simple) {
107107

108108
auto schema1 = pps::canonical_schema{
109109
pps::subject{"simple"}, simple.share()};
110-
store.insert(schema1, pps::schema_version{1});
110+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
111111
auto valid_simple = pps::make_protobuf_schema_definition(
112112
store.store, schema1.share())
113113
.get();
@@ -119,7 +119,7 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_nested) {
119119

120120
auto schema1 = pps::canonical_schema{
121121
pps::subject{"nested"}, nested.share()};
122-
store.insert(schema1, pps::schema_version{1});
122+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
123123
auto valid_nested = pps::make_protobuf_schema_definition(
124124
store.store, schema1.share())
125125
.get();
@@ -134,7 +134,7 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) {
134134
// imported depends on simple, which han't been inserted
135135
auto schema1 = pps::canonical_schema{
136136
pps::subject{"imported"}, imported.share()};
137-
store.insert(schema1, pps::schema_version{1});
137+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
138138
BOOST_REQUIRE_EXCEPTION(
139139
pps::make_protobuf_schema_definition(store.store, schema1.share()).get(),
140140
pps::exception,
@@ -151,7 +151,7 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) {
151151
auto schema2 = pps::canonical_schema{
152152
pps::subject{"imported"}, imported_no_ref.share()};
153153

154-
store.insert(schema1, pps::schema_version{1});
154+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
155155

156156
auto valid_simple = pps::make_protobuf_schema_definition(
157157
store.store, schema1.share())
@@ -174,9 +174,9 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_referenced) {
174174
auto schema3 = pps::canonical_schema{
175175
pps::subject{"imported-again.proto"}, imported_again.share()};
176176

177-
store.insert(schema1, pps::schema_version{1});
178-
store.insert(schema2, pps::schema_version{1});
179-
store.insert(schema3, pps::schema_version{1});
177+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
178+
store.insert(pps::to_unparsed(schema2.share()), pps::schema_version{1});
179+
store.insert(pps::to_unparsed(schema3.share()), pps::schema_version{1});
180180

181181
auto valid_simple = pps::make_protobuf_schema_definition(
182182
store.store, schema1.share())
@@ -199,9 +199,9 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_recursive_reference) {
199199
auto schema3 = pps::canonical_schema{
200200
pps::subject{"imported-twice.proto"}, imported_twice.share()};
201201

202-
store.insert(schema1, pps::schema_version{1});
203-
store.insert(schema2, pps::schema_version{1});
204-
store.insert(schema3, pps::schema_version{1});
202+
store.insert(pps::to_unparsed(schema1.share()), pps::schema_version{1});
203+
store.insert(pps::to_unparsed(schema2.share()), pps::schema_version{1});
204+
store.insert(pps::to_unparsed(schema3.share()), pps::schema_version{1});
205205

206206
auto valid_simple = pps::make_protobuf_schema_definition(
207207
store.store, schema1.share())
@@ -338,7 +338,7 @@ message well_known_types {
338338
confluent.type.Decimal c_decimal = 48;
339339
})",
340340
pps::schema_type::protobuf}};
341-
store.insert(schema, pps::schema_version{1});
341+
store.insert(pps::to_unparsed(schema.share()), pps::schema_version{1});
342342

343343
auto valid_empty
344344
= pps::make_protobuf_schema_definition(store.store, schema.share()).get();

src/v/pandaproxy/schema_registry/test/compatibility_store.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) {
3333
auto sub = pps::subject{"sub"};
3434
s.upsert(
3535
dummy_marker,
36-
{sub, schema1.share()},
36+
pps::to_unparsed({sub, schema1.share()}),
3737
pps::schema_id{1},
3838
pps::schema_version{1},
3939
pps::is_deleted::no)
@@ -43,7 +43,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) {
4343
s.is_compatible(pps::schema_version{1}, {sub, schema2.share()}).get());
4444
s.upsert(
4545
dummy_marker,
46-
{sub, schema2.share()},
46+
pps::to_unparsed({sub, schema2.share()}),
4747
pps::schema_id{2},
4848
pps::schema_version{2},
4949
pps::is_deleted::no)
@@ -56,7 +56,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) {
5656
// Insert schema with non-defaulted field
5757
s.upsert(
5858
dummy_marker,
59-
{sub, schema2.share()},
59+
pps::to_unparsed({sub, schema2.share()}),
6060
pps::schema_id{2},
6161
pps::schema_version{2},
6262
pps::is_deleted::no)

src/v/pandaproxy/schema_registry/test/mt_sharded_store.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_cross_shard_def) {
3535
// Upsert a large(ish) number of schemas to the store, all with different
3636
// subject names and IDs, so they should hash to different shards.
3737
for (int i = 1; i <= id_n; ++i) {
38-
auto referenced_schema = pps::canonical_schema{
39-
pps::subject{fmt::format("simple_{}.proto", i)}, simple.share()};
38+
auto referenced_schema = pps::to_unparsed(
39+
{pps::subject{fmt::format("simple_{}.proto", i)}, simple.share()});
4040
store
4141
.upsert(
4242
pps::seq_marker{

src/v/pandaproxy/schema_registry/test/protobuf_rendering.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ struct simple_sharded_store {
4040
simple_sharded_store& operator=(simple_sharded_store&&) = delete;
4141

4242
pps::schema_id
43-
insert(const pps::canonical_schema& schema, pps::schema_version version) {
43+
insert(const pps::unparsed_schema& schema, pps::schema_version version) {
4444
const auto id = next_id++;
4545
store
4646
.upsert(

src/v/pandaproxy/schema_registry/test/sharded_store.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) {
3131
const pps::schema_version ver1{1};
3232

3333
// Insert simple
34-
auto referenced_schema = pps::canonical_schema{
35-
pps::subject{"simple.proto"}, simple.share()};
34+
auto referenced_schema = pps::to_unparsed(
35+
{pps::subject{"simple.proto"}, simple.share()});
3636
store
3737
.upsert(
3838
pps::seq_marker{
@@ -44,8 +44,8 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) {
4444
.get();
4545

4646
// Insert referenced
47-
auto importing_schema = pps::canonical_schema{
48-
pps::subject{"imported.proto"}, imported.share()};
47+
auto importing_schema = pps::to_unparsed(
48+
{pps::subject{"imported.proto"}, imported.share()});
4949

5050
store
5151
.upsert(

0 commit comments

Comments
 (0)