Skip to content

Commit 6a78754

Browse files
handle a graceful shutdown on request
[BF-1358]
1 parent 6dce5bf commit 6a78754

File tree

3 files changed

+140
-2
lines changed

3 files changed

+140
-2
lines changed

pghoard/compressor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class BaseCompressorEvent:
4343
file_path: Path
4444
backup_site_name: str
4545
source_data: Union[BinaryIO, Path]
46-
callback_queue: CallbackQueue
46+
callback_queue: Optional[CallbackQueue]
4747
metadata: Dict[str, str]
4848

4949

pghoard/pghoard.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
)
4747
from pghoard.receivexlog import PGReceiveXLog
4848
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
49+
from pghoard.wal import WAL_SEG_SIZE
4950
from pghoard.walreceiver import WALReceiver
5051
from pghoard.webserver import WebServer
5152

@@ -934,8 +935,97 @@ def _get_all_threads(self):
934935
all_threads.extend(self.transfer_agents)
935936
return all_threads
936937

938+
def _rename_partial_files_if_completed(self, site: str, wal_location: str) -> None:
939+
"""
940+
Check for remaining partial WAL files generated by pg_receivewal, in case the partial file is completed,
941+
it will be renamed.
942+
"""
943+
# consider only the last partial file (pg_receivewal should only generate one)
944+
last_partial_file = None
945+
for partial_file in os.listdir(wal_location):
946+
if not partial_file.endswith(".partial"):
947+
continue
948+
949+
if not last_partial_file or partial_file > last_partial_file:
950+
last_partial_file = partial_file
951+
952+
if last_partial_file is None:
953+
return
954+
955+
# check if the partial file needs to be renamed
956+
partial_file_path = os.path.join(wal_location, last_partial_file)
957+
renamed_partial_file_path = partial_file_path.replace(".partial", "")
958+
file_stats = os.stat(os.path.join(wal_location, last_partial_file))
959+
960+
if file_stats.st_size != WAL_SEG_SIZE:
961+
# TODO: handle partial files that are incompleted
962+
return
963+
964+
# this will not trigger inotify
965+
os.rename(partial_file_path, renamed_partial_file_path)
966+
967+
compression_event = CompressionEvent(
968+
file_type=FileType.Wal,
969+
file_path=FileTypePrefixes[FileType.Wal] / last_partial_file.replace(".partial", ""),
970+
delete_file_after_compression=True,
971+
backup_site_name=site,
972+
source_data=Path(renamed_partial_file_path),
973+
callback_queue=None,
974+
metadata={}
975+
)
976+
self.compression_queue.put(compression_event)
977+
978+
def _wait_for_queue_to_be_emptied(
979+
self,
980+
queue: Queue,
981+
queue_name: str,
982+
timeout: Optional[int] = None,
983+
) -> None:
984+
start = time.monotonic()
985+
while True:
986+
if queue.empty():
987+
self.log.info("%r queue has been emptied.", queue_name)
988+
break
989+
990+
if timeout is not None and time.monotonic() - start > timeout:
991+
self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
992+
break
993+
994+
time.sleep(0.1)
995+
937996
def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument
938997
self.log.warning("Quitting, signal: %r", _signal)
998+
if _signal == signal.SIGTERM:
999+
self.graceful_shutdown()
1000+
else:
1001+
self.quit()
1002+
1003+
def graceful_shutdown(self) -> None:
1004+
"""
1005+
Makes sure all missing files are compressed, uploaded and deleted. Also handles completed partial files
1006+
that might have not been renamed after shutting down receivexlogs.
1007+
1008+
Steps to follow:
1009+
- Shutdown receivexlogs and walreceivers threads
1010+
- Check for partial segments and rename them (if completed)
1011+
- Wait for compression, transfer and deletion queues to be empty
1012+
- Quit (stop remaining threads and write state file)
1013+
"""
1014+
self.log.info("Gracefully shutting down...")
1015+
self.running = False
1016+
for site, thread in {**self.receivexlogs, **self.walreceivers}.items():
1017+
thread.running = False
1018+
1019+
if not isinstance(thread, PGReceiveXLog):
1020+
continue
1021+
1022+
self._rename_partial_files_if_completed(site=site, wal_location=thread.wal_location)
1023+
1024+
# wait for all queues to be emptied
1025+
self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
1026+
self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
1027+
self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")
1028+
9391029
self.quit()
9401030

9411031
def quit(self):

test/test_pghoard.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
import time
1212
from pathlib import Path
1313
from typing import Any, Dict
14-
from unittest.mock import Mock, patch
14+
from unittest.mock import MagicMock, Mock, patch
1515

1616
import pytest
1717

1818
import pghoard.pghoard as pghoard_module
1919
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
2020
from pghoard.pghoard import PGHoard
2121
from pghoard.pgutil import create_connection_string
22+
from pghoard.receivexlog import PGReceiveXLog
23+
from pghoard.transfer import TransferAgent
24+
from pghoard.wal import WAL_SEG_SIZE
2225

2326
from .base import PGHoardTestCase
2427
from .util import dict_to_tar_file, switch_wal, wait_for_xlog
@@ -755,6 +758,51 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType,
755758
upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
756759
assert upload_event.file_type == file_type
757760

761+
@patch("pghoard.compressor.wal.verify_wal", Mock())
762+
@patch.object(PGReceiveXLog, "run", Mock())
763+
@patch.object(TransferAgent, "get_object_storage")
764+
@pytest.mark.parametrize("is_completed", (True, False))
765+
def test_graceful_shutdown_with_partial_wal_file(
766+
self,
767+
mocked_get_object_storage: MagicMock,
768+
is_completed: bool,
769+
) -> None:
770+
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
771+
uncompressed_wal_path = compressed_wal_path + "_incoming"
772+
773+
self.config["backup_sites"][self.test_site]["active_backup_mode"] = "pg_receivexlog"
774+
775+
self.pghoard.receivexlog_listener(
776+
self.test_site, self.config["backup_sites"][self.test_site]["nodes"][0], uncompressed_wal_path
777+
)
778+
779+
assert len(self.pghoard.receivexlogs) == 1
780+
781+
partial_file_name = "000000010000000000000008.partial"
782+
783+
# generate a .partial file
784+
with open(os.path.join(uncompressed_wal_path, partial_file_name), "wb") as fp:
785+
if is_completed:
786+
fp.seek(WAL_SEG_SIZE - 1)
787+
fp.write(b"\0")
788+
789+
# run compressors, transfer_agents and wal_file_deleter
790+
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
791+
thread.start()
792+
793+
self.pghoard.graceful_shutdown()
794+
795+
assert self.pghoard.compression_queue.qsize() == 0
796+
assert self.pghoard.transfer_queue.qsize() == 0
797+
assert self.pghoard.wal_file_deletion_queue.qsize() == 0
798+
799+
# called once for uploading renamed partial file
800+
assert mocked_get_object_storage.call_count == (1 if is_completed else 0)
801+
802+
# verify compressors, transfer_agents and wal_file_deleter are not running
803+
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
804+
assert thread.is_alive() is False
805+
758806

759807
class TestPGHoardWithPG:
760808
def test_auth_alert_files(self, db, pghoard):

0 commit comments

Comments
 (0)