Commit 87a01e9

Merge pull request #592 from aiven/skip_recompression_in_startup
verify uncompressed file does not exist under /xlog and /timeline [BF-1358]
2 parents: 6dce5bf + 60d6bf6

3 files changed: +180 -60 lines
pghoard/pghoard.py

Lines changed: 94 additions & 45 deletions
@@ -23,7 +23,7 @@
 from pathlib import Path
 from queue import Empty, Queue
 from threading import Event
-from typing import Dict, List, Optional
+from typing import Dict, List, NamedTuple, Optional
 
 import psycopg2
 from rohmu import dates, get_transfer, rohmufile
@@ -39,7 +39,7 @@
     replication_connection_string_and_slot_using_pgpass, write_json_file
 )
 from pghoard.compressor import (
-    CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionQueue
+    CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionEvent, WalFileDeletionQueue
 )
 from pghoard.preservation_request import (
     is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
@@ -56,6 +56,14 @@ class DeltaBaseBackupFailureInfo:
     retries: int = 0
 
 
+class BackupSitePaths(NamedTuple):
+    compressed_xlog_path: str
+    compressed_timeline_path: str
+    uncompressed_files_path: str
+    basebackup_path: str
+    uncompressed_basebackup_path: str
+
+
 class InotifyAdapter:
     def __init__(self, queue):
         self.queue = queue
@@ -287,24 +295,25 @@ def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn):
     def _get_site_prefix(self, site):
         return self.config["backup_sites"][site]["prefix"]
 
-    def create_backup_site_paths(self, site):
+    def create_backup_site_paths(self, site: str) -> BackupSitePaths:
         site_path = os.path.join(self.config["backup_location"], self._get_site_prefix(site))
         xlog_path = os.path.join(site_path, "xlog")
+        timeline_path = os.path.join(site_path, "timeline")
         basebackup_path = os.path.join(site_path, "basebackup")
 
-        paths_to_create = [
-            site_path,
-            xlog_path,
-            xlog_path + "_incoming",
-            basebackup_path,
-            basebackup_path + "_incoming",
-        ]
+        backup_site_paths = BackupSitePaths(
+            uncompressed_files_path=xlog_path + "_incoming",
+            compressed_xlog_path=xlog_path,
+            compressed_timeline_path=timeline_path,
+            basebackup_path=basebackup_path,
+            uncompressed_basebackup_path=basebackup_path + "_incoming",
+        )
 
-        for path in paths_to_create:
+        for path in backup_site_paths:
             if not os.path.exists(path):
                 os.makedirs(path)
 
-        return xlog_path, basebackup_path
+        return backup_site_paths
 
     def delete_remote_wal_before(self, wal_segment, site, pg_version):
         self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version)
@@ -577,12 +586,15 @@ def startup_walk_for_missed_files(self):
         """Check xlog and xlog_incoming directories for files that receivexlog has received but not yet
         compressed as well as the files we have compressed but not yet uploaded and process them."""
         for site in self.config["backup_sites"]:
-            compressed_xlog_path, _ = self.create_backup_site_paths(site)
-            uncompressed_xlog_path = compressed_xlog_path + "_incoming"
+            backup_site_paths = self.create_backup_site_paths(site)
+
+            compressed_xlog_path = backup_site_paths.compressed_xlog_path
+            compressed_timeline_path = backup_site_paths.compressed_timeline_path
+            uncompressed_files_path = backup_site_paths.uncompressed_files_path
 
             # Process uncompressed files (ie WAL pg_receivexlog received)
-            for filename in os.listdir(uncompressed_xlog_path):
-                full_path = os.path.join(uncompressed_xlog_path, filename)
+            for filename in os.listdir(uncompressed_files_path):
+                full_path = os.path.join(uncompressed_files_path, filename)
                 if wal.PARTIAL_WAL_RE.match(filename):
                     # pg_receivewal may have been in the middle of storing WAL file when PGHoard was stopped.
                     # If the file is 0 or 16 MiB in size it will continue normally but in some cases the file can be
@@ -609,6 +621,34 @@ def startup_walk_for_missed_files(self):
 
                 filetype = FileType.Timeline if wal.TIMELINE_RE.match(filename) else FileType.Wal
 
+                # verify if file was already compressed, otherwise the transfer agent will encounter
+                # duplicated UploadEvents. In case it was compressed, we should just add it to the deletion queue
+                base_compressed_file_path = (
+                    compressed_timeline_path if filetype is FileType.Timeline else compressed_xlog_path
+                )
+                compressed_file_path = os.path.join(base_compressed_file_path, filename)
+                is_already_compressed = os.path.exists(compressed_file_path)
+                has_metadata_file = os.path.exists(compressed_file_path + ".metadata")
+
+                # the file was compressed correctly
+                if is_already_compressed and has_metadata_file:
+                    self.log.debug("Uncompressed file %r is already compressed, adding to deletion queue.", full_path)
+                    if filetype is FileType.Timeline:
+                        os.unlink(full_path)
+                    else:
+                        delete_request = WalFileDeletionEvent(backup_site_name=site, file_path=Path(full_path))
+                        self.log.info("Adding an Uncompressed WAL file to deletion queue: %s", full_path)
+                        self.wal_file_deletion_queue.put(delete_request)
+                    continue
+
+                # delete compressed file and re-try
+                if is_already_compressed and not has_metadata_file:
+                    self.log.info(
+                        "Deleting incomplete compressed file %r (missing metadata), compression will be re-tried",
+                        compressed_file_path,
+                    )
+                    os.unlink(compressed_file_path)
+
                 compression_event = CompressionEvent(
                     file_type=filetype,
                     file_path=FileTypePrefixes[filetype] / filename,
@@ -622,32 +662,35 @@ def startup_walk_for_missed_files(self):
                 self.compression_queue.put(compression_event)
 
             # Process compressed files (ie things we've processed but not yet uploaded)
-            for filename in os.listdir(compressed_xlog_path):
-                if filename.endswith(".metadata"):
-                    continue  # silently ignore .metadata files, they're expected and processed below
-                full_path = os.path.join(compressed_xlog_path, filename)
-                metadata_path = full_path + ".metadata"
-                is_xlog = wal.WAL_RE.match(filename)
-                is_timeline = wal.TIMELINE_RE.match(filename)
-                if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)):
-                    self.log.warning("Found invalid file %r from compressed xlog directory", full_path)
-                    continue
-                with open(metadata_path, "r") as fp:
-                    metadata = json.load(fp)
-
-                file_type = FileType.Wal if is_xlog else FileType.Timeline
-
-                transfer_event = UploadEvent(
-                    file_type=file_type,
-                    backup_site_name=site,
-                    file_size=os.path.getsize(full_path),
-                    file_path=FileTypePrefixes[file_type] / filename,
-                    source_data=Path(full_path),
-                    callback_queue=None,
-                    metadata=metadata
-                )
-                self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event)
-                self.transfer_queue.put(transfer_event)
+            for compressed_file_dir in [compressed_xlog_path, compressed_timeline_path]:
+                for filename in os.listdir(compressed_file_dir):
+                    if filename.endswith(".metadata"):
+                        continue  # silently ignore .metadata files, they're expected and processed below
+
+                    full_path = os.path.join(compressed_file_dir, filename)
+                    metadata_path = full_path + ".metadata"
+                    is_xlog = wal.WAL_RE.match(filename)
+                    is_timeline = wal.TIMELINE_RE.match(filename)
+
+                    if not ((is_xlog or is_timeline) and os.path.exists(metadata_path)):
+                        self.log.warning("Found invalid file %r from compressed xlog directory", full_path)
+                        continue
+                    with open(metadata_path, "r") as fp:
+                        metadata = json.load(fp)
+
+                    file_type = FileType.Wal if is_xlog else FileType.Timeline
+
+                    transfer_event = UploadEvent(
+                        file_type=file_type,
+                        backup_site_name=site,
+                        file_size=os.path.getsize(full_path),
+                        file_path=FileTypePrefixes[file_type] / filename,
+                        source_data=Path(full_path),
+                        callback_queue=None,
+                        metadata=metadata
+                    )
+                    self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event)
+                    self.transfer_queue.put(transfer_event)
 
     def start_threads_on_startup(self):
         # Startup threads
@@ -668,7 +711,7 @@ def _cleanup_inactive_receivexlogs(self, site):
 
     def handle_site(self, site, site_config):
         self.set_state_defaults(site)
-        xlog_path, basebackup_path = self.create_backup_site_paths(site)
+        backup_site_paths = self.create_backup_site_paths(site)
 
     if not site_config["active"]:
            return  # If a site has been marked inactive, don't bother checking anything
@@ -679,7 +722,7 @@ def handle_site(self, site, site_config):
 
         if site not in self.receivexlogs and site not in self.walreceivers:
             if site_config["active_backup_mode"] == "pg_receivexlog":
-                self.receivexlog_listener(site, chosen_backup_node, xlog_path + "_incoming")
+                self.receivexlog_listener(site, chosen_backup_node, backup_site_paths.uncompressed_files_path)
             elif site_config["active_backup_mode"] == "walreceiver":
                 state_file_path = self.config["json_state_file_path"]
                 walreceiver_state = {}
@@ -745,7 +788,13 @@ def handle_site(self, site, site_config):
             return
 
         self.basebackups_callbacks[site] = Queue()
-        self.create_basebackup(site, chosen_backup_node, basebackup_path, self.basebackups_callbacks[site], metadata)
+        self.create_basebackup(
+            site=site,
+            connection_info=chosen_backup_node,
+            basebackup_path=backup_site_paths.basebackup_path,
+            callback_queue=self.basebackups_callbacks[site],
+            metadata=metadata,
+        )
 
     def get_new_backup_details(self, *, now=None, site, site_config):
         """Returns metadata to associate with new backup that needs to be created or None in case no backup should
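
For orientation: the new BackupSitePaths is a typing.NamedTuple, so it iterates like a plain tuple (which is what the directory-creation loop in create_backup_site_paths relies on) while each directory is still reachable by name (which is what startup_walk_for_missed_files and handle_site use). A minimal standalone sketch of that behaviour, with a made-up backup_location and site name for illustration only:

    import os
    from typing import NamedTuple

    class BackupSitePaths(NamedTuple):
        compressed_xlog_path: str
        compressed_timeline_path: str
        uncompressed_files_path: str
        basebackup_path: str
        uncompressed_basebackup_path: str

    # hypothetical per-site layout, mirroring what create_backup_site_paths builds
    site_path = os.path.join("/var/lib/pghoard", "example-site")
    xlog_path = os.path.join(site_path, "xlog")
    paths = BackupSitePaths(
        compressed_xlog_path=xlog_path,
        compressed_timeline_path=os.path.join(site_path, "timeline"),
        uncompressed_files_path=xlog_path + "_incoming",
        basebackup_path=os.path.join(site_path, "basebackup"),
        uncompressed_basebackup_path=os.path.join(site_path, "basebackup") + "_incoming",
    )

    for path in paths:  # tuple-style iteration, used when creating the directories
        os.makedirs(path, exist_ok=True)

    print(paths.compressed_timeline_path)  # named access, used by the startup walk
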
test/test_compressor.py

Lines changed: 7 additions & 0 deletions
@@ -176,6 +176,13 @@ def _test_compress_to_file(self, filetype, file_size, file_path):
         assert transfer_event.metadata.pop("hash-algorithm") == "sha1"
         assert getattr(transfer_event, key) == value
 
+        compressed_file_path = os.path.join(
+            self.config["backup_location"], self.config["backup_sites"][self.test_site]["prefix"],
+            "xlog" if filetype == "xlog" else "timeline", file_path.name
+        )
+        # make sure the file was compressed on the expected location
+        assert os.path.exists(compressed_file_path)
+
     def test_compress_to_memory(self):
         ifile = WALTester(self.incoming_path, "0000000100000000000000FF", "random")
         filetype = FileType.Wal
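
The added assertion pins down where the compressor is expected to leave its output: WAL segments under <backup_location>/<site prefix>/xlog and timeline history files under <backup_location>/<site prefix>/timeline. A small self-contained sketch of the same path derivation; the helper name and the example values below are illustrative, not taken from the test fixtures:

    import os

    def expected_compressed_path(backup_location: str, site_prefix: str, filetype: str, file_name: str) -> str:
        # WAL segments go to "xlog", timeline/history files to "timeline"
        subdir = "xlog" if filetype == "xlog" else "timeline"
        return os.path.join(backup_location, site_prefix, subdir, file_name)

    print(expected_compressed_path("/var/lib/pghoard", "example-site", "xlog", "000000010000000000000004"))
    # /var/lib/pghoard/example-site/xlog/000000010000000000000004
    print(expected_compressed_path("/var/lib/pghoard", "example-site", "timeline", "00000002.history"))
    # /var/lib/pghoard/example-site/timeline/00000002.history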

test/test_pghoard.py

Lines changed: 79 additions & 15 deletions
@@ -696,26 +696,28 @@ def test_backup_state_file(self):
         assert empty_state == state
 
     def test_startup_walk_for_missed_compressed_files(self):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004"), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004.metadata"), "wb") as fp:
             fp.write(b"{}")
-        with open(os.path.join(compressed_wal_path, "0000000F.history"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "0000000F.history.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_timeline_path, "0000000F.history.metadata"), "wb") as fp:
             fp.write(b"{}")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz"), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, "000000010000000000000004xyz.metadata"), "wb") as fp:
+        with open(os.path.join(backup_site_paths.compressed_xlog_path, "000000010000000000000004xyz.metadata"), "wb") as fp:
             fp.write(b"{}")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 0
         assert self.pghoard.transfer_queue.qsize() == 2
 
     def test_startup_walk_for_missed_uncompressed_files(self):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        uncompressed_wal_path = compressed_wal_path + "_incoming"
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+
+        uncompressed_wal_path = backup_site_paths.uncompressed_files_path
+
         with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp:
             fp.write(b"foo")
         with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp:
@@ -730,9 +732,8 @@ def test_startup_walk_for_missed_uncompressed_files(self):
         "file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")]
     )
     def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        uncompressed_wal_path = compressed_wal_path + "_incoming"
-        with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        with open(os.path.join(backup_site_paths.uncompressed_files_path, file_name), "wb") as fp:
             fp.write(b"foo")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 1
@@ -744,17 +745,80 @@ def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileTyp
         "file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")]
     )
     def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str):
-        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        with open(os.path.join(compressed_wal_path, file_name), "wb") as fp:
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+
+        if file_type is FileType.Wal:
+            compressed_file_path = backup_site_paths.compressed_xlog_path
+        else:
+            compressed_file_path = backup_site_paths.compressed_timeline_path
+
+        with open(os.path.join(compressed_file_path, file_name), "wb") as fp:
             fp.write(b"foo")
-        with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp:
+        with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp:
             fp.write(b"{}")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 0
         assert self.pghoard.transfer_queue.qsize() == 1
         upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
         assert upload_event.file_type == file_type
 
+    @pytest.mark.parametrize(
+        "file_type, file_name, invalid_compressed_file_name", [
+            (FileType.Wal, "000000010000000000000005", "000000010000000000000006"),
+            (FileType.Timeline, "00000003.history", "00000004.history"),
+        ]
+    )
+    def test_startup_walk_skip_compression_if_already_compressed(
+        self,
+        file_type: FileType,
+        file_name: str,
+        invalid_compressed_file_name: str,
+    ) -> None:
+        """
+        Tests the scenario where an uncompressed file was already compressed, but was not deleted.
+        """
+        backup_site_paths = self.pghoard.create_backup_site_paths(self.test_site)
+        uncompressed_wal_path = backup_site_paths.uncompressed_files_path
+        compressed_file_path = (
+            backup_site_paths.compressed_timeline_path
+            if file_type is FileType.Timeline else backup_site_paths.compressed_xlog_path
+        )
+
+        # generate uncompressed/compressed files
+        with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
+            fp.write(b"random")
+
+        with open(os.path.join(uncompressed_wal_path, invalid_compressed_file_name), "wb") as fp:
+            fp.write(b"random")
+
+        # compressed
+        with open(os.path.join(compressed_file_path, file_name), "wb") as fp:
+            fp.write(b"random")
+
+        with open(os.path.join(compressed_file_path, f"{file_name}.metadata"), "wb") as fp:
+            fp.write(b"{}")
+
+        # invalid compressed file should not have a metadata
+        with open(os.path.join(compressed_file_path, invalid_compressed_file_name), "wb") as fp:
+            fp.write(b"random")
+
+        self.pghoard.startup_walk_for_missed_files()
+
+        # only one file should be added for compression (invalid compressed one)
+        assert self.pghoard.compression_queue.qsize() == 1
+        compression_event = self.pghoard.compression_queue.get()
+        assert compression_event.file_path.name == invalid_compressed_file_name
+
+        assert self.pghoard.transfer_queue.qsize() == 1
+        upload_event = self.pghoard.transfer_queue.get()
+        assert upload_event.file_path.name == file_name
+
+        if file_type is FileType.Wal:
+            assert self.pghoard.wal_file_deletion_queue.qsize() == 1
+        else:
+            # uncompressed timeline files are not added to deletion queue, they are immediately unlinked
+            assert self.pghoard.wal_file_deletion_queue.qsize() == 0
+
 
 class TestPGHoardWithPG:
     def test_auth_alert_files(self, db, pghoard):
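
The new parametrized test exercises the decision added to startup_walk_for_missed_files for each uncompressed file found at startup. Condensed into a standalone helper for readability; the function name and string return values are hypothetical, while the real code enqueues CompressionEvent or WalFileDeletionEvent objects or unlinks files directly, as shown in the pghoard/pghoard.py hunk above:

    import os

    def classify_uncompressed_file(full_path: str, compressed_dir: str) -> str:
        """Decide what the startup walk should do with one uncompressed WAL/timeline file."""
        compressed_file_path = os.path.join(compressed_dir, os.path.basename(full_path))
        is_already_compressed = os.path.exists(compressed_file_path)
        has_metadata_file = os.path.exists(compressed_file_path + ".metadata")

        if is_already_compressed and has_metadata_file:
            # compression already finished: only the uncompressed copy needs to go
            # (WAL goes to the deletion queue, timeline files are unlinked immediately)
            return "delete_uncompressed"
        if is_already_compressed and not has_metadata_file:
            # half-written compressed file: drop it and compress again
            return "remove_partial_and_recompress"
        return "compress"  # nothing compressed yet, queue a CompressionEvent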
