Skip to content

Commit 616f221

Browse files
committed
Replace canonical schemas with unparsed schemas stored in store
1 parent 0e9dc55 commit 616f221

File tree

6 files changed

+73
-58
lines changed

6 files changed

+73
-58
lines changed

src/v/pandaproxy/schema_registry/sharded_store.cc

+26-23
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ sharded_store::get_schema_version(subject_schema schema) {
137137
// Validate the schema (may throw)
138138
co_await validate_schema(schema.schema.share());
139139

140+
auto unparsed_def = to_unparsed(schema.schema.def().share());
140141
// Determine if the definition already exists
141-
auto map = [&schema](store& s) {
142-
return s.get_schema_id(schema.schema.def());
142+
auto map = [&unparsed_def](store& s) {
143+
return s.get_schema_id(unparsed_def);
143144
};
144145
auto reduce = [](
145146
std::optional<schema_id> acc,
@@ -246,22 +247,7 @@ ss::future<bool> sharded_store::upsert(
246247
schema_id id,
247248
schema_version version,
248249
is_deleted deleted) {
249-
auto norm = normalize{
250-
config::shard_local_cfg().schema_registry_normalize_on_startup()};
251-
co_return co_await upsert(
252-
marker,
253-
co_await make_canonical_schema(std::move(schema), norm),
254-
id,
255-
version,
256-
deleted);
257-
}
258-
259-
ss::future<bool> sharded_store::upsert(
260-
seq_marker marker,
261-
canonical_schema schema,
262-
schema_id id,
263-
schema_version version,
264-
is_deleted deleted) {
250+
// TODO(ik): Normalization on startup?
265251
auto [sub, def] = std::move(schema).destructure();
266252
co_await upsert_schema(id, std::move(def));
267253
co_return co_await upsert_subject(
@@ -324,10 +310,10 @@ ss::future<subject_schema> sharded_store::has_schema(
324310

325311
ss::future<std::optional<canonical_schema_definition>>
326312
sharded_store::maybe_get_schema_definition(schema_id id) {
327-
co_return co_await _store.invoke_on(
313+
auto unparsed = co_await _store.invoke_on(
328314
shard_for(id),
329315
_smp_opts,
330-
[id](store& s) -> std::optional<canonical_schema_definition> {
316+
[id](store& s) -> std::optional<unparsed_schema_definition> {
331317
auto s_res = s.get_schema_definition(id);
332318
if (
333319
s_res.has_error()
@@ -336,14 +322,28 @@ sharded_store::maybe_get_schema_definition(schema_id id) {
336322
}
337323
return std::move(s_res.value());
338324
});
325+
if (!unparsed.has_value()) {
326+
co_return std::nullopt;
327+
}
328+
329+
// TODO(ik): This may throw. Intercept exception
330+
auto canonical = co_await make_canonical_schema(
331+
{{}, std::move(unparsed.value())});
332+
333+
co_return std::move(canonical).def();
339334
}
340335

341336
ss::future<canonical_schema_definition>
342337
sharded_store::get_schema_definition(schema_id id) {
343-
co_return co_await _store.invoke_on(
338+
auto unparsed = co_await _store.invoke_on(
344339
shard_for(id), _smp_opts, [id](store& s) {
345340
return s.get_schema_definition(id).value();
346341
});
342+
343+
// TODO(ik): This may throw. Intercept exception
344+
auto canonical = co_await make_canonical_schema({{}, std::move(unparsed)});
345+
346+
co_return std::move(canonical).def();
347347
}
348348

349349
ss::future<chunked_vector<subject_version>>
@@ -387,8 +387,11 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
387387
return s.get_schema_definition(id).value();
388388
});
389389

390+
// TODO(ik): this may throw. Intercept exception
391+
auto canonical = co_await make_canonical_schema({sub, std::move(def)});
392+
390393
co_return subject_schema{
391-
.schema = {sub, std::move(def)},
394+
.schema = std::move(canonical),
392395
.version = v_id.version,
393396
.id = v_id.id,
394397
.deleted = v_id.deleted};
@@ -649,7 +652,7 @@ sharded_store::clear_compatibility(seq_marker marker, subject sub) {
649652
}
650653

651654
ss::future<bool>
652-
sharded_store::upsert_schema(schema_id id, canonical_schema_definition def) {
655+
sharded_store::upsert_schema(schema_id id, unparsed_schema_definition def) {
653656
co_await maybe_update_max_schema_id(id);
654657
co_return co_await _store.invoke_on(
655658
shard_for(id), _smp_opts, [id, def{std::move(def)}](store& s) mutable {

src/v/pandaproxy/schema_registry/sharded_store.h

+1-8
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,6 @@ class sharded_store final : public schema_getter {
5555
};
5656
ss::future<insert_result> project_ids(subject_schema schema);
5757

58-
ss::future<bool> upsert(
59-
seq_marker marker,
60-
canonical_schema schema,
61-
schema_id id,
62-
schema_version version,
63-
is_deleted deleted);
64-
6558
ss::future<bool> upsert(
6659
seq_marker marker,
6760
unparsed_schema schema,
@@ -212,7 +205,7 @@ class sharded_store final : public schema_getter {
212205
schema_version version, canonical_schema new_schema, verbose is_verbose);
213206

214207
ss::future<bool>
215-
upsert_schema(schema_id id, canonical_schema_definition def);
208+
upsert_schema(schema_id id, unparsed_schema_definition def);
216209
ss::future<> delete_schema(schema_id id);
217210

218211
struct insert_subject_result {

src/v/pandaproxy/schema_registry/store.h

+9-9
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ class store {
8787
/// version.
8888
///
8989
/// return the schema_version and schema_id, and whether it's new.
90-
insert_result insert(canonical_schema schema) {
90+
insert_result insert(unparsed_schema schema) {
9191
auto [sub, def] = std::move(schema).destructure();
9292
auto id = insert_schema(std::move(def)).id;
9393
auto [version, inserted] = insert_subject(std::move(sub), id);
9494
return {version, id, inserted};
9595
}
9696

9797
///\brief Return a schema definition by id.
98-
result<canonical_schema_definition>
98+
result<unparsed_schema_definition>
9999
get_schema_definition(const schema_id& id) const {
100100
auto it = _schemas.find(id);
101101
if (it == _schemas.end()) {
@@ -106,7 +106,7 @@ class store {
106106

107107
///\brief Return the id of the schema, if it already exists.
108108
std::optional<schema_id>
109-
get_schema_id(const canonical_schema_definition& def) const {
109+
get_schema_id(const unparsed_schema_definition& def) const {
110110
const auto s_it = std::find_if(
111111
_schemas.begin(), _schemas.end(), [&](const auto& s) {
112112
const auto& entry = s.second;
@@ -169,7 +169,7 @@ class store {
169169
}
170170

171171
///\brief Return a schema by subject and version.
172-
result<subject_schema> get_subject_schema(
172+
result<unparsed_subject_schema> get_subject_schema(
173173
const subject& sub,
174174
std::optional<schema_version> version,
175175
include_deleted inc_del) const {
@@ -178,7 +178,7 @@ class store {
178178

179179
auto def = BOOST_OUTCOME_TRYX(get_schema_definition(v_id.id));
180180

181-
return subject_schema{
181+
return unparsed_subject_schema{
182182
.schema = {sub, std::move(def)},
183183
.version = v_id.version,
184184
.id = v_id.id,
@@ -626,7 +626,7 @@ class store {
626626
schema_id id;
627627
bool inserted;
628628
};
629-
insert_schema_result insert_schema(canonical_schema_definition def) {
629+
insert_schema_result insert_schema(unparsed_schema_definition def) {
630630
const auto s_it = std::find_if(
631631
_schemas.begin(), _schemas.end(), [&](const auto& s) {
632632
const auto& entry = s.second;
@@ -642,7 +642,7 @@ class store {
642642
return {id, inserted};
643643
}
644644

645-
bool upsert_schema(schema_id id, canonical_schema_definition def) {
645+
bool upsert_schema(schema_id id, unparsed_schema_definition def) {
646646
return _schemas.insert_or_assign(id, schema_entry(std::move(def)))
647647
.second;
648648
}
@@ -785,10 +785,10 @@ class store {
785785

786786
private:
787787
struct schema_entry {
788-
explicit schema_entry(canonical_schema_definition definition)
788+
explicit schema_entry(unparsed_schema_definition definition)
789789
: definition{std::move(definition)} {}
790790

791-
canonical_schema_definition definition;
791+
unparsed_schema_definition definition;
792792
};
793793

794794
class subject_entry {

src/v/pandaproxy/schema_registry/test/store.cc

+10-14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "pandaproxy/schema_registry/error.h"
1313
#include "pandaproxy/schema_registry/test/compatibility_avro.h"
14+
#include "pandaproxy/schema_registry/types.h"
1415
#include "pandaproxy/schema_registry/util.h"
1516

1617
#include <absl/algorithm/container.h>
@@ -21,13 +22,13 @@ namespace pps = pandaproxy::schema_registry;
2122
constexpr std::string_view sv_string_def0{R"({"type":"string"})"};
2223
constexpr std::string_view sv_string_def1{R"({"type": "string"})"};
2324
constexpr std::string_view sv_int_def0{R"({"type": "int"})"};
24-
const pps::canonical_schema_definition string_def0{
25+
const pps::unparsed_schema_definition string_def0{
2526
pps::make_schema_definition<json::UTF8<>>(sv_string_def0).value(),
2627
pps::schema_type::avro};
27-
const pps::canonical_schema_definition string_def1{
28+
const pps::unparsed_schema_definition string_def1{
2829
pps::make_schema_definition<json::UTF8<>>(sv_string_def1).value(),
2930
pps::schema_type::avro};
30-
const pps::canonical_schema_definition int_def0{
31+
const pps::unparsed_schema_definition int_def0{
3132
pps::make_schema_definition<json::UTF8<>>(sv_int_def0).value(),
3233
pps::schema_type::avro};
3334
const pps::subject subject0{"subject0"};
@@ -72,7 +73,7 @@ BOOST_AUTO_TEST_CASE(test_store_insert) {
7273
bool upsert(
7374
pps::store& store,
7475
pps::subject sub,
75-
pps::canonical_schema_definition def,
76+
pps::unparsed_schema_definition def,
7677
pps::schema_type,
7778
pps::schema_id id,
7879
pps::schema_version version,
@@ -207,8 +208,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) {
207208
pps::seq_marker dummy_marker;
208209

209210
// First insert, expect id{1}
210-
auto ins_res = s.insert(
211-
{subject0, pps::canonical_schema_definition(schema1.share())});
211+
auto ins_res = s.insert(pps::to_unparsed({subject0, schema1.share()}));
212212
BOOST_REQUIRE(ins_res.inserted);
213213
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
214214
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});
@@ -222,8 +222,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) {
222222
BOOST_REQUIRE(versions.empty());
223223

224224
// Second insert, expect id{2}
225-
ins_res = s.insert(
226-
{subject0, pps::canonical_schema_definition(schema2.share())});
225+
ins_res = s.insert(pps::to_unparsed({subject0, schema2.share()}));
227226
BOOST_REQUIRE(ins_res.inserted);
228227
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2});
229228
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{2});
@@ -259,8 +258,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) {
259258
pps::seq_marker dummy_marker;
260259

261260
// First insert, expect id{1}
262-
auto ins_res = s.insert(
263-
{subject0, pps::canonical_schema_definition(schema1.share())});
261+
auto ins_res = s.insert(pps::to_unparsed({subject0, schema1.share()}));
264262
BOOST_REQUIRE(ins_res.inserted);
265263
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
266264
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});
@@ -271,15 +269,13 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) {
271269
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);
272270

273271
// Second insert, same schema, expect id{1}
274-
ins_res = s.insert(
275-
{subject1, pps::canonical_schema_definition(schema1.share())});
272+
ins_res = s.insert(pps::to_unparsed({subject1, schema1.share()}));
276273
BOOST_REQUIRE(ins_res.inserted);
277274
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
278275
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});
279276

280277
// Insert yet another schema associated with a different subject
281-
ins_res = s.insert(
282-
{subject2, pps::canonical_schema_definition(schema2.share())});
278+
ins_res = s.insert(pps::to_unparsed({subject2, schema2.share()}));
283279
BOOST_REQUIRE(ins_res.inserted);
284280
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2});
285281
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

src/v/pandaproxy/schema_registry/types.cc

+11
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,15 @@ std::ostream& operator<<(std::ostream& os, const compatibility_result& res) {
9393
return os;
9494
}
9595

96+
unparsed_schema_definition
97+
to_unparsed(canonical_schema_definition&& canonical) {
98+
auto [raw, type, refs] = std::move(canonical).destructure();
99+
return {std::move(raw), type, std::move(refs)};
100+
}
101+
102+
unparsed_schema to_unparsed(canonical_schema&& canonical) {
103+
auto [sub, def] = std::move(canonical).destructure();
104+
return {std::move(sub), to_unparsed(std::move(def))};
105+
}
106+
96107
} // namespace pandaproxy::schema_registry

src/v/pandaproxy/schema_registry/types.h

+16-4
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ using unparsed_schema_definition
204204
using canonical_schema_definition
205205
= typed_schema_definition<struct canonical_schema_definition_tag>;
206206

207+
///\brief Utility function for when a canonical schema needs to be re-ingested
208+
unparsed_schema_definition to_unparsed(canonical_schema_definition&&);
209+
207210
static const unparsed_schema_definition invalid_schema_definition{
208211
"", schema_type::avro};
209212

@@ -468,16 +471,25 @@ class typed_schema {
468471
using unparsed_schema = typed_schema<unparsed_schema_definition::tag>;
469472
using canonical_schema = typed_schema<canonical_schema_definition::tag>;
470473

471-
///\brief Complete description of a subject and schema for a version.
472-
struct subject_schema {
473-
canonical_schema schema;
474+
///\brief Utility function for when a canonical schema needs to be re-ingested
475+
unparsed_schema to_unparsed(canonical_schema&&);
476+
477+
template<typename tag>
478+
struct typed_subject_schema {
479+
typed_schema<tag> schema;
474480
schema_version version{invalid_schema_version};
475481
schema_id id{invalid_schema_id};
476482
is_deleted deleted{false};
477-
subject_schema share() const {
483+
typed_subject_schema share() const {
478484
return {schema.share(), version, id, deleted};
479485
}
480486
};
487+
///\brief Complete description of a subject and schema for a version, as stored
488+
/// in store
489+
using unparsed_subject_schema
490+
= typed_subject_schema<unparsed_schema_definition::tag>;
491+
///\brief Complete description of a subject and schema for a version.
492+
using subject_schema = typed_subject_schema<canonical_schema_definition::tag>;
481493

482494
enum class compatibility_level {
483495
none = 0,

0 commit comments

Comments
 (0)