Skip to content

Commit 2db19a6

Browse files
handle a graceful shutdown on request
[BF-1358]
1 parent 6dce5bf commit 2db19a6

File tree

3 files changed

+105
-3
lines changed

3 files changed

+105
-3
lines changed

pghoard/compressor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class BaseCompressorEvent:
4343
file_path: Path
4444
backup_site_name: str
4545
source_data: Union[BinaryIO, Path]
46-
callback_queue: CallbackQueue
46+
callback_queue: Optional[CallbackQueue]
4747
metadata: Dict[str, str]
4848

4949

pghoard/pghoard.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,8 +934,50 @@ def _get_all_threads(self):
934934
all_threads.extend(self.transfer_agents)
935935
return all_threads
936936

937+
def _wait_for_queue_to_be_emptied(
938+
self,
939+
queue: Queue,
940+
queue_name: str,
941+
timeout: Optional[int] = None,
942+
) -> None:
943+
start = time.monotonic()
944+
while True:
945+
if queue.empty():
946+
self.log.info("%r queue has been emptied.", queue_name)
947+
break
948+
949+
if timeout is not None and time.monotonic() - start > timeout:
950+
self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
951+
break
952+
953+
time.sleep(0.1)
954+
937955
def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument
938956
self.log.warning("Quitting, signal: %r", _signal)
957+
if _signal == signal.SIGTERM:
958+
self.graceful_shutdown()
959+
else:
960+
self.quit()
961+
962+
def graceful_shutdown(self) -> None:
963+
"""
964+
Makes sure all missing files are compressed, uploaded and deleted before all threads are inactive.
965+
966+
Steps to follow:
967+
- Shutdown receivexlogs and walreceivers threads
968+
- Wait for compression, transfer and deletion queues to be empty
969+
- Quit (stop remaining threads and write state file)
970+
"""
971+
self.log.info("Gracefully shutting down...")
972+
self.running = False
973+
for thread in [*self.receivexlogs.values(), *self.walreceivers.values()]:
974+
thread.running = False
975+
976+
# wait for all queues to be emptied
977+
self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
978+
self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
979+
self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")
980+
939981
self.quit()
940982

941983
def quit(self):

test/test_pghoard.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@
1111
import time
1212
from pathlib import Path
1313
from typing import Any, Dict
14-
from unittest.mock import Mock, patch
14+
from unittest.mock import MagicMock, Mock, patch
1515

1616
import pytest
1717

1818
import pghoard.pghoard as pghoard_module
19-
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
19+
from pghoard.common import (
20+
BaseBackupFormat, FileType, FileTypePrefixes, create_alert_file, delete_alert_file, write_json_file
21+
)
22+
from pghoard.compressor import CompressionEvent
2023
from pghoard.pghoard import PGHoard
2124
from pghoard.pgutil import create_connection_string
25+
from pghoard.receivexlog import PGReceiveXLog
26+
from pghoard.transfer import TransferAgent
2227

2328
from .base import PGHoardTestCase
2429
from .util import dict_to_tar_file, switch_wal, wait_for_xlog
@@ -755,6 +760,61 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType,
755760
upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
756761
assert upload_event.file_type == file_type
757762

763+
@patch("pghoard.compressor.wal.verify_wal", Mock())
764+
@patch.object(PGReceiveXLog, "run", Mock())
765+
@patch.object(TransferAgent, "get_object_storage")
766+
def test_graceful_shutdown_with_partial_wal_file(
767+
self,
768+
mocked_get_object_storage: MagicMock,
769+
) -> None:
770+
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
771+
uncompressed_wal_path = compressed_wal_path + "_incoming"
772+
773+
self.config["backup_sites"][self.test_site]["active_backup_mode"] = "pg_receivexlog"
774+
775+
self.pghoard.receivexlog_listener(
776+
self.test_site, self.config["backup_sites"][self.test_site]["nodes"][0], uncompressed_wal_path
777+
)
778+
779+
assert len(self.pghoard.receivexlogs) == 1
780+
781+
file_name = "000000010000000000000008"
782+
uncompressed_file_path = os.path.join(uncompressed_wal_path, file_name)
783+
with open(uncompressed_file_path, "wb") as fp:
784+
fp.write(b"foo")
785+
786+
self.pghoard.compression_queue.put(
787+
CompressionEvent(
788+
file_type=FileType.Wal,
789+
file_path=FileTypePrefixes[FileType.Wal] / file_name,
790+
delete_file_after_compression=True,
791+
backup_site_name=self.test_site,
792+
source_data=Path(uncompressed_file_path),
793+
callback_queue=None,
794+
metadata={}
795+
)
796+
)
797+
798+
# run compressors, transfer_agents and wal_file_deleter
799+
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
800+
thread.start()
801+
802+
self.pghoard.graceful_shutdown()
803+
804+
assert self.pghoard.compression_queue.qsize() == 0
805+
assert self.pghoard.transfer_queue.qsize() == 0
806+
assert self.pghoard.wal_file_deletion_queue.qsize() == 0
807+
808+
# called once for uploading renamed partial file
809+
assert mocked_get_object_storage.call_count == 1
810+
811+
# uncompressed file should still exist since WALDeletionThread always keeps last file
812+
assert os.path.exists(uncompressed_file_path)
813+
814+
# verify compressors, transfer_agents and wal_file_deleter are not running
815+
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
816+
assert thread.is_alive() is False
817+
758818

759819
class TestPGHoardWithPG:
760820
def test_auth_alert_files(self, db, pghoard):

0 commit comments

Comments
 (0)