Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,36 @@ These unique (per instance of the schema registry) consumer group names are pref
.. _`documentation`: https://docs.confluent.io/platform/current/schema-registry/security/index.html#authorizing-access-to-the-schemas-topic
.. _`permissions`: https://docs.confluent.io/platform/current/kafka/authorization.html#group-resource-type-operations

REST Proxy read-only schema lookup
==================================

When running Karapace REST Proxy together with Schema Registry, you can control
how the REST Proxy interacts with the registry when resolving schema IDs.

By default, Karapace REST Proxy registers schemas on demand by calling
``POST /subjects/{subject}/versions`` when a new schema is seen in a produce
request. This requires ``Write`` permissions on the corresponding ``Subject:``
resources in the Schema Registry ACL configuration.

If you want REST Proxy to operate in a read-only mode with respect to Schema
Registry, enable the configuration option::

    rest_lookup_schema_before_register = true

When this option is enabled, REST Proxy will first try to look up an existing
schema under the subject using::

    POST /subjects/{subject}

If the schema is found, the existing schema ID is used and no new registration
is performed. Only if the lookup fails (schema not found) will REST Proxy fall
back to registering the schema with ``POST /subjects/{subject}/versions``.

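Together with Schema Registry ACLs, you can enforce that REST Proxy has only
``Read`` permissions on ``Subject:`` resources (allowing lookups) while a
separate service with ``Write`` permissions is responsible for registering new
schemas. In that setup every schema must be registered before it is first used
in a produce request: when the lookup does not find the schema, REST Proxy
still attempts the fallback registration, which the Schema Registry denies.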

OAuth2 authentication and authorization of Karapace
===================================================

Expand Down
1 change: 1 addition & 0 deletions src/karapace/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Config(BaseSettings):
registry_authfile: str | None = None
rest_authorization: bool = False
rest_base_uri: str | None = None
rest_lookup_schema_before_register: bool = False
log_handler: str | None = "stdout"
log_level: str = "DEBUG"
log_format: str = "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s"
Expand Down
59 changes: 49 additions & 10 deletions src/karapace/core/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import avro.schema
import io
import struct
from http import HTTPStatus

START_BYTE = 0x0
HEADER_FORMAT = ">bI"
Expand Down Expand Up @@ -123,21 +124,52 @@ def __init__(
self.client = Client(server_uri=schema_registry_url, server_ca=server_ca, session_auth=session_auth)
self.base_url = schema_registry_url

@staticmethod
def _build_schema_payload(schema: ValidatedTypedSchema, references: Reference | None = None) -> dict[str, object]:
    """Build the JSON request body that describes ``schema`` to the Schema Registry.

    Protobuf schemas are sent in their ``str()`` form and carry a
    ``"references"`` entry when ``references`` is truthy. Every other schema
    type is sent as the JSON encoding of ``schema.to_dict()``; ``references``
    is not included for those types.

    Args:
        schema: The validated schema to describe.
        references: Optional references; only used for Protobuf schemas.

    Returns:
        A dict with the keys ``"schema"`` and ``"schemaType"`` and, for
        Protobuf schemas with references, ``"references"``.
    """
    if schema.schema_type is SchemaType.PROTOBUF:
        payload: dict[str, object] = {
            "schema": str(schema),
            "schemaType": schema.schema_type.value,
        }
        if references:
            payload["references"] = references.json()
    else:
        # NOTE(review): references are dropped for non-Protobuf schemas here;
        # confirm that this is intended for Avro / JSON Schema subjects.
        payload = {
            "schema": json_encode(schema.to_dict()),
            "schemaType": schema.schema_type.value,
        }
    return payload

async def post_new_schema(
    self, subject: str, schema: ValidatedTypedSchema, references: Reference | None = None
) -> SchemaId:
    """Register ``schema`` under ``subject`` and return the ID the registry reports.

    Sends ``POST subjects/<quoted subject>/versions`` with the payload built by
    ``_build_schema_payload``.

    Raises:
        SchemaRetrievalError: If the registry response is not OK; the decoded
            response body is passed to the exception.
    """
    request_body = self._build_schema_payload(schema, references)
    versions_path = f"subjects/{quote(subject)}/versions"
    response = await self.client.post(versions_path, json=request_body)
    if response.ok:
        return SchemaId(response.json()["id"])
    raise SchemaRetrievalError(response.json())

async def lookup_schema(
    self,
    subject: str,
    schema: ValidatedTypedSchema,
    references: Reference | None = None,
) -> SchemaId | None:
    """Look up the ID of ``schema`` under ``subject`` without registering it.

    Sends ``POST subjects/<quoted subject>`` with the payload built by
    ``_build_schema_payload``.

    Returns:
        The schema ID from the response, or ``None`` when the registry answers
        with HTTP 404.

    Raises:
        SchemaRetrievalError: If the response is any other non-OK status, or
            if an OK response has no ``"id"`` entry.
    """
    request_body = self._build_schema_payload(schema, references)
    response = await self.client.post(f"subjects/{quote(subject)}", json=request_body)

    # A 404 is the expected "not there" answer, not an error.
    if response.status_code == HTTPStatus.NOT_FOUND:
        return None

    if not response.ok:
        raise SchemaRetrievalError(response.json())

    body = response.json()
    if "id" in body:
        return SchemaId(body["id"])
    raise SchemaRetrievalError(f"Invalid result format: {body}")

async def _get_schema_recursive(
self,
subject: Subject,
Expand Down Expand Up @@ -321,16 +353,23 @@ async def get_schema_for_subject(self, subject: Subject) -> TypedSchema:
self.ids_to_schemas[schema_id] = schema
return schema

async def upsert_id_for_schema(self, schema_typed: ValidatedTypedSchema, subject: str) -> SchemaId:
async def upsert_id_for_schema(
self, schema_typed: ValidatedTypedSchema, subject: str, lookup_first: bool = False
) -> SchemaId:
assert self.registry_client, "must not call this method after the object is closed."

schema_ser = str(schema_typed)

if schema_ser in self.schemas_to_ids:
return self.schemas_to_ids[schema_ser]

# note: the post is idempotent, so it is like a get or insert (aka upsert)
schema_id = await self.registry_client.post_new_schema(subject, schema_typed)
schema_id: SchemaId | None = None

if lookup_first:
schema_id = await self.registry_client.lookup_schema(subject, schema_typed)

if schema_id is None:
schema_id = await self.registry_client.post_new_schema(subject, schema_typed)

async with self.state_lock:
self.schemas_to_ids[schema_ser] = schema_id
Expand Down
4 changes: 3 additions & 1 deletion src/karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,9 @@ async def _query_schema_id_from_cache_or_registry(
schema_id = self.topic_schema_cache.get_schema_id(subject_name, parsed_schema)
if schema_id is None:
log.debug("[resolve schema id] Registering / Retrieving ID for %s and schema %s", subject_name, schema_str)
schema_id = await self.serializer.upsert_id_for_schema(parsed_schema, subject_name)
schema_id = await self.serializer.upsert_id_for_schema(
parsed_schema, subject_name, self.config.rest_lookup_schema_before_register
)
log.debug("[resolve schema id] Found schema id %s from registry for subject %s", schema_id, subject_name)
self.topic_schema_cache.set_schema(subject_name, schema_id, parsed_schema)
else:
Expand Down
63 changes: 63 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,69 @@ async def fixture_registry_async_client_custom_client_id(
await client.close()


@pytest.fixture(scope="function", name="rest_async_lookup_first")
async def fixture_rest_async_lookup_first(
    request: SubRequest,
    loop: asyncio.AbstractEventLoop,
    kafka_servers: KafkaServers,
    registry_async_client: Client,
) -> AsyncIterator[KafkaRest | None]:
    """Yield a ``KafkaRest`` configured with ``rest_lookup_schema_before_register = True``.

    When the ``rest_url`` pytest option is set, no instance is created and
    ``None`` is yielded instead. Otherwise the instance's registry client is
    pointed at ``registry_async_client`` and the instance is closed on teardown.
    """
    if request.config.getoption("rest_url"):
        yield None
        return

    lookup_first_config = Config()
    lookup_first_config.admin_metadata_max_age = 2
    lookup_first_config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
    lookup_first_config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES
    lookup_first_config.waiting_time_before_acting_as_master_ms = 500
    # The one setting that distinguishes this fixture: look up before registering.
    lookup_first_config.rest_lookup_schema_before_register = True
    kafka_rest = KafkaRest(config=lookup_first_config)

    assert kafka_rest.serializer.registry_client
    # Route the serializer's registry traffic through the test registry client.
    kafka_rest.serializer.registry_client.client = registry_async_client
    try:
        yield kafka_rest
    finally:
        await kafka_rest.close()


@pytest.fixture(scope="function", name="rest_async_lookup_first_client")
async def fixture_rest_async_lookup_first_client(
    request: SubRequest,
    loop: asyncio.AbstractEventLoop,
    rest_async_lookup_first: KafkaRest,
    aiohttp_client: AiohttpClient,
) -> AsyncIterator[Client]:
    """Yield a ``Client`` for the lookup-first REST proxy once it answers requests.

    With the ``rest_url`` pytest option set, the client targets that URL;
    otherwise it is backed by an aiohttp test client for
    ``rest_async_lookup_first.app``. The client is closed on teardown.
    """
    external_rest_url = request.config.getoption("rest_url")

    # client and server_uri are incompatible settings.
    if not external_rest_url:

        async def get_client(**kwargs) -> TestClient:
            return await aiohttp_client(rest_async_lookup_first.app)

        client = Client(client_factory=get_client)
    else:
        client = Client(server_uri=external_rest_url)

    try:
        # wait until the server is listening, otherwise the tests may fail
        await repeat_until_successful_request(
            client.get,
            "brokers",
            json_data=None,
            headers=None,
            error_msg="REST API is unreachable",
            timeout=10,
            sleep=0.3,
        )
        yield client
    finally:
        await client.close()


@pytest.fixture(scope="function", name="rest_async_custom_client_id")
async def fixture_rest_async_custom_client_id(
request: SubRequest,
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/kafka_rest_apis/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,35 @@ async def test_avro_publish(
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_avro_publish_with_lookup_only(
    rest_async_lookup_first_client: Client,
    registry_async_client: Client,
    admin_client: KafkaAdminClient,
) -> None:
    """Publish Avro records through a REST Proxy with rest_lookup_schema_before_register=True.

    The value schema is registered directly in the registry first; the publish
    must then succeed and report the same schema ID.

    NOTE(review): the ID assertion alone would also pass if the proxy registered
    the schema again, presumably returning the same ID, so this test does not by
    itself prove that the lookup path was taken.
    """
    topic_name = new_topic(admin_client)
    await wait_for_topics(rest_async_lookup_first_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)

    # Pre-register the value schema so the proxy has something to look up.
    value_subject = f"{topic_name}-value"
    registration = await registry_async_client.post(
        f"subjects/{value_subject}/versions",
        json={"schema": schema_avro_json},
    )
    assert registration.ok
    pre_registered_id = registration.json()["id"]

    records = [{"value": avro_object} for avro_object in test_objects_avro]
    response = await rest_async_lookup_first_client.post(
        f"/topics/{topic_name}",
        json={"value_schema": schema_avro_json, "records": records},
        headers=REST_HEADERS["avro"],
    )

    check_successful_publish_response(response, test_objects_avro)
    assert response.json()["value_schema_id"] == pre_registered_id


async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaAdminClient) -> None:
topic_name = new_topic(admin_client)
prepared_records = [
Expand Down
Loading