Skip to content

Commit 439742a

Browse files
authored
feat(render): park in-flight render jobs on Cloud Run shutdown (hardening 1/3) (#754)
## Summary Addresses **hardening item 1 of 3** from `docs/archive/2026-05-06-capacity-resilience-session.md`. When Cloud Run autoscales an instance down (or rolls a deployment) mid-render, the BackgroundTask was killed without writing failure state. The job sat at \`rendering_video\` indefinitely until an operator manually retried. ## Changes - **`render_video_worker.py`**: registers/unregisters with \`worker_registry\` (try/finally on the outer scope so it fires even on unexpected exceptions). New \`park_active_render_jobs_for_shutdown()\` helper transitions any in-flight render jobs to \`RENDER_PENDING_CAPACITY\` (last_code: \`WORKER_SHUTDOWN\`) before exit. Race-safe: skips jobs that have moved past \`RENDERING_VIDEO\` between the registry snapshot and the per-job park. - **`main.py`**: lifespan shutdown waits 480s (down from 600s) then runs the park pass — 120s headroom for Firestore writes before SIGKILL. Cloud Run gen2 default termination grace is 600s. - **`docs/TROUBLESHOOTING.md`**: updated the \"Job stuck in \`rendering_video\`\" runbook to describe the new self-recovery behaviour. - **`docs/archive/2026-05-06-capacity-resilience-session.md`**: marked item 1 as shipped. - **`pyproject.toml`**: 0.174.3 → 0.174.4 The auto-retry scheduler (\`/api/internal/retry-pending-render-jobs\`, every 5 min via Cloud Scheduler) picks up parked jobs. ## Test plan - [x] 6 new tests in \`test_render_video_worker_shutdown.py\`: - register on success, unregister on exception - park transitions in-flight job to \`RENDER_PENDING_CAPACITY\` with \`WORKER_SHUTDOWN\` code - race-safe skip when job moved past \`RENDERING_VIDEO\` - no-op when no render workers active - continues past per-job failures - [x] Existing capacity-error tests (\`test_render_video_worker_capacity.py\`) still pass — register/unregister doesn't break the parking behaviour for actual capacity errors. - [ ] Prod verification: deploy → trigger a render → force a Cloud Run revision rollover mid-render → confirm job lands in \`render_pending_capacity\` and gets retried within 5 min. (Hard to artificially induce SIGTERM without a real load event; will monitor next time autoscaler scales down naturally.) @coderabbitai ignore
1 parent 1d7cefb commit 439742a

6 files changed

Lines changed: 342 additions & 13 deletions

File tree

backend/main.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,37 @@ async def lifespan(app: FastAPI):
103103
yield
104104

105105
# Shutdown - wait for any active workers to complete before terminating
106-
# This prevents Cloud Run from killing workers mid-processing
106+
# This prevents Cloud Run from killing workers mid-processing.
107+
#
108+
# Cloud Run gen2 default termination grace period is 600s. We wait up to
109+
# 480s, leaving ~120s for the cleanup pass below to write Firestore state
110+
# before SIGKILL. Without that headroom, render jobs that miss the wait
111+
# would sit in `rendering_video` forever waiting for an operator.
107112
logger.info("Shutdown requested, checking for active workers...")
108113
if worker_registry.has_active_workers():
109114
active = worker_registry.get_active_workers()
110115
logger.info(f"Active workers found: {active}")
111-
logger.info("Waiting for workers to complete (timeout: 600s)...")
112-
completed = await worker_registry.wait_for_completion(timeout=600) # 10 min max
116+
logger.info("Waiting for workers to complete (timeout: 480s)...")
117+
completed = await worker_registry.wait_for_completion(timeout=480)
113118
if not completed:
114119
logger.error(
115120
"Shutdown timeout - some workers may not have completed cleanly. "
116121
f"Remaining workers: {worker_registry.get_active_workers()}"
117122
)
123+
124+
# Park any still-active render jobs so the auto-retry scheduler can
125+
# recover them. Safe to call unconditionally — it's a no-op if nothing
126+
# render-related is in flight.
127+
try:
128+
from backend.workers.render_video_worker import park_active_render_jobs_for_shutdown
129+
parked = park_active_render_jobs_for_shutdown()
130+
if parked:
131+
logger.warning(
132+
f"Parked {parked} in-flight render job(s) for auto-retry "
133+
f"due to shutdown"
134+
)
135+
except Exception as exc:
136+
logger.error(f"Render-job shutdown park failed: {exc!r}")
118137
else:
119138
logger.info("No active workers, proceeding with shutdown")
120139

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
"""Tests for render-video worker shutdown handling.
2+
3+
When Cloud Run autoscales an instance down (or rolls a deployment), it sends
4+
SIGTERM and waits up to 600s before SIGKILL. Without the changes covered
5+
here, an in-flight render would be killed silently and the job would sit at
6+
`rendering_video` indefinitely until an operator manually retried.
7+
8+
The contract:
9+
- process_render_video must register with worker_registry on entry and
10+
unregister on exit (even on failure)
11+
- park_active_render_jobs_for_shutdown must transition any in-flight render
12+
job to RENDER_PENDING_CAPACITY so the auto-retry scheduler can recover it
13+
- It must NOT touch jobs that have moved past RENDERING_VIDEO between the
14+
registry snapshot and the park attempt (defends against the race where a
15+
worker completes during shutdown)
16+
"""
17+
18+
from unittest.mock import MagicMock, patch, AsyncMock
19+
20+
import pytest
21+
22+
from backend.models.job import JobStatus
23+
from backend.workers.registry import worker_registry
24+
25+
26+
def _build_minimal_job(status=JobStatus.RENDERING_VIDEO):
27+
job = MagicMock()
28+
job.artist = "Test Artist"
29+
job.title = "Test Title"
30+
job.input_media_gcs_path = "jobs/test/audio.flac"
31+
job.style_assets = {}
32+
job.style_params_gcs_path = None
33+
job.subtitle_offset_ms = 0
34+
job.prep_only = False
35+
job.state_data = {}
36+
job.file_urls = {}
37+
job.status = status
38+
return job
39+
40+
41+
@pytest.fixture(autouse=True)
42+
def reset_worker_registry():
43+
"""Each test starts with an empty registry — these tests touch shared state."""
44+
worker_registry._active_workers.clear()
45+
yield
46+
worker_registry._active_workers.clear()
47+
48+
49+
@pytest.mark.asyncio
50+
async def test_worker_registers_and_unregisters_on_success():
51+
"""The render worker must register with worker_registry and unregister
52+
on exit so the shutdown hook can wait for it.
53+
54+
Without registration, Cloud Run would kill the container the moment the
55+
HTTP request that started the BackgroundTask returns — long before the
56+
render finishes.
57+
"""
58+
from backend.workers import render_video_worker as rvw
59+
60+
job = _build_minimal_job()
61+
62+
mock_job_manager = MagicMock()
63+
mock_job_manager.get_job.return_value = job
64+
mock_job_manager.transition_to_state.return_value = True
65+
66+
mock_encoding_service = MagicMock()
67+
mock_encoding_service.is_enabled = True
68+
# Return a successful render result
69+
mock_encoding_service.render_video_on_gce = AsyncMock(return_value={
70+
"output_files": [],
71+
"metadata": {},
72+
})
73+
74+
mock_storage = MagicMock()
75+
mock_storage.file_exists.return_value = False
76+
77+
mock_worker_service = MagicMock()
78+
mock_worker_service.trigger_video_worker = AsyncMock(return_value=True)
79+
80+
with patch.object(rvw, "JobManager", return_value=mock_job_manager), \
81+
patch.object(rvw, "StorageService", return_value=mock_storage), \
82+
patch.object(rvw, "get_settings"), \
83+
patch.object(rvw, "create_job_logger", return_value=MagicMock()), \
84+
patch.object(rvw, "setup_job_logging", return_value=MagicMock()), \
85+
patch.object(rvw, "validate_worker_can_run", return_value=None), \
86+
patch.object(rvw, "get_encoding_service", return_value=mock_encoding_service), \
87+
patch("backend.services.worker_service.get_worker_service",
88+
return_value=mock_worker_service):
89+
90+
result = await rvw.process_render_video("test-job-id")
91+
92+
assert result is True
93+
# After completion, registry must be empty
94+
assert worker_registry.get_active_workers() == {}
95+
96+
97+
@pytest.mark.asyncio
98+
async def test_worker_unregisters_even_on_unexpected_exception():
99+
"""A finally block must unregister even when the work itself blows up.
100+
101+
If unregister is skipped on errors, the registry leaks and shutdown park
102+
would try (incorrectly) to park a job that has already failed.
103+
"""
104+
from backend.workers import render_video_worker as rvw
105+
106+
mock_job_manager = MagicMock()
107+
mock_job_manager.get_job.return_value = _build_minimal_job()
108+
109+
mock_encoding_service = MagicMock()
110+
mock_encoding_service.is_enabled = True
111+
mock_encoding_service.render_video_on_gce = AsyncMock(
112+
side_effect=RuntimeError("unexpected boom")
113+
)
114+
115+
with patch.object(rvw, "JobManager", return_value=mock_job_manager), \
116+
patch.object(rvw, "StorageService"), \
117+
patch.object(rvw, "get_settings"), \
118+
patch.object(rvw, "create_job_logger", return_value=MagicMock()), \
119+
patch.object(rvw, "setup_job_logging", return_value=MagicMock()), \
120+
patch.object(rvw, "validate_worker_can_run", return_value=None), \
121+
patch.object(rvw, "get_encoding_service", return_value=mock_encoding_service):
122+
123+
result = await rvw.process_render_video("test-job-id")
124+
125+
assert result is False
126+
assert worker_registry.get_active_workers() == {}
127+
128+
129+
def test_park_active_render_jobs_parks_in_flight_render():
130+
"""park_active_render_jobs_for_shutdown must transition active render
131+
jobs to RENDER_PENDING_CAPACITY with the WORKER_SHUTDOWN code marker."""
132+
from backend.workers import render_video_worker as rvw
133+
134+
# Manually populate the registry to simulate an in-flight render
135+
worker_registry._active_workers["job-A"] = {"render-video"}
136+
137+
mock_job_manager = MagicMock()
138+
mock_job_manager.get_job.return_value = _build_minimal_job(JobStatus.RENDERING_VIDEO)
139+
mock_job_manager.transition_to_state.return_value = True
140+
141+
with patch.object(rvw, "JobManager", return_value=mock_job_manager):
142+
parked = rvw.park_active_render_jobs_for_shutdown()
143+
144+
assert parked == 1
145+
146+
# Should have written render_pending_capacity metadata with WORKER_SHUTDOWN code
147+
state_calls = [
148+
c for c in mock_job_manager.update_state_data.call_args_list
149+
if c.args[1] == "render_pending_capacity"
150+
]
151+
assert state_calls
152+
pending_meta = state_calls[-1].args[2]
153+
assert pending_meta["last_code"] == rvw.RENDER_WORKER_SHUTDOWN_CODE
154+
155+
# Should transition to RENDER_PENDING_CAPACITY
156+
transitions = mock_job_manager.transition_to_state.call_args_list
157+
assert any(
158+
c.kwargs.get("new_status") == JobStatus.RENDER_PENDING_CAPACITY
159+
for c in transitions
160+
)
161+
162+
163+
def test_park_active_render_jobs_skips_completed_jobs():
164+
"""If the worker completed concurrently and the job moved past
165+
RENDERING_VIDEO, the park must NOT clobber its terminal state.
166+
167+
Race: the worker_registry snapshot is taken at the start of the park;
168+
between that and the per-job park call, the worker may finish and
169+
transition the job to INSTRUMENTAL_SELECTED. Parking it back to
170+
RENDER_PENDING_CAPACITY would re-trigger render and produce duplicate
171+
work.
172+
"""
173+
from backend.workers import render_video_worker as rvw
174+
175+
worker_registry._active_workers["job-already-done"] = {"render-video"}
176+
177+
mock_job_manager = MagicMock()
178+
mock_job_manager.get_job.return_value = _build_minimal_job(
179+
status=JobStatus.INSTRUMENTAL_SELECTED # past render
180+
)
181+
182+
with patch.object(rvw, "JobManager", return_value=mock_job_manager):
183+
parked = rvw.park_active_render_jobs_for_shutdown()
184+
185+
assert parked == 0
186+
mock_job_manager.update_state_data.assert_not_called()
187+
mock_job_manager.transition_to_state.assert_not_called()
188+
189+
190+
def test_park_active_render_jobs_returns_zero_when_no_active_renders():
191+
"""No-op when nothing render-related is in flight (e.g. only audio jobs)."""
192+
from backend.workers import render_video_worker as rvw
193+
194+
worker_registry._active_workers["job-X"] = {"audio", "lyrics"}
195+
196+
mock_job_manager = MagicMock()
197+
198+
with patch.object(rvw, "JobManager", return_value=mock_job_manager):
199+
parked = rvw.park_active_render_jobs_for_shutdown()
200+
201+
assert parked == 0
202+
# JobManager should not even be queried when there's nothing to do
203+
mock_job_manager.get_job.assert_not_called()
204+
205+
206+
def test_park_active_render_jobs_continues_after_per_job_failure():
207+
"""One failing park must not block parking the rest — we have ~120s
208+
before SIGKILL and we need to make a best effort."""
209+
from backend.workers import render_video_worker as rvw
210+
211+
worker_registry._active_workers["job-fails"] = {"render-video"}
212+
worker_registry._active_workers["job-ok"] = {"render-video"}
213+
214+
mock_job_manager = MagicMock()
215+
216+
def get_job_side_effect(job_id):
217+
if job_id == "job-fails":
218+
raise RuntimeError("Firestore unavailable")
219+
return _build_minimal_job(JobStatus.RENDERING_VIDEO)
220+
221+
mock_job_manager.get_job.side_effect = get_job_side_effect
222+
mock_job_manager.transition_to_state.return_value = True
223+
224+
with patch.object(rvw, "JobManager", return_value=mock_job_manager):
225+
parked = rvw.park_active_render_jobs_for_shutdown()
226+
227+
# The successful job should still have been parked
228+
assert parked == 1

backend/workers/render_video_worker.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from backend.services.storage_service import StorageService
4040
from backend.services.job_health_service import validate_worker_can_run
4141
from backend.config import get_settings
42+
from backend.workers.registry import worker_registry
4243
from backend.workers.worker_logging import create_job_logger, setup_job_logging, job_logging_context
4344
from backend.services.tracing import job_span, add_span_event, add_span_attribute
4445
from backend.services.encoding_service import get_encoding_service
@@ -47,6 +48,12 @@
4748
EncodingWorkerStartError,
4849
)
4950

51+
52+
# Sentinel exception code used when the render worker is parked because Cloud
53+
# Run is shutting the container down mid-render. Distinct from real GCE
54+
# capacity codes so log filters can tell them apart.
55+
RENDER_WORKER_SHUTDOWN_CODE = "WORKER_SHUTDOWN"
56+
5057
# Import from lyrics_transcriber (submodule)
5158
from karaoke_gen.lyrics_transcriber.output.generator import OutputGenerator
5259
from karaoke_gen.lyrics_transcriber.output.countdown_processor import CountdownProcessor
@@ -91,12 +98,17 @@ async def process_render_video(job_id: str) -> bool:
9198

9299
# Create job logger for remote debugging FIRST
93100
job_log = create_job_logger(job_id, "render_video")
94-
101+
95102
# Log with structured markers for easy Cloud Logging queries
96103
logger.info(f"[job:{job_id}] WORKER_START worker=render-video")
97104
job_log.info("=== RENDER VIDEO WORKER STARTED ===")
98105
job_log.info(f"Job ID: {job_id}")
99-
106+
107+
# Register with worker registry so the FastAPI shutdown hook can wait for
108+
# this worker AND, if Cloud Run forces termination, park the job in
109+
# RENDER_PENDING_CAPACITY for the auto-retry scheduler to recover.
110+
await worker_registry.register(job_id, "render-video")
111+
100112
# Set up log capture for OutputGenerator
101113
log_handler = setup_job_logging(job_id, "render_video", *RENDER_VIDEO_WORKER_LOGGERS)
102114
job_log.info(f"Log handler attached for {len(RENDER_VIDEO_WORKER_LOGGERS)} loggers")
@@ -568,6 +580,11 @@ def progress_callback(progress: int):
568580
)
569581
job_manager.fail_job(job_id, f"Video render failed: {error_text}")
570582
return False
583+
finally:
584+
# Always unregister — if we don't, the shutdown hook will think this
585+
# render is still in flight and try to park the (already-completed)
586+
# job, which would produce a no-op transition + warning log spam.
587+
await worker_registry.unregister(job_id, "render-video")
571588

572589

573590
def _park_job_for_capacity_retry(
@@ -629,5 +646,70 @@ def _extract_gcs_path(url: str) -> str:
629646
return url
630647

631648

649+
def park_active_render_jobs_for_shutdown() -> int:
650+
"""Park any in-flight render jobs in RENDER_PENDING_CAPACITY before exit.
651+
652+
Called by the FastAPI shutdown hook when Cloud Run is forcing the container
653+
down (autoscaling, deploy, preemption) and one or more render workers
654+
didn't finish in time. Without this, jobs sit at `rendering_video`
655+
indefinitely until an operator manually retries.
656+
657+
Parking transitions the job to RENDER_PENDING_CAPACITY, which the
658+
`/api/internal/retry-pending-render-jobs` Cloud Scheduler job picks up
659+
every 5 minutes.
660+
661+
Only jobs currently in the RENDERING_VIDEO state are parked — if the
662+
worker completed concurrently (or the job was already moved past this
663+
stage by some other code path) we leave it alone.
664+
665+
Returns the number of jobs parked.
666+
"""
667+
active = worker_registry.get_active_workers()
668+
render_job_ids = [
669+
job_id for job_id, worker_types in active.items()
670+
if "render-video" in worker_types
671+
]
672+
if not render_job_ids:
673+
return 0
674+
675+
job_manager = JobManager()
676+
parked = 0
677+
678+
for job_id in render_job_ids:
679+
try:
680+
job = job_manager.get_job(job_id)
681+
if not job:
682+
logger.warning(f"[job:{job_id}] Cannot park for shutdown: job not found")
683+
continue
684+
if job.status != JobStatus.RENDERING_VIDEO:
685+
# Worker completed or job moved past this stage between the
686+
# active-set snapshot and our park attempt — nothing to do.
687+
logger.info(
688+
f"[job:{job_id}] Skipping shutdown park: status is "
689+
f"{job.status} (not RENDERING_VIDEO)"
690+
)
691+
continue
692+
693+
shutdown_marker = EncodingWorkerStartError(
694+
"Render worker terminated by Cloud Run shutdown — auto-retry will resume",
695+
code=RENDER_WORKER_SHUTDOWN_CODE,
696+
vm_name="",
697+
zone="",
698+
)
699+
_park_job_for_capacity_retry(job_manager, job_id, shutdown_marker)
700+
parked += 1
701+
logger.warning(
702+
f"[job:{job_id}] WORKER_END worker=render-video status=shutdown_parked"
703+
)
704+
except Exception as exc: # pragma: no cover - defensive
705+
# Don't let one failed park block the others; we have a tight
706+
# deadline before SIGKILL.
707+
logger.error(
708+
f"[job:{job_id}] Failed to park for shutdown: {exc!r}"
709+
)
710+
711+
return parked
712+
713+
632714
# For compatibility with worker service
633715
render_video_worker = process_render_video

0 commit comments

Comments
 (0)