@@ -167,14 +167,10 @@ sharded_store::get_schema_version(subject_schema schema, normalize norm) {
167
167
if (s_res.has_error ()) {
168
168
return ss::make_ready_future ();
169
169
}
170
- auto [raw, type, refs]
171
- = std::move (s_res.value ()).destructure ();
172
170
return this
173
171
->make_canonical_schema (
174
172
unparsed_schema{
175
- subject{},
176
- unparsed_schema_definition{
177
- std::move (raw), type, std::move (refs)}},
173
+ subject{}, std::move (s_res.value ())},
178
174
norm)
179
175
.then ([id, &ret_id, &canonical_def](
180
176
canonical_schema processed) {
@@ -299,22 +295,6 @@ ss::future<bool> sharded_store::upsert(
299
295
unparsed_schema schema,
300
296
schema_id id,
301
297
schema_version version,
302
- is_deleted deleted) {
303
- auto norm = normalize{
304
- config::shard_local_cfg ().schema_registry_normalize_on_startup ()};
305
- co_return co_await upsert (
306
- marker,
307
- co_await make_canonical_schema (std::move (schema), norm),
308
- id,
309
- version,
310
- deleted);
311
- }
312
-
313
- ss::future<bool > sharded_store::upsert (
314
- seq_marker marker,
315
- canonical_schema schema,
316
- schema_id id,
317
- schema_version version,
318
298
is_deleted deleted) {
319
299
auto [sub, def] = std::move (schema).destructure ();
320
300
co_await upsert_schema (id, std::move (def));
@@ -365,6 +345,16 @@ ss::future<subject_schema> sharded_store::has_schema(
365
345
if (
366
346
e.code () == error_code::subject_not_found
367
347
|| e.code () == error_code::subject_version_not_found) {
348
+ } else if (
349
+ // Stored schemas might be invalid if imported improperly
350
+ e.code () == error_code::schema_invalid) {
351
+ vlog (
352
+ plog.warn ,
353
+ " Failed to parse stored schema, subject '{}', version {}. "
354
+ " Error: " ,
355
+ schema.sub (),
356
+ ver,
357
+ e.what ());
368
358
} else {
369
359
throw ;
370
360
}
@@ -378,10 +368,10 @@ ss::future<subject_schema> sharded_store::has_schema(
378
368
379
369
ss::future<std::optional<canonical_schema_definition>>
380
370
sharded_store::maybe_get_schema_definition (schema_id id) {
381
- co_return co_await _store.invoke_on (
371
+ auto unparsed = co_await _store.invoke_on (
382
372
shard_for (id),
383
373
_smp_opts,
384
- [id](store& s) -> std::optional<canonical_schema_definition > {
374
+ [id](store& s) -> std::optional<unparsed_schema_definition > {
385
375
auto s_res = s.get_schema_definition (id);
386
376
if (
387
377
s_res.has_error ()
@@ -390,10 +380,49 @@ sharded_store::maybe_get_schema_definition(schema_id id) {
390
380
}
391
381
return std::move (s_res.value ());
392
382
});
383
+ if (!unparsed.has_value ()) {
384
+ co_return std::nullopt;
385
+ }
386
+
387
+ try {
388
+ auto canonical = co_await make_canonical_schema (
389
+ {{}, std::move (unparsed.value ())});
390
+
391
+ co_return std::move (canonical).def ();
392
+ } catch (const exception & e) {
393
+ vlog (
394
+ plog.warn ,
395
+ " Failed to parse stored schema with id {}: Error {}" ,
396
+ id,
397
+ e.what ());
398
+ throw ;
399
+ }
393
400
}
394
401
395
402
ss::future<canonical_schema_definition>
396
403
sharded_store::get_schema_definition (schema_id id) {
404
+ auto unparsed = co_await _store.invoke_on (
405
+ shard_for (id), _smp_opts, [id](store& s) {
406
+ return s.get_schema_definition (id).value ();
407
+ });
408
+
409
+ try {
410
+ auto canonical = co_await make_canonical_schema (
411
+ {{}, std::move (unparsed)});
412
+
413
+ co_return std::move (canonical).def ();
414
+ } catch (const exception & e) {
415
+ vlog (
416
+ plog.warn ,
417
+ " Failed to parse stored schema with id {}: Error {}" ,
418
+ id,
419
+ e.what ());
420
+ throw ;
421
+ }
422
+ }
423
+
424
+ ss::future<unparsed_schema_definition>
425
+ sharded_store::get_unparsed_schema_definition (schema_id id) {
397
426
co_return co_await _store.invoke_on (
398
427
shard_for (id), _smp_opts, [id](store& s) {
399
428
return s.get_schema_definition (id).value ();
@@ -428,6 +457,17 @@ sharded_store::get_schema_subjects(schema_id id, include_deleted inc_del) {
428
457
co_return subs;
429
458
}
430
459
460
+ ss::future<schema_id>
461
+ sharded_store::get_id (subject sub, std::optional<schema_version> version) {
462
+ auto v_id = co_await _store.invoke_on (
463
+ shard_for (sub), _smp_opts, [sub, version](store& s) {
464
+ return s.get_subject_version_id (sub, version, include_deleted::yes)
465
+ .value ();
466
+ });
467
+
468
+ co_return v_id.id ;
469
+ }
470
+
431
471
ss::future<subject_schema> sharded_store::get_subject_schema (
432
472
subject sub, std::optional<schema_version> version, include_deleted inc_del) {
433
473
auto sub_shard{shard_for (sub)};
@@ -441,11 +481,23 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
441
481
return s.get_schema_definition (id).value ();
442
482
});
443
483
444
- co_return subject_schema{
445
- .schema = {sub, std::move (def)},
446
- .version = v_id.version ,
447
- .id = v_id.id ,
448
- .deleted = v_id.deleted };
484
+ try {
485
+ auto canonical = co_await make_canonical_schema ({sub, std::move (def)});
486
+
487
+ co_return subject_schema{
488
+ .schema = std::move (canonical),
489
+ .version = v_id.version ,
490
+ .id = v_id.id ,
491
+ .deleted = v_id.deleted };
492
+ } catch (const exception & e) {
493
+ vlog (
494
+ plog.warn ,
495
+ " Failed to parse stored schema, subject {}, version {}: {}" ,
496
+ sub,
497
+ v_id.id ,
498
+ e.what ());
499
+ throw ;
500
+ }
449
501
}
450
502
451
503
ss::future<chunked_vector<subject>> sharded_store::get_subjects (
@@ -703,7 +755,7 @@ sharded_store::clear_compatibility(seq_marker marker, subject sub) {
703
755
}
704
756
705
757
ss::future<bool >
706
- sharded_store::upsert_schema (schema_id id, canonical_schema_definition def) {
758
+ sharded_store::upsert_schema (schema_id id, unparsed_schema_definition def) {
707
759
co_await maybe_update_max_schema_id (id);
708
760
co_return co_await _store.invoke_on (
709
761
shard_for (id), _smp_opts, [id, def{std::move (def)}](store& s) mutable {
0 commit comments