Skip to content

Commit e8e409a

Browse files
feat: GET /mode and GET /mode/<subject:path> endpoints
Response contains static "READWRITE" mode for global and subject modes.
1 parent 2ff823b commit e8e409a

5 files changed

Lines changed: 128 additions & 2 deletions

File tree

karapace/schema_registry.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
3030
from karapace.schema_reader import KafkaSchemaReader
3131
from karapace.schema_references import LatestVersionReference, Reference
32-
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
32+
from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject, Version
3333
from typing import Mapping, Sequence
3434

3535
import asyncio
@@ -439,6 +439,12 @@ def get_subject_versions_for_schema(
439439
subject_versions = sorted(subject_versions, key=lambda s: (s["subject"], s["version"]))
440440
return subject_versions
441441

442+
def get_global_mode(self) -> Mode:
443+
return Mode.readwrite
444+
445+
def get_subject_mode(self) -> Mode:
446+
return Mode.readwrite
447+
442448
def send_schema_message(
443449
self,
444450
*,

karapace/schema_registry_apis.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
3535
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
3636
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
37-
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
37+
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
3838
from karapace.utils import JSONDecodeError
3939
from typing import Any
4040

@@ -302,6 +302,24 @@ def _add_schema_registry_routes(self) -> None:
302302
json_body=False,
303303
auth=self._auth,
304304
)
305+
self.route(
306+
"/mode",
307+
callback=self.get_global_mode,
308+
method="GET",
309+
schema_request=True,
310+
with_request=False,
311+
json_body=False,
312+
auth=self._auth,
313+
)
314+
self.route(
315+
"/mode/<subject:path>",
316+
callback=self.get_subject_mode,
317+
method="GET",
318+
schema_request=True,
319+
with_request=False,
320+
json_body=False,
321+
auth=self._auth,
322+
)
305323

306324
async def close(self) -> None:
307325
self.log.info("Closing karapace_schema_registry_controller")
@@ -1245,6 +1263,44 @@ async def subject_post(
12451263
url = f"{master_url}/subjects/{subject}/versions"
12461264
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")
12471265

1266+
async def get_global_mode(
1267+
self,
1268+
content_type: str,
1269+
*,
1270+
user: User | None = None,
1271+
) -> None:
1272+
self._check_authorization(user, Operation.Read, "Config:")
1273+
self.r(
1274+
body={"mode": str(self.schema_registry.get_global_mode())},
1275+
content_type=content_type,
1276+
status=HTTPStatus.OK,
1277+
)
1278+
1279+
async def get_subject_mode(
1280+
self,
1281+
content_type: str,
1282+
*,
1283+
subject: str,
1284+
user: User | None = None,
1285+
) -> None:
1286+
self._check_authorization(user, Operation.Read, f"Subject:{subject}")
1287+
1288+
if self.schema_registry.database.find_subject(subject=Subject(subject)) is None:
1289+
self.r(
1290+
body={
1291+
"error_code": SchemaErrorCodes.SUBJECT_NOT_FOUND.value,
1292+
"message": SchemaErrorMessages.SUBJECT_NOT_FOUND_FMT.value.format(subject=subject),
1293+
},
1294+
content_type=content_type,
1295+
status=HTTPStatus.NOT_FOUND,
1296+
)
1297+
1298+
self.r(
1299+
body={"mode": str(self.schema_registry.get_global_mode())},
1300+
content_type=content_type,
1301+
status=HTTPStatus.OK,
1302+
)
1303+
12481304
def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
12491305
schema_id = self.schema_registry.database.get_schema_id_if_exists(
12501306
subject=subject, schema=schema, include_deleted=include_deleted

karapace/typing.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,8 @@ class SubjectType(StrEnum, Enum):
4949
value = "value"
5050
# partition it's a function of `str` and StrEnum its inherits from it.
5151
partition_ = "partition"
52+
53+
54+
@unique
55+
class Mode(StrEnum):
56+
readwrite = "READWRITE"

tests/integration/test_schema_registry_auth.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None:
100100
res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}")
101101
assert res.status_code == 401
102102

103+
res = await registry_async_client_auth.get("mode")
104+
assert res.status_code == 401
105+
106+
res = await registry_async_client_auth.get(f"mode/{quote(subject)}")
107+
assert res.status_code == 401
108+
103109

104110
async def test_sr_list_subjects(registry_async_client_auth: Client) -> None:
105111
cavesubject = new_random_name("cave-")
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""
2+
Copyright (c) 2024 Aiven Ltd
3+
See LICENSE for details
4+
"""
5+
from karapace.client import Client
6+
from karapace.typing import Mode
7+
from tests.utils import create_schema_name_factory, create_subject_name_factory
8+
9+
import json
10+
import pytest
11+
12+
13+
@pytest.mark.parametrize("trail", ["", "/"])
14+
async def test_global_mode(registry_async_client: Client, trail: str) -> None:
15+
res = await registry_async_client.get(f"/mode{trail}")
16+
assert res.status_code == 200
17+
json_res = res.json()
18+
assert json_res == {"mode": str(Mode.readwrite)}
19+
20+
21+
@pytest.mark.parametrize("trail", ["", "/"])
22+
async def test_subject_mode(registry_async_client: Client, trail: str) -> None:
23+
subject_name_factory = create_subject_name_factory(f"test_schema_same_subject_{trail}")
24+
schema_name = create_schema_name_factory(f"test_schema_same_subject_{trail}")()
25+
26+
schema_str = json.dumps(
27+
{
28+
"type": "record",
29+
"name": schema_name,
30+
"fields": [
31+
{
32+
"name": "f",
33+
"type": "string",
34+
}
35+
],
36+
}
37+
)
38+
subject = subject_name_factory()
39+
res = await registry_async_client.post(
40+
f"subjects/{subject}/versions",
41+
json={"schema": schema_str},
42+
)
43+
assert res.status_code == 200
44+
45+
res = await registry_async_client.get(f"/mode/{subject}{trail}")
46+
assert res.status_code == 200
47+
json_res = res.json()
48+
assert json_res == {"mode": str(Mode.readwrite)}
49+
50+
res = await registry_async_client.get(f"/mode/unknown_subject{trail}")
51+
assert res.status_code == 404
52+
json_res = res.json()
53+
assert json_res == {"error_code": 40401, "message": "Subject 'unknown_subject' not found."}

0 commit comments

Comments
 (0)