Skip to content

Commit ac213fa

Browse files
committed
Change store to unparsed_schema and remove upsert validation
1 parent 0e9dc55 commit ac213fa

21 files changed

+92
-108
lines changed

src/v/pandaproxy/schema_registry/avro.cc

+6-6
Original file line number | Diff line number | Diff line change
@@ -534,32 +534,32 @@ class collected_schema {
534534
bool contains(const ss::sstring& name) const {
535535
return _names.contains(name);
536536
}
537-
bool insert(ss::sstring name, canonical_schema_definition def) {
537+
bool insert(ss::sstring name, unparsed_schema_definition def) {
538538
bool inserted = _names.insert(std::move(name)).second;
539539
if (inserted) {
540540
_schemas.push_back(std::move(def).raw());
541541
}
542542
return inserted;
543543
}
544-
canonical_schema_definition::raw_string flatten() && {
544+
unparsed_schema_definition::raw_string flatten() && {
545545
iobuf out;
546546
for (auto& s : _schemas) {
547547
out.append(std::move(s));
548548
out.append("\n", 1);
549549
}
550-
return canonical_schema_definition::raw_string{std::move(out)};
550+
return unparsed_schema_definition::raw_string{std::move(out)};
551551
}
552552

553553
private:
554554
absl::flat_hash_set<ss::sstring> _names;
555-
std::vector<canonical_schema_definition::raw_string> _schemas;
555+
std::vector<unparsed_schema_definition::raw_string> _schemas;
556556
};
557557

558558
ss::future<collected_schema> collect_schema(
559559
schema_getter& store,
560560
collected_schema collected,
561561
ss::sstring name,
562-
canonical_schema schema) {
562+
unparsed_schema schema) {
563563
for (const auto& ref : schema.def().refs()) {
564564
if (!collected.contains(ref.name)) {
565565
auto ss = co_await store.get_subject_schema(
@@ -573,7 +573,7 @@ ss::future<collected_schema> collect_schema(
573573
}
574574

575575
ss::future<avro_schema_definition>
576-
make_avro_schema_definition(schema_getter& store, canonical_schema schema) {
576+
make_avro_schema_definition(schema_getter& store, unparsed_schema schema) {
577577
std::optional<avro::Exception> ex;
578578
try {
579579
auto name = schema.sub()();

src/v/pandaproxy/schema_registry/avro.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
namespace pandaproxy::schema_registry {
1919

2020
ss::future<avro_schema_definition>
21-
make_avro_schema_definition(schema_getter& store, canonical_schema schema);
21+
make_avro_schema_definition(schema_getter& store, unparsed_schema schema);
2222

2323
result<canonical_schema_definition>
2424
sanitize_avro_schema_definition(unparsed_schema_definition def);

src/v/pandaproxy/schema_registry/errors.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,7 @@ inline error_info invalid_subject_schema(const subject& sub) {
145145
fmt::format("Error while looking up schema under subject {}", sub())};
146146
}
147147

148-
inline error_info invalid_schema(const canonical_schema& schema) {
148+
inline error_info invalid_schema(const unparsed_schema& schema) {
149149
return {
150150
error_code::schema_invalid, fmt::format("Invalid schema {}", schema)};
151151
}

src/v/pandaproxy/schema_registry/handlers.cc

+15-10
Original file line number | Diff line number | Diff line change
@@ -421,12 +421,14 @@ post_subject(server::request_t rq, server::reply_t rp) {
421421
// Force 40401 if no subject
422422
co_await rq.service().schema_store().get_versions(sub, inc_del);
423423

424-
canonical_schema schema;
424+
unparsed_schema validated;
425425
try {
426426
auto unparsed = co_await ppj::rjson_parse(
427427
std::move(rq.req), post_subject_versions_request_handler<>{sub});
428-
schema = co_await rq.service().schema_store().make_canonical_schema(
429-
std::move(unparsed.def), norm);
428+
canonical_schema canonical
429+
= co_await rq.service().schema_store().make_canonical_schema(
430+
std::move(unparsed.def), norm);
431+
validated = to_unparsed_schema(std::move(canonical));
430432
} catch (const exception& e) {
431433
if (e.code() == error_code::schema_empty) {
432434
throw as_exception(invalid_subject_schema(sub));
@@ -437,7 +439,7 @@ post_subject(server::request_t rq, server::reply_t rp) {
437439
}
438440

439441
auto sub_schema = co_await rq.service().schema_store().has_schema(
440-
std::move(schema), inc_del, norm);
442+
std::move(validated), inc_del, norm);
441443

442444
rp.rep->write_body(
443445
"json",
@@ -477,9 +479,10 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
477479
unparsed.id = invalid_schema_id;
478480
}
479481

482+
auto canonical = co_await rq.service().schema_store().make_canonical_schema(
483+
std::move(unparsed.def), norm);
480484
subject_schema schema{
481-
co_await rq.service().schema_store().make_canonical_schema(
482-
std::move(unparsed.def), norm),
485+
to_unparsed_schema(std::move(canonical)),
483486
unparsed.version.value_or(invalid_schema_version),
484487
unparsed.id.value_or(invalid_schema_id),
485488
is_deleted::no};
@@ -674,10 +677,12 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) {
674677
version = parse_numerical_schema_version(ver).value();
675678
}
676679

677-
canonical_schema schema;
680+
unparsed_schema validated;
678681
try {
679-
schema = co_await rq.service().schema_store().make_canonical_schema(
680-
std::move(unparsed.def));
682+
auto canonical
683+
= co_await rq.service().schema_store().make_canonical_schema(
684+
std::move(unparsed.def));
685+
validated = to_unparsed_schema(std::move(canonical));
681686
} catch (exception& e) {
682687
constexpr auto reportable = [](std::error_code ec) {
683688
constexpr std::array errors{
@@ -699,7 +704,7 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) {
699704
}
700705

701706
auto get_res = co_await get_or_load(
702-
rq, [&rq, schema{std::move(schema)}, version, is_verbose]() {
707+
rq, [&rq, schema{std::move(validated)}, version, is_verbose]() {
703708
return rq.service().schema_store().is_compatible(
704709
version, schema.share(), is_verbose);
705710
});

src/v/pandaproxy/schema_registry/json.cc

+1-1
Original file line number | Diff line number | Diff line change
@@ -2357,7 +2357,7 @@ result<id_to_schema_pointer> collect_bundled_schema_and_fix_refs(
23572357
} // namespace
23582358

23592359
ss::future<json_schema_definition>
2360-
make_json_schema_definition(schema_getter&, canonical_schema schema) {
2360+
make_json_schema_definition(schema_getter&, unparsed_schema schema) {
23612361
auto doc
23622362
= parse_json(schema.def().shared_raw()()).value(); // throws on error
23632363
std::string_view name = schema.sub()();

src/v/pandaproxy/schema_registry/json.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
namespace pandaproxy::schema_registry {
1818

1919
ss::future<json_schema_definition>
20-
make_json_schema_definition(schema_getter& store, canonical_schema schema);
20+
make_json_schema_definition(schema_getter& store, unparsed_schema schema);
2121

2222
ss::future<canonical_schema> make_canonical_json_schema(
2323
sharded_store& store, unparsed_schema def, normalize norm = normalize::no);

src/v/pandaproxy/schema_registry/protobuf.cc

+9-9
Original file line number | Diff line number | Diff line change
@@ -233,7 +233,7 @@ class dp_error_collector final : public pb::DescriptorPool::ErrorCollector {
233233
///\brief Implements ZeroCopyInputStream with a copy of the definition
234234
class schema_def_input_stream : public pb::io::ZeroCopyInputStream {
235235
public:
236-
explicit schema_def_input_stream(const canonical_schema_definition& def)
236+
explicit schema_def_input_stream(const unparsed_schema_definition& def)
237237
: _is{def.shared_raw()}
238238
, _impl{&_is.istream()} {}
239239

@@ -255,7 +255,7 @@ class parser {
255255
: _parser{}
256256
, _fdp{} {}
257257

258-
const pb::FileDescriptorProto& parse(const canonical_schema& schema) {
258+
const pb::FileDescriptorProto& parse(const unparsed_schema& schema) {
259259
schema_def_input_stream is{schema.def()};
260260
io_error_collector error_collector;
261261
pb::io::Tokenizer t{&is, &error_collector};
@@ -431,7 +431,7 @@ build_file(pb::DescriptorPool& dp, const pb::FileDescriptorProto& fdp) {
431431
ss::future<pb::FileDescriptorProto> build_file_with_refs(
432432
pb::DescriptorPool& dp,
433433
schema_getter& store,
434-
canonical_schema schema,
434+
unparsed_schema schema,
435435
normalize norm) {
436436
for (const auto& ref : schema.def().refs()) {
437437
if (dp.FindFileByName(ref.name)) {
@@ -442,7 +442,7 @@ ss::future<pb::FileDescriptorProto> build_file_with_refs(
442442
co_await build_file_with_refs(
443443
dp,
444444
store,
445-
canonical_schema{subject{ref.name}, std::move(dep.schema).def()},
445+
unparsed_schema{subject{ref.name}, std::move(dep.schema).def()},
446446
normalize::no);
447447
}
448448

@@ -461,7 +461,7 @@ ss::future<pb::FileDescriptorProto> build_file_with_refs(
461461
ss::future<pb::FileDescriptorProto> import_schema(
462462
pb::DescriptorPool& dp,
463463
schema_getter& store,
464-
canonical_schema schema,
464+
unparsed_schema schema,
465465
normalize norm) {
466466
try {
467467
co_return co_await build_file_with_refs(
@@ -587,7 +587,7 @@ operator<<(std::ostream& os, const protobuf_schema_definition& def) {
587587
}
588588

589589
ss::future<protobuf_schema_definition> make_protobuf_schema_definition(
590-
schema_getter& store, canonical_schema schema, normalize norm) {
590+
schema_getter& store, unparsed_schema schema, normalize norm) {
591591
auto refs = schema.def().refs();
592592
auto impl = ss::make_shared<protobuf_schema_definition::impl>();
593593
auto v2_renderer = protobuf_renderer_v2::no;
@@ -608,7 +608,7 @@ ss::future<protobuf_schema_definition> make_protobuf_schema_definition(
608608
}
609609

610610
ss::future<canonical_schema_definition> validate_protobuf_schema(
611-
sharded_store& store, canonical_schema schema, normalize norm) {
611+
sharded_store& store, unparsed_schema schema, normalize norm) {
612612
auto res = co_await make_protobuf_schema_definition(
613613
store, std::move(schema), norm);
614614
co_return canonical_schema_definition{std::move(res)};
@@ -618,9 +618,9 @@ ss::future<canonical_schema> make_canonical_protobuf_schema(
618618
sharded_store& store, unparsed_schema schema, normalize norm) {
619619
auto [sub, unparsed] = std::move(schema).destructure();
620620
auto [def, type, refs] = std::move(unparsed).destructure();
621-
canonical_schema temp{
621+
unparsed_schema temp{
622622
sub,
623-
{canonical_schema_definition::raw_string{std::move(def)()},
623+
{unparsed_schema_definition::raw_string{std::move(def)()},
624624
type,
625625
std::move(refs)}};
626626

src/v/pandaproxy/schema_registry/protobuf.h

+2-6
Original file line number | Diff line number | Diff line change
@@ -21,14 +21,10 @@ class Descriptor;
2121
namespace pandaproxy::schema_registry {
2222

2323
ss::future<protobuf_schema_definition> make_protobuf_schema_definition(
24-
schema_getter& store,
25-
canonical_schema schema,
26-
normalize norm = normalize::no);
24+
schema_getter& store, unparsed_schema schema, normalize norm = normalize::no);
2725

2826
ss::future<canonical_schema_definition> validate_protobuf_schema(
29-
sharded_store& store,
30-
canonical_schema schema,
31-
normalize norm = normalize::no);
27+
sharded_store& store, unparsed_schema schema, normalize norm = normalize::no);
3228

3329
ss::future<canonical_schema> make_canonical_protobuf_schema(
3430
sharded_store& store, unparsed_schema schema, normalize norm = normalize::no);

src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
namespace pandaproxy::schema_registry {
1919

2020
struct get_schemas_ids_id_response {
21-
canonical_schema_definition definition;
21+
unparsed_schema_definition definition;
2222
};
2323

2424
template<typename Buffer>

src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
namespace pandaproxy::schema_registry {
1919

2020
struct post_subject_versions_version_response {
21-
canonical_schema schema;
21+
unparsed_schema schema;
2222
schema_id id;
2323
schema_version version;
2424
};

src/v/pandaproxy/schema_registry/schema_getter.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,9 @@ class schema_getter {
2525
std::optional<schema_version> version,
2626
include_deleted inc_dec)
2727
= 0;
28-
virtual ss::future<canonical_schema_definition>
28+
virtual ss::future<unparsed_schema_definition>
2929
get_schema_definition(schema_id id) = 0;
30-
virtual ss::future<std::optional<canonical_schema_definition>>
30+
virtual ss::future<std::optional<unparsed_schema_definition>>
3131
maybe_get_schema_definition(schema_id id) = 0;
3232
virtual ~schema_getter() = default;
3333
};

src/v/pandaproxy/schema_registry/seq_writer.cc

+2-2
Original file line number | Diff line number | Diff line change
@@ -247,7 +247,7 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
247247
.node{_node_id},
248248
.sub{sub},
249249
.version{projected.version}};
250-
auto value = canonical_schema_value{
250+
auto value = unparsed_schema_value{
251251
.schema{std::move(canonical)},
252252
.version{projected.version},
253253
.id{projected.id},
@@ -440,7 +440,7 @@ ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
440440
auto key = schema_key{
441441
.seq{write_at}, .node{_node_id}, .sub{sub}, .version{version}};
442442
vlog(plog.debug, "seq_writer::delete_subject_version {}", key);
443-
auto value = canonical_schema_value{
443+
auto value = unparsed_schema_value{
444444
.schema{std::move(ss.schema)},
445445
.version{version},
446446
.id{ss.id},

0 commit comments

Comments
 (0)