Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sanic/worker/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import pickle # nosec B403
import signal

from collections.abc import Iterable, MutableMapping
Expand Down Expand Up @@ -442,7 +443,7 @@ def _sync_states(self):
for process in self.processes:
try:
state = self.worker_state[process.name].get("state")
except KeyError:
except (KeyError, pickle.UnpicklingError):
process.set_state(ProcessState.TERMINATED, True)
continue
# Skip state sync if process is restarting to avoid race condition
Expand Down
9 changes: 8 additions & 1 deletion sanic/worker/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import pickle # nosec B403

from collections.abc import MutableMapping
from datetime import datetime, timezone
Expand Down Expand Up @@ -99,6 +100,7 @@ def exit(self):
ConnectionRefusedError,
ConnectionResetError,
EOFError,
pickle.UnpicklingError,
):
logger.debug("Monitor process has already exited.")
except KeyError:
Expand All @@ -115,7 +117,12 @@ def terminate(self):
)
try:
self.set_state(ProcessState.TERMINATED, force=True)
except (BrokenPipeError, ConnectionResetError, EOFError):
except (
BrokenPipeError,
ConnectionResetError,
EOFError,
pickle.UnpicklingError,
):
pass
try:
os.kill(self.pid, SIGINT)
Expand Down
77 changes: 75 additions & 2 deletions tests/worker/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import pickle
from logging import ERROR, INFO
from signal import SIGINT
from unittest.mock import Mock, call, patch
from unittest.mock import Mock, MagicMock, call, patch

import pytest

from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
from sanic.worker.constants import RestartOrder
from sanic.worker.manager import WorkerManager
from sanic.worker.process import Worker
from sanic.worker.process import ProcessState, Worker


if not OS_IS_WINDOWS:
Expand Down Expand Up @@ -449,3 +450,75 @@ def test_remove_untracked_worker(manager: WorkerManager, caplog):
assert len(manager.transient) == 1
assert len(manager.durable) == 0
assert ("sanic.root", 20, message) in caplog.record_tuples


@patch("sanic.worker.process.os")
def test_terminate_handles_unpickling_error(os_mock: Mock):
"""terminate() catches UnpicklingError from corrupted multiprocessing proxy during shutdown."""
process = Mock()
process.pid = 1234
context = Mock()
context.Process.return_value = process
worker_state = {}
manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), worker_state)

wp = next(manager.processes)

# Replace worker_state with one that raises UnpicklingError on read
# (simulates corrupted multiprocessing proxy connection from signal re-entrancy)
corrupted_ws = MagicMock()
corrupted_ws.__getitem__.side_effect = pickle.UnpicklingError(
"invalid load key, '\\x07'."
)
wp.worker_state = corrupted_ws

# Should not raise - the error should be caught during shutdown
wp.terminate()
# os.kill should still be called to send SIGINT to the worker
os_mock.kill.assert_called_once_with(1234, SIGINT)


def test_exit_handles_unpickling_error():
"""exit() catches UnpicklingError when cleaning up worker state."""
process = Mock()
process.pid = 1234
process.is_alive.return_value = False
context = Mock()
context.Process.return_value = process
worker_state = {}
manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), worker_state)

wp = next(manager.processes)

# Replace worker_state with one that raises UnpicklingError on delete
corrupted_ws = MagicMock()
corrupted_ws.__delitem__.side_effect = pickle.UnpicklingError(
"invalid load key, '\\x07'."
)
wp.worker_state = corrupted_ws

# Should not raise
wp.exit()


def test_sync_states_handles_unpickling_error():
"""_sync_states() catches UnpicklingError from corrupted proxy during shutdown."""
process = Mock()
process.pid = 1234
context = Mock()
context.Process.return_value = process
worker_state = {}
manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), worker_state)

wp = next(manager.processes)
wp.set_state(ProcessState.STARTED, force=True)

# Replace worker_state with one that raises UnpicklingError on read
corrupted_ws = MagicMock()
corrupted_ws.__getitem__.side_effect = pickle.UnpicklingError(
"invalid load key, '\\x07'."
)
manager.worker_state = corrupted_ws

# Should not raise - treats corrupted state same as missing key
manager._sync_states()
Loading