
Commit 6642c7d

Merge pull request #632 from aiven/giuseppelillo/topic-configurations-v3
feature: Add topic configurations to v3 format
2 parents: 7e024f1 + fdaab32

28 files changed: 425 additions & 83 deletions


karapace/avro_dataclasses/introspect.py
Lines changed: 22 additions & 2 deletions

@@ -6,6 +6,7 @@
 from __future__ import annotations

 from .schema import AvroType, FieldSchema, RecordSchema
+from collections.abc import Mapping
 from dataclasses import Field, fields, is_dataclass, MISSING
 from enum import Enum
 from functools import lru_cache
@@ -30,7 +31,7 @@ class UnsupportedAnnotation(NotImplementedError):
     ...


-class UnderspecifiedArray(UnsupportedAnnotation):
+class UnderspecifiedAnnotation(UnsupportedAnnotation):
     ...


@@ -93,7 +94,7 @@ def _field_type(field: Field, type_: object) -> AvroType:  # pylint: disable=too
     if origin in sequence_types:
         return _field_type_array(field, origin, type_)
     if type_ in sequence_types:
-        raise UnderspecifiedArray("Inner type must be specified for sequence types")
+        raise UnderspecifiedAnnotation("Inner type must be specified for sequence types")

     # Handle enums.
     if isinstance(type_, type) and issubclass(type_, Enum):
@@ -107,6 +108,25 @@ def _field_type(field: Field, type_: object) -> AvroType:  # pylint: disable=too
             }
         )

+    # Handle map types.
+    if origin is Mapping:
+        args = get_args(type_)
+        if len(args) != 2:
+            raise UnderspecifiedAnnotation("Key and value types must be specified for map types")
+        if args[0] is not str:
+            raise UnsupportedAnnotation("Key type must be str")
+        return FieldSchema(
+            {
+                "type": "map",
+                "values": _field_type(field, args[1]),
+                **(
+                    {"default": field.default_factory()}
+                    if field.default_factory is not MISSING
+                    else {}  # type: ignore[misc]
+                ),
+            }
+        )
+
     raise NotImplementedError(
         f"Found an unknown type {type_!r} while assembling Avro schema for the field "
         f"{field.name!r}. The Avro dataclasses implementation likely needs to be "

karapace/avro_dataclasses/schema.py
Lines changed: 9 additions & 1 deletion

@@ -4,6 +4,7 @@
 """
 from __future__ import annotations

+from collections.abc import Mapping
 from typing import Literal
 from typing_extensions import NotRequired, TypeAlias, TypedDict

@@ -29,9 +30,16 @@ class EnumType(TypedDict):
     default: NotRequired[str]


+class MapType(TypedDict):
+    name: str
+    type: Literal["map"]
+    values: AvroType
+    default: NotRequired[Mapping[str, AvroType]]
+
+
 TypeUnit: TypeAlias = "Primitive | TypeObject"
 UnionType: TypeAlias = "list[TypeUnit]"
-AvroType: TypeAlias = "TypeUnit | UnionType | RecordSchema | ArrayType | EnumType"
+AvroType: TypeAlias = "TypeUnit | UnionType | RecordSchema | ArrayType | EnumType | MapType"


 class FieldSchema(TypedDict):
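
For illustration, a literal conforming to the new MapType definition (values are hypothetical; assumes "string" is a valid Primitive spelling):

# Sketch: a MapType value as described by the TypedDict above.
from karapace.avro_dataclasses.schema import MapType

map_schema: MapType = {
    "name": "configs",
    "type": "map",
    "values": "string",
    # "default" is NotRequired and omitted here.
}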

karapace/backup/api.py
Lines changed: 72 additions & 32 deletions

@@ -6,13 +6,14 @@
 """
 from __future__ import annotations

-from .backends.reader import BaseBackupReader, BaseItemsBackupReader, ProducerSend, RestoreTopic
+from .backends.reader import BaseBackupReader, BaseItemsBackupReader, ProducerSend, RestoreTopic, RestoreTopicLegacy
 from .backends.v3.constants import V3_MARKER
 from .backends.v3.schema import ChecksumAlgorithm
 from .backends.writer import BackupWriter, StdOut
 from .encoders import encode_key, encode_value
-from .errors import BackupError, EmptyPartition, PartitionCountError, StaleConsumerError
+from .errors import BackupError, BackupTopicAlreadyExists, EmptyPartition, PartitionCountError, StaleConsumerError
 from .poll_timeout import PollTimeout
+from .topic_configurations import ConfigSource, get_topic_configurations
 from enum import Enum
 from functools import partial
 from kafka import KafkaConsumer, KafkaProducer
@@ -27,12 +28,11 @@
 from karapace.config import Config
 from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
 from karapace.key_format import KeyFormatter
-from karapace.schema_reader import new_schema_topic_from_config
 from karapace.utils import assert_never
 from pathlib import Path
 from rich.console import Console
 from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
-from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, NoReturn, TypeVar
+from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, NoReturn, TypeVar

 import contextlib
 import datetime
@@ -178,31 +178,27 @@ def _admin(config: Config) -> KafkaAdminClient:
     wait=wait_fixed(1),  # seconds
     retry=retry_if_exception_type(KafkaError),
 )
-def _maybe_create_topic(config: Config, name: str, backup_version: BackupVersion) -> None:
-    if backup_version in {BackupVersion.V1, BackupVersion.V2}:
-        topic = new_schema_topic_from_config(config)
-
-        if topic.name != name:
-            LOG.warning(
-                "Not creating topic, because the name %r from the config and the name %r from the CLI differ.",
-                topic.name,
-                name,
-            )
-            return
-    else:
-        topic = NewTopic(
-            name=name,
-            num_partitions=1,
-            replication_factor=config["replication_factor"],
-            topic_configs={"cleanup.policy": "compact"},
-        )
+def _maybe_create_topic(
+    name: str,
+    *,
+    config: Config,
+    replication_factor: int,
+    topic_configs: Mapping[str, str],
+) -> bool:
+    """Returns True if topic creation was successful, False if topic already exists"""
+    topic = NewTopic(
+        name=name,
+        num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
+        replication_factor=replication_factor,
+        topic_configs=topic_configs,
+    )

     with _admin(config) as admin:
         try:
             admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
         except TopicAlreadyExistsError:
             LOG.debug("Topic %r already exists", topic.name)
-            return
+            return False

     LOG.info(
         "Created topic %r (partition count: %s, replication factor: %s, config: %s)",
@@ -211,7 +207,7 @@ def _maybe_create_topic(config: Config, name: str, backup_version: BackupVersion
         topic.replication_factor,
         topic.topic_configs,
     )
-    return
+    return True


 @contextlib.contextmanager
@@ -307,18 +303,38 @@ def _write_partition(
     )


-def _handle_restore_topic(
-    instruction: RestoreTopic,
+def _handle_restore_topic_legacy(
+    instruction: RestoreTopicLegacy,
     config: Config,
-    backup_version: BackupVersion,
 ) -> None:
+    if config["topic_name"] != instruction.topic_name:
+        LOG.warning(
+            "Not creating topic, because the name %r from the config and the name %r from the CLI differ.",
+            config["topic_name"],
+            instruction.topic_name,
+        )
+        return
     _maybe_create_topic(
         config=config,
-        name=instruction.name,
-        backup_version=backup_version,
+        name=instruction.topic_name,
+        replication_factor=config["replication_factor"],
+        topic_configs={"cleanup.policy": "compact"},
     )


+def _handle_restore_topic(
+    instruction: RestoreTopic,
+    config: Config,
+) -> None:
+    if not _maybe_create_topic(
+        config=config,
+        name=instruction.topic_name,
+        replication_factor=instruction.replication_factor,
+        topic_configs=instruction.topic_configs,
+    ):
+        raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
+
+
 def _raise_backup_error(exception: Exception) -> NoReturn:
     raise BackupError("Error while producing restored messages") from exception

@@ -347,6 +363,12 @@ def restore_backup(
     backup_location: Path | StdOut,
     topic_name: TopicName,
 ) -> None:
+    """Restores a backup from the specified location into the configured topic.
+
+    :raises Exception: if production fails, concrete exception types are unknown,
+        see Kafka implementation.
+    :raises BackupTopicAlreadyExists: if backup version is V3 and topic already exists
+    """
     if isinstance(backup_location, str):
         raise NotImplementedError("Cannot restore backups from stdin")

@@ -377,9 +399,12 @@
         producer = None

         for instruction in backend.read(backup_location, topic_name):
-            if isinstance(instruction, RestoreTopic):
-                _handle_restore_topic(instruction, config, backup_version=backup_version)
-                producer = stack.enter_context(_producer(config, instruction.name))
+            if isinstance(instruction, RestoreTopicLegacy):
+                _handle_restore_topic_legacy(instruction, config)
+                producer = stack.enter_context(_producer(config, instruction.topic_name))
+            elif isinstance(instruction, RestoreTopic):
+                _handle_restore_topic(instruction, config)
+                producer = stack.enter_context(_producer(config, instruction.topic_name))
             elif isinstance(instruction, ProducerSend):
                 if producer is None:
                     raise RuntimeError("Backend has not yet sent RestoreTopic.")
@@ -396,6 +421,7 @@ def create_backup(
     *,
     poll_timeout: PollTimeout = PollTimeout.default(),
     overwrite: bool = False,
+    replication_factor: int | None = None,
 ) -> None:
     """Creates a backup of the configured topic.

@@ -404,6 +430,9 @@
         if not records are received within that time and the target offset has not
         been reached an exception is raised. Defaults to one minute.
     :param overwrite: the output file if it exists.
+    :param replication_factor: Value will be stored in metadata, and used when
+        creating topic during restoration. This is required for Version 3 backup,
+        but has no effect on earlier versions, as they don't handle metadata.

     :raises Exception: if consumption fails, concrete exception types are unknown,
         see Kafka implementation.
@@ -416,6 +445,8 @@
     """
     if version is BackupVersion.V3 and not isinstance(backup_location, Path):
         raise RuntimeError("Backup format version 3 does not support writing to stdout.")
+    if version is BackupVersion.V3 and replication_factor is None:
+        raise RuntimeError("Backup format version 3 needs a replication factor to be specified.")

     start_time = datetime.datetime.now(datetime.timezone.utc)
     backend = version.writer()
@@ -426,6 +457,10 @@
         version.name,
         topic_name,
     )
+    with _admin(config) as admin:
+        topic_configurations = get_topic_configurations(
+            admin=admin, topic_name=topic_name, config_source_filter={ConfigSource.TOPIC_CONFIG}
+        )

     # Note: It's expected that we at some point want to introduce handling of
     # multi-partition topics here. The backend interface is built with that in
@@ -464,6 +499,8 @@
             started_at=start_time,
             finished_at=end_time,
             partition_count=1,
+            replication_factor=replication_factor if replication_factor is not None else config["replication_factor"],
+            topic_configurations=topic_configurations,
             data_files=[data_file] if data_file else [],
         )

@@ -506,6 +543,9 @@ def inspect(backup_location: Path | StdOut) -> None:
             "topic_name": metadata.topic_name,
             "topic_id": None if metadata.topic_id is None else str(metadata.topic_id),
             "partition_count": metadata.partition_count,
+            "record_count": metadata.record_count,
+            "replication_factor": metadata.replication_factor,
+            "topic_configurations": metadata.topic_configurations,
             "checksum_algorithm": metadata.checksum_algorithm.value,
             "data_files": tuple(
                 {
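
Taken together, the create/restore flow changes as in the sketch below; it is illustrative only, and the config loader plus keyword spellings not visible in this diff are assumptions.

# Usage sketch (assumptions marked): V3 backups must carry a replication
# factor, and V3 restore refuses to produce into a pre-existing topic.
from pathlib import Path

from karapace.backup.api import BackupVersion, create_backup, restore_backup
from karapace.backup.errors import BackupTopicAlreadyExists
from karapace.config import read_config  # assumed loader for a Config mapping

with open("karapace.config.json") as fp:
    config = read_config(fp)

create_backup(
    config=config,
    backup_location=Path("schemas.backup"),
    topic_name="_schemas",  # hypothetical topic name
    version=BackupVersion.V3,  # parameter assumed from the version checks above
    replication_factor=3,  # now required for V3; omitting it raises RuntimeError
)

try:
    restore_backup(
        config=config,
        backup_location=Path("schemas.backup"),
        topic_name="_schemas",
    )
except BackupTopicAlreadyExists:
    pass  # V3 restore raises instead of producing into an existing topic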

karapace/backup/backends/reader.py
Lines changed: 13 additions & 5 deletions

@@ -7,7 +7,7 @@
 from karapace.dataclasses import default_dataclass
 from karapace.typing import JsonData, JsonObject
 from pathlib import Path
-from typing import Callable, ClassVar, Final, Generator, IO, Iterator, Optional, Sequence, TypeVar, Union
+from typing import Callable, ClassVar, Final, Generator, IO, Iterator, Mapping, Optional, Sequence, TypeVar, Union
 from typing_extensions import TypeAlias

 import abc
@@ -17,10 +17,18 @@
 PARTITION_ZERO: Final = 0


+@default_dataclass
+class RestoreTopicLegacy:
+    topic_name: str
+    partition_count: int
+
+
 @default_dataclass
 class RestoreTopic:
-    name: str
+    topic_name: str
     partition_count: int
+    replication_factor: int
+    topic_configs: Mapping[str, str]


 @default_dataclass
@@ -33,7 +41,7 @@ class ProducerSend:
     timestamp: int | None = None


-Instruction: TypeAlias = "RestoreTopic | ProducerSend"
+Instruction: TypeAlias = "RestoreTopicLegacy | RestoreTopic | ProducerSend"


 KeyEncoder: TypeAlias = Callable[[Union[JsonObject, str]], Optional[bytes]]
@@ -78,8 +86,8 @@ def read(
         path: Path,
         topic_name: str,
     ) -> Generator[Instruction, None, None]:
-        yield RestoreTopic(
-            name=topic_name,
+        yield RestoreTopicLegacy(
+            topic_name=topic_name,
             partition_count=1,
         )
         with path.open("r") as buffer:
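
As a quick illustration of the widened Instruction union (topic name and values below are hypothetical):

# Sketch: the two topic-restore instructions reader backends can now emit.
from karapace.backup.backends.reader import RestoreTopic, RestoreTopicLegacy

# V1/V2 backends carry no topic metadata, so they emit the legacy form and
# restoration falls back to config-derived settings.
legacy = RestoreTopicLegacy(topic_name="_schemas", partition_count=1)

# V3 backends replay the replication factor and topic configs recorded at
# backup time.
v3 = RestoreTopic(
    topic_name="_schemas",
    partition_count=1,
    replication_factor=3,
    topic_configs={"cleanup.policy": "compact"},
)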

karapace/backup/backends/v3/avro/Metadata.avsc
Lines changed: 11 additions & 0 deletions

@@ -48,6 +48,17 @@
             "name": "partition_count",
             "type": "int"
         },
+        {
+            "name": "replication_factor",
+            "type": "int"
+        },
+        {
+            "name": "topic_configurations",
+            "type": {
+                "type": "map",
+                "values": "string"
+            }
+        },
         {
             "name": "data_files",
             "type": {
