Skip to content

Commit 2966a54

Browse files
authored
Merge pull request #472 from aiven/jjaakola-aiven-schema-backup-tool-crashed-on-null-key-and-value
fix: tolerate null record keys and values on schema backup and restore
2 parents da670e3 + c929e16 commit 2966a54

5 files changed

Lines changed: 59 additions & 11 deletions

File tree

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ repos:
4040
rev: v4.1.0
4141
hooks:
4242
- id: trailing-whitespace
43-
exclude: ^vendor/|^tests/.*/fixtures/.*
43+
exclude: ^vendor/|^tests/.*/fixtures/.*|^tests/integration/test_data/.*
4444
- id: end-of-file-fixer
45-
exclude: ^vendor/|^tests/.*/fixtures/.*
45+
exclude: ^vendor/|^tests/.*/fixtures/.*|^tests/integration/test_data/.*
4646
- id: debug-statements
4747

4848
# https://pre-commit.com/#repository-local-hooks

karapace/schema_backup.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ def export(self, export_func) -> None:
240240
LOG.info("Schema export written to stdout")
241241
self.close()
242242

243-
def encode_key(self, key: Union[JsonData, str]) -> bytes:
243+
def encode_key(self, key: Optional[Union[JsonData, str]]) -> bytes:
244+
if not key:
245+
return b""
244246
if not self.key_formatter:
245247
if isinstance(key, str):
246248
return key.encode("utf8")
@@ -258,9 +260,13 @@ def encode_value(value: Union[JsonData, str]) -> Optional[bytes]:
258260
return json_encode(value, sort_keys=False, binary=True)
259261

260262

261-
def serialize_schema_message(key_bytes: bytes, value_bytes: bytes) -> str:
262-
key = base64.b16encode(key_bytes).decode("utf8")
263-
value = base64.b16encode(value_bytes).decode("utf8")
263+
def serialize_record(key_bytes: Optional[bytes], value_bytes: Optional[bytes]) -> str:
    """Serialize one record as a single backup-file line.

    The line has the form ``<key>\\t<value>\\n`` where each present field is
    the upper-case base16 (hex) encoding of its bytes.

    Args:
        key_bytes: Raw record key, or ``None`` when the record has no key.
        value_bytes: Raw record value, or ``None`` when the record has no value.

    Returns:
        The tab-separated, newline-terminated text line for the record.
    """
    # A missing field must be rendered as an empty *string*. Using the bytes
    # literal b"" here would be interpolated by the f-string below as the
    # four characters b'' and corrupt the backup line.
    # NOTE: None and b"" both serialize to an empty field, so the two are
    # indistinguishable in the output.
    key = "" if key_bytes is None else base64.b16encode(key_bytes).decode("utf8")
    value = "" if value_bytes is None else base64.b16encode(value_bytes).decode("utf8")
    return f"{key}\t{value}\n"
265271

266272

@@ -281,7 +287,7 @@ def anonymize_avro_schema_message(key_bytes: bytes, value_bytes: bytes) -> str:
281287
# The schemas topic contain all changes to schema metadata.
282288
if key.get("subject", None):
283289
key["subject"] = anonymize_avro.anonymize_name(key["subject"])
284-
return serialize_schema_message(json.dumps(key).encode("utf8"), json.dumps(value).encode("utf8"))
290+
return serialize_record(json.dumps(key).encode("utf8"), json.dumps(value).encode("utf8"))
285291

286292

287293
def parse_args():
@@ -310,7 +316,7 @@ def main() -> int:
310316
sb = SchemaBackup(config, args.location, args.topic)
311317

312318
if args.command == "get":
313-
sb.export(serialize_schema_message)
319+
sb.export(serialize_record)
314320
return 0
315321
if args.command == "restore":
316322
sb.restore_backup()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/V2
2+
6B65795F6F6E65 76616C75655F6F6E65
3+
6B65795F74776F 76616C75655F74776F
4+
76616C75655F7468726565
5+
6B65795F666F7572
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/V2
22
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20312C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D 7B0A202020202264656C65746564223A2066616C73652C0A20202020226964223A20312C0A2020202022736368656D61223A20225C22737472696E675C22222C0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20310A7D
33
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20322C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D 7B0A202020202264656C65746564223A2066616C73652C0A20202020226964223A20322C0A2020202022736368656D61223A20225C22737472696E675C22222C0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20320A7D
4+
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20322C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D

tests/integration/test_schema_backup.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from karapace.client import Client
99
from karapace.config import set_config_defaults
1010
from karapace.key_format import is_key_in_canonical_format
11-
from karapace.schema_backup import SchemaBackup, serialize_schema_message
11+
from karapace.schema_backup import SchemaBackup, serialize_record
1212
from karapace.utils import Expiration
1313
from pathlib import Path
1414
from tests.integration.utils.cluster import RegistryDescription
@@ -51,7 +51,7 @@ async def test_backup_get(
5151
}
5252
)
5353
sb = SchemaBackup(config, str(backup_location))
54-
sb.export(serialize_schema_message)
54+
sb.export(serialize_record)
5555

5656
# The backup file has been created
5757
assert os.path.exists(backup_location)
@@ -68,6 +68,43 @@ async def test_backup_get(
6868
assert lines == 1
6969

7070

71+
async def test_backup_restore_and_get_non_schema_topic(
    kafka_servers: KafkaServers,
    tmp_path: Path,
) -> None:
    """Restore a backup file into a fresh topic, export it, and compare.

    The exported backup must be textually identical to the file that was
    restored.
    """
    test_topic_name = new_random_name("non-schemas")
    config = set_config_defaults(
        {
            "bootstrap_uri": kafka_servers.bootstrap_servers,
            "topic_name": test_topic_name,
        }
    )

    # Restore from backup
    restore_location = Path("tests/integration/test_data/") / "test_restore_non_schema_topic_v2.log"
    restorer = SchemaBackup(config, str(restore_location))
    restorer.restore_backup()

    # Get the backup
    backup_location = tmp_path / "non_schemas_topic.log"
    exporter = SchemaBackup(config, str(backup_location))
    exporter.export(serialize_record)
    # The backup file has been created
    assert os.path.exists(backup_location)

    # Read both files as UTF-8 text and require an exact round trip.
    restore_file_content = restore_location.read_text(encoding="utf8")
    backup_file_content = backup_location.read_text(encoding="utf8")
    assert restore_file_content is not None
    assert backup_file_content is not None
    assert restore_file_content == backup_file_content
106+
107+
71108
def _assert_canonical_key_format(
72109
bootstrap_servers: str,
73110
schemas_topic: str,
@@ -86,7 +123,6 @@ def _assert_canonical_key_format(
86123
while raw_msgs:
87124
for _, messages in raw_msgs.items():
88125
for message in messages:
89-
print(message)
90126
key = json.loads(message.key)
91127
assert is_key_in_canonical_format(key), f"Not in canonical format: {key}"
92128
raw_msgs = consumer.poll()

0 commit comments

Comments
 (0)