Skip to content

Commit a314cac

Browse files
committed
feat: add configurable validation strategy by topic
1 parent 101d69e commit a314cac

File tree

8 files changed

+215
-32
lines changed

8 files changed

+215
-32
lines changed

karapace/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ class NameStrategy(Enum):
169169
topic_name = "topic_name"
170170
record_name = "record_name"
171171
topic_record_name = "topic_record_name"
172+
no_validation = "no_validation_strategy"
172173

173174

174175
def parse_env_value(value: str) -> str | int | bool:

karapace/in_memory_database.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
from __future__ import annotations
88

99
from dataclasses import dataclass, field
10+
from karapace.config import NameStrategy
1011
from karapace.schema_models import SchemaVersion, TypedSchema
1112
from karapace.schema_references import Reference, Referents
12-
from karapace.typing import ResolvedVersion, SchemaId, Subject
13+
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
1314
from threading import Lock, RLock
1415
from typing import Iterable, Sequence
1516

@@ -32,6 +33,7 @@ def __init__(self) -> None:
3233
self.schemas: dict[SchemaId, TypedSchema] = {}
3334
self.schema_lock_thread = RLock()
3435
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
36+
self.topic_validation_strategies: dict[TopicName, NameStrategy] = {}
3537

3638
# Content based deduplication of schemas. This is used to reduce memory
3739
# usage when the same schema is produced multiple times to the same or
@@ -229,6 +231,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
229231
if schema_version.deleted is False
230232
}
231233

234+
def get_topic_strategy(self, *, topic_name: TopicName) -> NameStrategy:
    """Return the name strategy registered for ``topic_name``.

    Topics that have no entry in ``topic_validation_strategies`` fall back to
    ``NameStrategy.topic_name``.
    """
    # One EAFP lookup instead of a membership test followed by indexing.
    try:
        return self.topic_validation_strategies[topic_name]
    except KeyError:
        return NameStrategy.topic_name
239+
240+
def override_topic_strategy(self, *, topic_name: TopicName, name_strategy: NameStrategy) -> None:
    """Store ``name_strategy`` as the strategy for ``topic_name``.

    A strategy already stored for the same topic is replaced.
    """
    overrides = self.topic_validation_strategies
    overrides[topic_name] = name_strategy
242+
232243
def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
233244
with self.schema_lock_thread:
234245
for schema_version in self.subjects[subject].schemas.values():

karapace/kafka_rest_apis/__init__.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
TopicAuthorizationFailedError,
1414
UnknownTopicOrPartitionError,
1515
)
16-
from karapace.config import Config, create_client_ssl_context
16+
from karapace.config import Config, create_client_ssl_context, NameStrategy
1717
from karapace.errors import InvalidSchema
1818
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
1919
from karapace.kafka_rest_apis.authentication import (
@@ -29,7 +29,7 @@
2929
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
3030
from karapace.schema_type import SchemaType
3131
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
32-
from karapace.typing import SchemaId, Subject
32+
from karapace.typing import SchemaId, Subject, TopicName
3333
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
3434
from typing import Callable, Dict, List, Optional, Tuple, Union
3535

@@ -773,26 +773,32 @@ async def get_schema_id(
773773
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None
774774
)
775775
schema_str = data.get(f"{prefix}_schema")
776+
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic))
776777

777778
if schema_id is None and schema_str is None:
778779
raise InvalidSchema()
779780

780781
if schema_id is None:
781782
parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str)
782-
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)
783+
784+
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type, naming_strategy)
783785
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
784786
else:
785787

786788
def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
787-
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type)
789+
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type, naming_strategy)
788790
return subject not in subjects
789791

790792
parsed_schema, valid_subjects = await self._query_schema_and_subjects(
791793
schema_id,
792794
need_new_call=subject_not_included,
793795
)
794796

795-
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
797+
if (
798+
self.config["name_strategy_validation"]
799+
and naming_strategy != NameStrategy.no_validation
800+
and subject_not_included(parsed_schema, valid_subjects)
801+
):
796802
raise InvalidSchema()
797803

798804
return schema_id

karapace/schema_reader.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from avro.schema import Schema as AvroSchema
1010
from contextlib import closing, ExitStack
11+
from enum import Enum
1112
from jsonschema.validators import Draft7Validator
1213
from kafka import KafkaConsumer, TopicPartition
1314
from kafka.admin import KafkaAdminClient, NewTopic
@@ -20,7 +21,7 @@
2021
TopicAlreadyExistsError,
2122
)
2223
from karapace import constants
23-
from karapace.config import Config
24+
from karapace.config import Config, NameStrategy
2425
from karapace.dependency import Dependency
2526
from karapace.errors import InvalidReferences, InvalidSchema
2627
from karapace.in_memory_database import InMemoryDatabase
@@ -31,7 +32,7 @@
3132
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
3233
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
3334
from karapace.statsd import StatsClient
34-
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
35+
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName
3536
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
3637
from threading import Event, Thread
3738
from typing import Final, Mapping, Sequence
@@ -58,6 +59,14 @@
5859
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"
5960

6061

62+
class MessageType(Enum):
    """Record kinds, identified by the ``keytype`` entry of a message key.

    Each member's value is the exact string carried in the key.
    """

    # Compatibility configuration records.
    config = "CONFIG"
    # Schema records.
    schema = "SCHEMA"
    # Subject deletion records.
    delete_subject = "DELETE_SUBJECT"
    # Per-topic name strategy override records.
    schema_strategy = "SCHEMA_STRATEGY"
    # Present for spec completeness; nothing is done for these records.
    no_operation = "NOOP"
68+
69+
6170
def _create_consumer_from_config(config: Config) -> KafkaConsumer:
6271
# Group not set on purpose, all consumers read the same data
6372
session_timeout_ms = config["session_timeout_ms"]
@@ -429,6 +438,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
429438
LOG.info("Deleting subject: %r, value: %r", subject, value)
430439
self.database.delete_subject(subject=subject, version=version)
431440

441+
def _handle_msg_schema_strategy(self, key: dict, value: dict | None) -> None:
    """Apply a SCHEMA_STRATEGY record by overriding the topic's name strategy.

    ``value`` is expected to be a mapping with ``topic`` and ``strategy``
    entries, where ``strategy`` is the value of a ``NameStrategy`` member.
    Records with a missing value, missing entries or an unknown strategy are
    logged and discarded.
    """
    # Explicit check rather than ``assert``: assertions are stripped under -O
    # and the signature admits ``None``.
    if not isinstance(value, dict):
        LOG.error("Discarding SCHEMA_STRATEGY record %r: expected a mapping value, got %r", key, value)
        return

    try:
        topic = value["topic"]
        name_strategy = NameStrategy(value["strategy"])
    except (KeyError, ValueError):
        # Report the malformed payload here, where the cause is known.
        LOG.error("Discarding SCHEMA_STRATEGY record %r: malformed value %r", key, value)
        return

    self.database.override_topic_strategy(topic_name=TopicName(topic), name_strategy=name_strategy)
445+
432446
def _handle_msg_schema_hard_delete(self, key: dict) -> None:
433447
subject, version = key["subject"], key["version"]
434448

@@ -522,14 +536,27 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
522536
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)
523537

524538
def handle_msg(self, key: dict, value: dict | None) -> None:
    """Dispatch one record to the handler matching its ``keytype``.

    Records whose key has no ``keytype`` entry, or whose ``keytype`` is not a
    ``MessageType`` value, are logged and discarded. A ``ValueError`` raised
    by a handler is logged with its traceback and the record is discarded.
    """
    if "keytype" not in key:
        LOG.error(
            "The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
        )
        return

    # Guard only the enum conversion so that the "not managed" message is
    # emitted exclusively for unknown keytypes.
    try:
        message_type = MessageType(key["keytype"])
    except ValueError:
        LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])
        return

    try:
        if message_type == MessageType.config:
            self._handle_msg_config(key, value)
        elif message_type == MessageType.schema:
            self._handle_msg_schema(key, value)
        elif message_type == MessageType.delete_subject:
            self._handle_msg_delete_subject(key, value)
        elif message_type == MessageType.schema_strategy:
            self._handle_msg_schema_strategy(key, value)
        # MessageType.no_operation: nothing to do, kept for spec completeness.
    except ValueError:
        # Handler failures are still discarded, but reported as what they are
        # instead of as an unknown keytype.
        LOG.exception("The message %s-%s has been discarded because its handler raised a ValueError", key, value)
533560

534561
def remove_referenced_by(
535562
self,

karapace/schema_registry.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from contextlib import AsyncExitStack, closing
88
from karapace.compatibility import check_compatibility, CompatibilityModes
99
from karapace.compatibility.jsonschema.checks import is_incompatible
10-
from karapace.config import Config
10+
from karapace.config import Config, NameStrategy
1111
from karapace.dependency import Dependency
1212
from karapace.errors import (
1313
IncompatibleSchema,
@@ -27,9 +27,9 @@
2727
from karapace.messaging import KarapaceProducer
2828
from karapace.offset_watcher import OffsetWatcher
2929
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
30-
from karapace.schema_reader import KafkaSchemaReader
30+
from karapace.schema_reader import KafkaSchemaReader, MessageType
3131
from karapace.schema_references import LatestVersionReference, Reference
32-
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
32+
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version
3333
from typing import Mapping, Sequence
3434

3535
import asyncio
@@ -466,6 +466,19 @@ def send_schema_message(
466466
value = None
467467
self.producer.send_message(key=key, value=value)
468468

469+
def get_validation_strategy_for_topic(self, *, topic_name: TopicName) -> NameStrategy:
    """Return the name strategy the database reports for ``topic_name``."""
    strategy = self.database.get_topic_strategy(topic_name=topic_name)
    return strategy
471+
472+
def send_validation_strategy_for_topic(
    self,
    *,
    topic_name: TopicName,
    validation_strategy: NameStrategy,
) -> None:
    """Send a SCHEMA_STRATEGY record assigning ``validation_strategy`` to ``topic_name``.

    The record key carries the topic, the keytype and a ``magic`` of 0; the
    record value carries the strategy value together with the topic.
    """
    self.producer.send_message(
        key={"topic": topic_name, "keytype": MessageType.schema_strategy.value, "magic": 0},
        value={"strategy": validation_strategy.value, "topic": topic_name},
    )
481+
469482
def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
470483
key = {"subject": subject, "magic": 0, "keytype": "CONFIG"}
471484
value = {"compatibilityLevel": compatibility_level.value}

karapace/schema_registry_apis.py

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from karapace.auth import HTTPAuthorizer, Operation, User
1212
from karapace.compatibility import check_compatibility, CompatibilityModes
1313
from karapace.compatibility.jsonschema.checks import is_incompatible
14-
from karapace.config import Config
14+
from karapace.config import Config, NameStrategy
1515
from karapace.errors import (
1616
IncompatibleSchema,
1717
InvalidReferences,
@@ -28,13 +28,13 @@
2828
SubjectSoftDeletedException,
2929
VersionNotFoundException,
3030
)
31-
from karapace.karapace import KarapaceBase
31+
from karapace.karapace import empty_response, KarapaceBase
3232
from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
3333
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
3434
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
3535
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
3636
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
37-
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
37+
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
3838
from karapace.utils import JSONDecodeError
3939
from typing import Any
4040

@@ -301,6 +301,23 @@ def _add_schema_registry_routes(self) -> None:
301301
json_body=False,
302302
auth=self._auth,
303303
)
304+
self.route(
305+
"/topic/<topic:path>/name_strategy",
306+
callback=self.subject_validation_strategy_get,
307+
method="GET",
308+
schema_request=True,
309+
json_body=False,
310+
auth=None,
311+
)
312+
self.route(
313+
"/topic/<topic:path>/name_strategy/<strategy:path>",
314+
callback=self.subject_validation_strategy_set,
315+
method="POST",
316+
schema_request=True,
317+
with_request=True,
318+
json_body=False,
319+
auth=None,
320+
)
304321

305322
async def close(self) -> None:
306323
async with AsyncExitStack() as stack:
@@ -985,6 +1002,38 @@ def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType
9851002
)
9861003
return schema_type
9871004

1005+
def _validate_topic_name(self, topic: str) -> TopicName:
    """Return ``topic`` as a ``TopicName`` if the admin client lists it.

    For any other topic an UNPROCESSABLE_ENTITY error body is passed to
    ``self.r``.
    """
    # NOTE(review): presumably ``self.r`` raises to send the response; if it
    # ever returned, this method would fall through and return None - confirm.
    known_topics = self.schema_registry.schema_reader.admin_client.list_topics()
    topic_exists = topic in known_topics
    if topic_exists:
        return TopicName(topic)

    error_body = {
        "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
        "message": f"The topic {topic} isn't existing, proceed with creating it first",
    }
    self.r(
        body=error_body,
        content_type=JSON_CONTENT_TYPE,
        status=HTTPStatus.UNPROCESSABLE_ENTITY,
    )
1019+
1020+
def _validate_name_strategy(self, name_strategy: str) -> NameStrategy:
    """Convert ``name_strategy`` into the ``NameStrategy`` member with that value.

    When no member has that value, an UNPROCESSABLE_ENTITY error body listing
    the accepted values is passed to ``self.r``.
    """
    # NOTE(review): presumably ``self.r`` raises to send the response; if it
    # ever returned, this method would fall through and return None - confirm.
    try:
        return NameStrategy(name_strategy)
    except ValueError:
        accepted = [member.value for member in NameStrategy]
        message = f"Invalid name strategy: {name_strategy}, valid values are {accepted}"
        error_body = {
            "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
            "message": message,
        }
        self.r(
            body=error_body,
            content_type=JSON_CONTENT_TYPE,
            status=HTTPStatus.UNPROCESSABLE_ENTITY,
        )
1036+
9881037
def _validate_schema_key(self, content_type: str, body: dict) -> None:
9891038
if "schema" not in body:
9901039
self.r(
@@ -1238,6 +1287,44 @@ async def subject_post(
12381287
url = f"{master_url}/subjects/{subject}/versions"
12391288
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")
12401289

1290+
async def subject_validation_strategy_get(self, content_type: str, *, topic: str) -> None:
    """Reply with the name strategy currently reported for ``topic``.

    The reply body is ``{"strategy": <strategy value>}``.
    """
    strategy = self.schema_registry.get_validation_strategy_for_topic(topic_name=TopicName(topic))
    self.r({"strategy": strategy.value}, content_type)
1294+
1295+
async def subject_validation_strategy_set(
    self,
    content_type: str,
    request: HTTPRequest,
    *,
    topic: str,
    strategy: str,
) -> None:
    """Set the name strategy for ``topic``.

    ``strategy`` is validated with ``_validate_name_strategy``. When this node
    is the master, the new strategy is sent through the schema registry and an
    empty response is produced; otherwise the request is forwarded to the
    master as a POST.
    """
    # Validate the strategy before anything else since it is the cheapest check.
    strategy_name = self._validate_name_strategy(strategy)

    are_we_master, master_url = await self.schema_registry.get_master()
    if are_we_master:
        # TODO: the topic is not checked for existence (``_validate_topic_name``
        # is deliberately not called); decide whether that check is wanted.
        self.schema_registry.send_validation_strategy_for_topic(
            topic_name=TopicName(topic),
            validation_strategy=strategy_name,
        )
        empty_response()
    else:
        # The POST route is /topic/<topic>/name_strategy/<strategy>, so the
        # strategy segment has to be part of the forwarded URL.
        # TODO: ideally parse the incoming URL and swap only its host portion.
        url = f"{master_url}/topic/{topic}/name_strategy/{strategy_name.value}"

        await self._forward_request_remote(
            request=request,
            body=None,
            url=url,
            content_type=content_type,
            method="POST",
        )
1327+
12411328
def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
12421329
schema_id = self.schema_registry.database.get_schema_id_if_exists(
12431330
subject=subject, schema=schema, include_deleted=include_deleted

0 commit comments

Comments
 (0)