Skip to content

Commit a6f69fd

Browse files
authored
Merge pull request #1240 from Aiven-Open/mbasani-fix-metrics-headers
Skip schema request headers for few endpoints
2 parents db9382d + f2d13b5 commit a6f69fd

10 files changed

Lines changed: 108 additions & 38 deletions

File tree

src/karapace/api/content_type.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424
"application/vnd.schemaregistry+json",
2525
JSON_CONTENT_TYPE,
2626
]
27+
SCHEMA_RESPONSE_DEFAULT_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"
2728

2829

29-
def check_schema_headers(request: Request) -> str:
30+
def negotiate_schema_content_type(request: Request) -> str:
31+
"""Validate Accept and Content-Type headers for schema-registry endpoints.
32+
33+
Returns the negotiated response content type on success.
34+
Raises HTTPException 406 or 415 on invalid headers.
35+
"""
3036
method = request.method
31-
response_default_content_type = "application/vnd.schemaregistry.v1+json"
3237

3338
message = Message()
3439
message["Content-Type"] = request.headers.get("Content-Type", JSON_CONTENT_TYPE)
@@ -42,14 +47,11 @@ def check_schema_headers(request: Request) -> str:
4247
detail={
4348
"message": "HTTP 415 Unsupported Media Type",
4449
},
45-
headers={
46-
"Content-Type": response_default_content_type,
47-
},
4850
)
4951
accept_val = request.headers.get("Accept")
5052
if accept_val:
5153
if accept_val in ("*/*", "*") or accept_val.startswith("*/"):
52-
return response_default_content_type
54+
return SCHEMA_RESPONSE_DEFAULT_CONTENT_TYPE
5355
content_type_match = get_best_match(accept_val, SCHEMA_ACCEPT_VALUES)
5456
if not content_type_match:
5557
LOG.debug("Unexpected Accept value: %r", accept_val)
@@ -58,9 +60,6 @@ def check_schema_headers(request: Request) -> str:
5860
detail={
5961
"message": "HTTP 406 Not Acceptable",
6062
},
61-
headers={
62-
"Content-Type": response_default_content_type,
63-
},
6463
)
6564
return content_type_match
66-
return response_default_content_type
65+
return SCHEMA_RESPONSE_DEFAULT_CONTENT_TYPE

src/karapace/api/middlewares/__init__.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from collections.abc import Awaitable, Callable
77
from fastapi import FastAPI, HTTPException, Request, Response
88
from fastapi.responses import JSONResponse
9-
from karapace.api.content_type import check_schema_headers
109
from karapace.api.telemetry.middleware import setup_telemetry_middleware
1110

1211
from karapace.api.oidc.middleware import OIDCMiddleware
@@ -26,23 +25,6 @@ async def set_content_types(request: Request, call_next: Callable[[Request], Awa
2625
if request.url.path in {"/docs", "/docs/oauth2-redirect", "/redoc", "/openapi.json"}:
2726
return await call_next(request)
2827

29-
try:
30-
response_content_type = check_schema_headers(request)
31-
except HTTPException as exc:
32-
return JSONResponse(
33-
status_code=exc.status_code,
34-
headers=exc.headers,
35-
content=exc.detail,
36-
)
37-
38-
# Schema registry supports application/octet-stream, assumption is JSON object body.
39-
# Force internally to use application/json in this case for compatibility.
40-
if request.headers.get("Content-Type") == "application/octet-stream":
41-
new_headers = request.headers.mutablecopy()
42-
new_headers["Content-Type"] = "application/json"
43-
request._headers = new_headers
44-
request.scope.update(headers=request.headers.raw)
45-
4628
# Check for skip paths like /_health and /metrics and bypass
4729
if request.url.path in config.sasl_oauthbearer_skip_auth_paths:
4830
return await call_next(request)
@@ -79,7 +61,11 @@ async def set_content_types(request: Request, call_next: Callable[[Request], Awa
7961
)
8062

8163
response = await call_next(request)
82-
response.headers["Content-Type"] = response_content_type
64+
65+
content_type = getattr(request.state, "schema_response_content_type", None)
66+
if content_type:
67+
response.headers["Content-Type"] = content_type
68+
8369
return response
8470

8571
setup_telemetry_middleware(app=app)

src/karapace/api/routers/compatibility.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from karapace.api.container import SchemaRegistryContainer
99
from karapace.api.controller import KarapaceSchemaRegistryController
1010
from karapace.api.routers.errors import unauthorized
11-
from karapace.api.routers.raw_path_router import RawPathRoute
11+
from karapace.api.routers.raw_path_router import SchemaRegistryRoute
1212
from karapace.api.routers.requests import CompatibilityCheckResponse, SchemaRequest
1313
from karapace.api.user import get_current_user
1414
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
@@ -21,7 +21,7 @@
2121
prefix="/compatibility",
2222
tags=["compatibility"],
2323
responses={404: {"description": "Not found"}},
24-
route_class=RawPathRoute,
24+
route_class=SchemaRegistryRoute,
2525
)
2626

2727

src/karapace/api/routers/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from karapace.api.controller import KarapaceSchemaRegistryController
1010
from karapace.api.forward_client import ForwardClient
1111
from karapace.api.routers.errors import no_primary_url_error, unauthorized
12-
from karapace.api.routers.raw_path_router import RawPathRoute
12+
from karapace.api.routers.raw_path_router import SchemaRegistryRoute
1313
from karapace.api.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse
1414
from karapace.api.user import get_current_user
1515
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
@@ -23,7 +23,7 @@
2323
prefix="/config",
2424
tags=["config"],
2525
responses={404: {"description": "Not found"}},
26-
route_class=RawPathRoute,
26+
route_class=SchemaRegistryRoute,
2727
)
2828

2929

src/karapace/api/routers/mode.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from karapace.api.container import SchemaRegistryContainer
99
from karapace.api.controller import KarapaceSchemaRegistryController
1010
from karapace.api.routers.errors import unauthorized
11-
from karapace.api.routers.raw_path_router import RawPathRoute
11+
from karapace.api.routers.raw_path_router import SchemaRegistryRoute
1212
from karapace.api.routers.requests import ModeResponse
1313
from karapace.api.user import get_current_user
1414
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
@@ -21,7 +21,7 @@
2121
prefix="/mode",
2222
tags=["mode"],
2323
responses={404: {"description": "Not found"}},
24-
route_class=RawPathRoute,
24+
route_class=SchemaRegistryRoute,
2525
)
2626

2727

src/karapace/api/routers/raw_path_router.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@
99
Copyright (c) 2018 Sebastián Ramírez
1010
"""
1111

12-
from fastapi import HTTPException # noqa: E402
12+
from collections.abc import Coroutine, Callable # noqa: E402
13+
from fastapi import HTTPException, Request, Response # noqa: E402
14+
from typing import Any # noqa: E402
15+
from fastapi.responses import JSONResponse # noqa: E402
1316
from fastapi.routing import APIRoute # noqa: E402
17+
from karapace.api.content_type import negotiate_schema_content_type, SCHEMA_RESPONSE_DEFAULT_CONTENT_TYPE # noqa: E402
1418
from starlette.routing import Match # noqa: E402
1519
from starlette.types import Scope # noqa: E402
1620

@@ -42,3 +46,39 @@ def matches(self, scope: Scope) -> tuple[Match, Scope]:
4246
new_path = re.sub(r"\?.*", "", raw_path)
4347
scope["path"] = new_path
4448
return super().matches(scope)
49+
50+
51+
class SchemaRegistryRoute(RawPathRoute):
52+
"""Route class for schema-registry endpoints that require content negotiation.
53+
54+
Validates Accept and Content-Type headers before any dependency injection or
55+
body parsing occurs. Returns 406/415 errors immediately for invalid headers,
56+
and sets the negotiated Content-Type on successful responses.
57+
"""
58+
59+
def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
60+
original_handler = super().get_route_handler()
61+
62+
async def schema_content_handler(request: Request) -> Response:
63+
try:
64+
response_content_type = negotiate_schema_content_type(request)
65+
except HTTPException as exc:
66+
return JSONResponse(
67+
status_code=exc.status_code,
68+
content=exc.detail,
69+
headers={"Content-Type": SCHEMA_RESPONSE_DEFAULT_CONTENT_TYPE},
70+
)
71+
72+
if request.headers.get("Content-Type") == "application/octet-stream":
73+
new_headers = request.headers.mutablecopy()
74+
new_headers["Content-Type"] = "application/json"
75+
request._headers = new_headers
76+
request.scope.update(headers=request.headers.raw)
77+
78+
request.state.schema_response_content_type = response_content_type
79+
80+
response = await original_handler(request)
81+
response.headers["Content-Type"] = response_content_type
82+
return response
83+
84+
return schema_content_handler

src/karapace/api/routers/schemas.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from fastapi import APIRouter, Depends, Query
88
from karapace.api.container import SchemaRegistryContainer
99
from karapace.api.controller import KarapaceSchemaRegistryController
10+
from karapace.api.routers.raw_path_router import SchemaRegistryRoute
1011
from karapace.api.routers.requests import SchemaListingItem, SchemasResponse, SubjectVersion
1112
from karapace.api.user import get_current_user
1213
from karapace.core.auth import AuthenticatorAndAuthorizer, User
@@ -18,6 +19,7 @@
1819
prefix="/schemas",
1920
tags=["schemas"],
2021
responses={404: {"description": "Not found"}},
22+
route_class=SchemaRegistryRoute,
2123
)
2224

2325

src/karapace/api/routers/subjects.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from karapace.api.controller import KarapaceSchemaRegistryController
1010
from karapace.api.forward_client import ForwardClient
1111
from karapace.api.routers.errors import no_primary_url_error, unauthorized
12-
from karapace.api.routers.raw_path_router import RawPathRoute
12+
from karapace.api.routers.raw_path_router import SchemaRegistryRoute
1313
from karapace.api.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse
1414
from karapace.api.user import get_current_user
1515
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
@@ -28,7 +28,7 @@
2828
prefix="/subjects",
2929
tags=["subjects"],
3030
responses={404: {"description": "Not found"}},
31-
route_class=RawPathRoute,
31+
route_class=SchemaRegistryRoute,
3232
)
3333

3434

tests/e2e/instrumentation/test_prometheus.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,24 @@ async def test_metrics_endpoint(registry_async_client: Client) -> None:
2121
assert result.status_code == HTTPStatus.OK.value
2222

2323

24+
async def test_metrics_endpoint_with_prometheus_accept_header(registry_async_client: Client) -> None:
25+
"""Regression test: Prometheus sends Accept headers that don't match schema-registry content types.
26+
27+
The /metrics endpoint must be excluded from the schema-registry content negotiation
28+
middleware, otherwise it returns HTTP 406 Not Acceptable.
29+
"""
30+
prometheus_accept = (
31+
"application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,"
32+
"text/plain;version=0.0.4;q=0.5,*/*;q=0.1"
33+
)
34+
result: Result = await registry_async_client.get(
35+
PrometheusInstrumentation.METRICS_ENDPOINT_PATH,
36+
headers={"Accept": prometheus_accept},
37+
json_response=False,
38+
)
39+
assert result.status_code == HTTPStatus.OK.value
40+
41+
2442
async def test_metrics_endpoint_parsed_response(registry_async_client: Client) -> None:
2543
result: Result = await registry_async_client.get(
2644
PrometheusInstrumentation.METRICS_ENDPOINT_PATH,

tests/integration/test_health_check.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,31 @@
1313
from tests.integration.utils.cluster import RegistryDescription
1414

1515

16+
async def test_health_check_with_non_schema_accept_header(registry_async_client: Client) -> None:
17+
"""Regression test: /_health must be exempt from schema-registry content negotiation.
18+
19+
Monitoring tools (e.g. Kubernetes probes) may send Accept headers that don't match
20+
schema-registry content types. This must not result in HTTP 406.
21+
"""
22+
res = await registry_async_client.get("/_health", headers={"Accept": "text/plain, */*;q=0.1"})
23+
assert res.status_code in (
24+
http.HTTPStatus.OK,
25+
http.HTTPStatus.SERVICE_UNAVAILABLE,
26+
), f"Expected 200 or 503, got {res.status_code}"
27+
28+
29+
async def test_root_with_non_schema_accept_header(registry_async_client: Client) -> None:
30+
"""Regression test: / must be exempt from schema-registry content negotiation."""
31+
res = await registry_async_client.get("/", headers={"Accept": "text/html"})
32+
assert res.ok, f"Expected 200, got {res.status_code}"
33+
34+
35+
async def test_master_available_with_non_schema_accept_header(registry_async_client: Client) -> None:
36+
"""Regression test: /master_available must be exempt from schema-registry content negotiation."""
37+
res = await registry_async_client.get("/master_available", headers={"Accept": "text/plain"})
38+
assert res.ok, f"Expected 200, got {res.status_code}"
39+
40+
1641
async def test_health_check(
1742
registry_cluster: RegistryDescription, registry_async_client: Client, admin_client: KafkaAdminClient
1843
) -> None:

0 commit comments

Comments
 (0)