Skip to content

Commit 64a8b29

Browse files
Merge pull request #484 from aiven/anatolii/backup-restore-fix
Handling None properly during backup
2 parents 0dda1d6 + 35a026a commit 64a8b29

3 files changed

Lines changed: 16 additions & 18 deletions

File tree

karapace/schema_backup.py

Lines changed: 9 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -140,7 +140,6 @@ def _create_schema_topic_if_needed(self) -> None:
140140
return
141141

142142
self.init_admin_client()
143-
144143
start_time = time.monotonic()
145144
wait_time = constants.MINUTE
146145
while True:
@@ -210,9 +209,10 @@ def _restore_backup_version_1_single_array(self, fp: IO) -> None:
210209

211210
def _restore_backup_version_2(self, fp: IO) -> None:
212211
for line in fp:
213-
hex_key, hex_value = line.split("\t")
214-
key = base64.b16decode(hex_key).decode("utf8")
215-
value = base64.b16decode(hex_value.strip()).decode("utf8") # strip to remove the linefeed
212+
hex_key, hex_value = [val.strip() for val in line.split("\t")] # strip to remove the linefeed
213+
214+
key = base64.b16decode(hex_key).decode("utf8") if hex_key != "null" else hex_key
215+
value = base64.b16decode(hex_value.strip()).decode("utf8") if hex_value != "null" else hex_value
216216
self._handle_restore_message((key, value))
217217

218218
def export(self, export_func) -> None:
@@ -240,9 +240,9 @@ def export(self, export_func) -> None:
240240
LOG.info("Schema export written to stdout")
241241
self.close()
242242

243-
def encode_key(self, key: Optional[Union[JsonData, str]]) -> bytes:
244-
if not key:
245-
return b""
243+
def encode_key(self, key: Optional[Union[JsonData, str]]) -> Optional[bytes]:
244+
if key == "null":
245+
return None
246246
if not self.key_formatter:
247247
if isinstance(key, str):
248248
return key.encode("utf8")
@@ -261,12 +261,8 @@ def encode_value(value: Union[JsonData, str]) -> Optional[bytes]:
261261

262262

263263
def serialize_record(key_bytes: Optional[bytes], value_bytes: Optional[bytes]) -> str:
264-
key = b""
265-
if key_bytes is not None:
266-
key = base64.b16encode(key_bytes).decode("utf8")
267-
value = b""
268-
if value_bytes is not None:
269-
value = base64.b16encode(value_bytes).decode("utf8")
264+
key = base64.b16encode(key_bytes).decode("utf8") if key_bytes is not None else "null"
265+
value = base64.b16encode(value_bytes).decode("utf8") if value_bytes is not None else "null"
270266
return f"{key}\t{value}\n"
271267

272268

tests/integration/test_data/test_restore_non_schema_topic_v2.log

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,3 +3,5 @@
33
6B65795F74776F 76616C75655F74776F
44
76616C75655F7468726565
55
6B65795F666F7572
6+
6B65795F666F7572 null
7+
null 76616C75655F7468726565

tests/integration/test_schema_backup.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
from kafka import KafkaConsumer
88
from karapace.client import Client
99
from karapace.config import set_config_defaults
10+
from karapace.kafka_rest_apis import KafkaRestAdminClient
1011
from karapace.key_format import is_key_in_canonical_format
1112
from karapace.schema_backup import SchemaBackup, serialize_record
1213
from karapace.utils import Expiration
@@ -69,27 +70,26 @@ async def test_backup_get(
6970

7071

7172
async def test_backup_restore_and_get_non_schema_topic(
72-
kafka_servers: KafkaServers,
73-
tmp_path: Path,
73+
kafka_servers: KafkaServers, tmp_path: Path, admin_client: KafkaRestAdminClient
7474
) -> None:
7575
test_topic_name = new_random_name("non-schemas")
7676

7777
config = set_config_defaults(
7878
{
7979
"bootstrap_uri": kafka_servers.bootstrap_servers,
80-
"topic_name": test_topic_name,
8180
}
8281
)
82+
admin_client.new_topic(name=test_topic_name)
8383

8484
# Restore from backup
8585
test_data_path = Path("tests/integration/test_data/")
8686
restore_location = test_data_path / "test_restore_non_schema_topic_v2.log"
87-
sb = SchemaBackup(config, str(restore_location))
87+
sb = SchemaBackup(config, str(restore_location), topic_option=test_topic_name)
8888
sb.restore_backup()
8989

9090
# Get the backup
9191
backup_location = tmp_path / "non_schemas_topic.log"
92-
sb = SchemaBackup(config, str(backup_location))
92+
sb = SchemaBackup(config, str(backup_location), topic_option=test_topic_name)
9393
sb.export(serialize_record)
9494
# The backup file has been created
9595
assert os.path.exists(backup_location)

0 commit comments

Comments
 (0)