Commit b9e7bfb

handle a graceful shutdown on request
[BF-1358]
1 parent 87a01e9 commit b9e7bfb

File tree

3 files changed: +97 -3 lines changed

pghoard/compressor.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ class BaseCompressorEvent:
     file_path: Path
     backup_site_name: str
     source_data: Union[BinaryIO, Path]
-    callback_queue: CallbackQueue
+    callback_queue: Optional[CallbackQueue]
     metadata: Dict[str, str]
 
 
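Note: the field becomes Optional because a compression event can now be enqueued with nobody waiting on its completion; the new test below passes callback_queue=None. A minimal sketch of how a consumer would guard such an optional queue (the helper function here is hypothetical, not pghoard code):

from queue import Queue
from typing import Optional

def report_completion(callback_queue: Optional[Queue], result: dict) -> None:
    # Hypothetical consumer-side helper: only report back when the producer
    # actually asked for a callback; events enqueued with callback_queue=None
    # are treated as fire-and-forget.
    if callback_queue is not None:
        callback_queue.put(result)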

pghoard/pghoard.py

Lines changed: 42 additions & 0 deletions
@@ -983,8 +983,50 @@ def _get_all_threads(self):
         all_threads.extend(self.transfer_agents)
         return all_threads
 
+    def _wait_for_queue_to_be_emptied(
+        self,
+        queue: Queue,
+        queue_name: str,
+        timeout: Optional[int] = None,
+    ) -> None:
+        start = time.monotonic()
+        while True:
+            if queue.empty():
+                self.log.info("%r queue has been emptied.", queue_name)
+                break
+
+            if timeout is not None and time.monotonic() - start > timeout:
+                self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
+                break
+
+            time.sleep(0.1)
+
     def handle_exit_signal(self, _signal=None, _frame=None):  # pylint: disable=unused-argument
         self.log.warning("Quitting, signal: %r", _signal)
+        if _signal == signal.SIGTERM:
+            self.graceful_shutdown()
+        else:
+            self.quit()
+
+    def graceful_shutdown(self) -> None:
+        """
+        Makes sure all missing files are compressed, uploaded and deleted before all threads are inactive.
+
+        Steps to follow:
+        - Shutdown receivexlogs and walreceivers threads
+        - Wait for compression, transfer and deletion queues to be empty
+        - Quit (stop remaining threads and write state file)
+        """
+        self.log.info("Gracefully shutting down...")
+        self.running = False
+        for thread in [*self.receivexlogs.values(), *self.walreceivers.values()]:
+            thread.running = False
+
+        # wait for all queues to be emptied
+        self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
+        self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
+        self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")
+
         self.quit()
 
     def quit(self):
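Note: taken together, the additions implement a drain-then-exit pattern: on SIGTERM the WAL-receiving threads are stopped, the main thread polls until the compression, transfer and deletion queues are empty (optionally bounded by a timeout), and only then is quit() called to stop the remaining threads and write the state file. A self-contained sketch of the same pattern, independent of pghoard's classes (all names below are illustrative, POSIX signals assumed):

import os
import signal
import time
from queue import Empty, Queue
from threading import Event, Thread

work_queue: Queue = Queue()
shutting_down = Event()

def worker() -> None:
    # Keep draining the queue until told to stop and the queue is empty.
    while not (shutting_down.is_set() and work_queue.empty()):
        try:
            work_queue.get(timeout=0.1)  # pretend to compress/upload the item
        except Empty:
            continue

def wait_for_queue_to_be_emptied(queue: Queue, timeout: float = 30.0) -> None:
    # Same polling idea as the new PGHoard._wait_for_queue_to_be_emptied:
    # poll until the queue is empty, but give up after a timeout so shutdown
    # cannot hang forever.
    start = time.monotonic()
    while not queue.empty() and time.monotonic() - start <= timeout:
        time.sleep(0.1)

def handle_sigterm(_signal, _frame) -> None:
    wait_for_queue_to_be_emptied(work_queue)  # let in-flight work drain first
    shutting_down.set()                       # then let the worker thread exit

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, handle_sigterm)
    thread = Thread(target=worker)
    thread.start()
    for _ in range(100):
        work_queue.put(object())          # some pending work
    os.kill(os.getpid(), signal.SIGTERM)  # request a graceful shutdown
    thread.join()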

test/test_pghoard.py

Lines changed: 54 additions & 2 deletions
@@ -11,14 +11,19 @@
 import time
 from pathlib import Path
 from typing import Any, Dict
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, Mock, patch
 
 import pytest
 
 import pghoard.pghoard as pghoard_module
-from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
+from pghoard.common import (
+    BaseBackupFormat, FileType, FileTypePrefixes, create_alert_file, delete_alert_file, write_json_file
+)
+from pghoard.compressor import CompressionEvent
 from pghoard.pghoard import PGHoard
 from pghoard.pgutil import create_connection_string
+from pghoard.receivexlog import PGReceiveXLog
+from pghoard.transfer import TransferAgent
 
 from .base import PGHoardTestCase
 from .util import dict_to_tar_file, switch_wal, wait_for_xlog
@@ -819,6 +824,53 @@ def test_startup_walk_skip_compression_if_already_compressed(
         # uncompressed timeline files are not added to deletion queue, they are immediately unlinked
         assert self.pghoard.wal_file_deletion_queue.qsize() == 0
 
+    @patch("pghoard.compressor.wal.verify_wal", Mock())
+    @patch.object(PGReceiveXLog, "run", Mock())
+    @patch.object(TransferAgent, "get_object_storage")
+    def test_graceful_shutdown(
+        self,
+        mocked_get_object_storage: MagicMock,
+    ) -> None:
+        compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
+        uncompressed_wal_path = compressed_wal_path + "_incoming"
+
+        file_name = "000000010000000000000008"
+        uncompressed_file_path = os.path.join(uncompressed_wal_path, file_name)
+        with open(uncompressed_file_path, "wb") as fp:
+            fp.write(b"foo")
+
+        self.pghoard.compression_queue.put(
+            CompressionEvent(
+                file_type=FileType.Wal,
+                file_path=FileTypePrefixes[FileType.Wal] / file_name,
+                delete_file_after_compression=True,
+                backup_site_name=self.test_site,
+                source_data=Path(uncompressed_file_path),
+                callback_queue=None,
+                metadata={}
+            )
+        )
+
+        # run compressors, transfer_agents and wal_file_deleter
+        for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
+            thread.start()
+
+        self.pghoard.graceful_shutdown()
+
+        assert self.pghoard.compression_queue.qsize() == 0
+        assert self.pghoard.transfer_queue.qsize() == 0
+        assert self.pghoard.wal_file_deletion_queue.qsize() == 0
+
+        # called once for uploading renamed partial file
+        assert mocked_get_object_storage.call_count == 1
+
+        # uncompressed file should still exist since WALDeletionThread always keeps last file
+        assert os.path.exists(uncompressed_file_path)
+
+        # verify compressors, transfer_agents and wal_file_deleter are not running
+        for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
+            assert thread.is_alive() is False
+
 
 class TestPGHoardWithPG:
     def test_auth_alert_files(self, db, pghoard):
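Note: operationally, the behaviour exercised by this test corresponds to stopping the daemon with SIGTERM, which a plain kill and, typically, systemctl stop send; with this commit pghoard drains its queues before exiting, while other handled signals still take the immediate quit() path. A small sketch of requesting the graceful path from another process (the PID lookup is illustrative, adapt to however your deployment tracks the pghoard process):

import os
import signal
import subprocess

# Illustrative PID discovery; use your deployment's PID file or supervisor instead.
pghoard_pid = int(subprocess.check_output(["pgrep", "-f", "pghoard"]).split()[0])

# With this commit, handle_exit_signal routes SIGTERM to graceful_shutdown(),
# so the compression, transfer and WAL-deletion queues are drained before exit.
os.kill(pghoard_pid, signal.SIGTERM)

The new test itself can be run in isolation with pytest, e.g. python -m pytest test/test_pghoard.py -k test_graceful_shutdown, assuming the repository's usual test environment.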
