Skip to content

Commit 9678c4b

Browse files
committed
fix(members): make backfill remote state visible and stop-safe
The remote state object was first written only at the --checkpoint-every-th success (default 25), so short or slow runs appeared to write nothing to S3 until exit — and the exit flush relied on a finally block that never runs under SIGTERM, meaning a container stop would silently discard all progress since the last checkpoint. Remote state now flushes immediately on the first record (the object appears within seconds and access problems surface at profile 1), announces every checkpoint on stdout with the exact URI, and SIGTERM is converted to a graceful exit (143) that persists progress before the process dies. SIGKILL remains the only loss window, bounded at checkpoint-every - 1 profiles.
1 parent 7e39c03 commit 9678c4b

3 files changed

Lines changed: 123 additions & 16 deletions

File tree

docs/newprofile/backfill_memberships_guide.md

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,24 @@ or `AWS_*` environment variables). The role needs `s3:GetObject` and
123123
`s3:PutObject` on the state key.
124124

125125
Object stores cannot append, so remote state rewrites the whole object
126-
every `--checkpoint-every` N processed profiles (default 25) and once
127-
more on completion. Consequences:
126+
at checkpoints: immediately after the **first** processed profile (so
127+
the object appears within seconds of starting and S3 access problems
128+
surface at profile 1), every `--checkpoint-every` N processed profiles
129+
thereafter (default 25), and once more on exit. Each checkpoint prints
130+
a confirmation:
128131

129-
- A hard kill (OOM, SIGKILL) loses at most N−1 profiles of progress;
130-
those profiles are simply re-processed on resume. The operation is
132+
```
133+
checkpoint: 25 username(s) -> s3://kc-profiles-ops/backfill/state.txt
134+
```
135+
136+
Consequences:
137+
138+
- **SIGTERM (container stop, `docker stop`, ECS/Kubernetes shutdown)
139+
is handled gracefully**: progress is flushed before the process
140+
exits (status 143). Only SIGKILL — e.g. the OOM killer, or the
141+
follow-up kill after a stop grace period expires — can lose
142+
progress, and then at most N−1 profiles since the last checkpoint;
143+
those are simply re-processed on resume. The operation is
131144
idempotent, so this is safe.
132145
- Lower N for more durability at the cost of one S3 PUT per N
133146
profiles. `--checkpoint-every 1` writes after every profile, which
@@ -159,7 +172,10 @@ abort the run.
159172
- **Run resumed but reprocessed some profiles.** Expected with remote
160173
state: up to `--checkpoint-every − 1` profiles since the last
161174
checkpoint are retried.
162-
- **State object never appears in S3.** Check for an exception in the
163-
command output: state is first written after the
164-
`--checkpoint-every`-th profile, so very short runs with large N
165-
only write on completion. Verify the role has `s3:PutObject`.
175+
- **State object never appears in S3.** The first write happens
176+
immediately after the first successfully processed profile, and each
177+
checkpoint prints a `checkpoint: … -> s3://…` line. If no checkpoint
178+
line appears, no profile has completed yet (look for `Failed:` lines
179+
— errored profiles are never recorded); if the line appears but the
180+
object is missing, you are looking at the wrong key — the line
181+
prints the exact URI being written.

knowledge_commons_profiles/newprofile/management/commands/backfill_memberships.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import json
2222
import logging
23+
import signal
2324
import time
2425
from pathlib import Path
2526

@@ -66,16 +67,20 @@ class _RemoteStateStore:
6667
Resume state in an object store (e.g. s3://) via smart_open.
6768
6869
Object stores cannot append, so the whole state object is rewritten
69-
every ``checkpoint_every`` records and once more on close. A hard
70-
kill therefore loses at most ``checkpoint_every - 1`` records of
71-
progress; those profiles are simply re-processed on resume.
70+
on the first record (so the object appears immediately and access
71+
problems surface at profile 1), every ``checkpoint_every`` records
72+
thereafter, and once more on close. A hard kill therefore loses at
73+
most ``checkpoint_every - 1`` records of progress; those profiles
74+
are simply re-processed on resume.
7275
"""
7376

74-
def __init__(self, uri: str, checkpoint_every: int):
77+
def __init__(self, uri: str, checkpoint_every: int, announce=None):
7578
self._uri = uri
7679
self._every = max(1, checkpoint_every)
80+
self._announce = announce
7781
self._done: set[str] = set()
7882
self._unflushed = 0
83+
self._primed = False
7984

8085
def load(self) -> set[str]:
8186
try:
@@ -91,14 +96,19 @@ def load(self) -> set[str]:
9196
def record(self, username: str) -> None:
9297
self._done.add(username)
9398
self._unflushed += 1
94-
if self._unflushed >= self._every:
99+
if not self._primed or self._unflushed >= self._every:
95100
self._flush()
96101

97102
def _flush(self) -> None:
98103
content = "".join(f"{name}\n" for name in sorted(self._done))
99104
with smart_open.open(self._uri, "w") as handle:
100105
handle.write(content)
106+
self._primed = True
101107
self._unflushed = 0
108+
if self._announce:
109+
self._announce(
110+
f"checkpoint: {len(self._done)} username(s) -> {self._uri}"
111+
)
102112

103113
def close(self) -> None:
104114
if self._unflushed:
@@ -213,18 +223,27 @@ def _build_queryset(options):
213223

214224
return qs
215225

216-
@staticmethod
217-
def _make_store(options):
226+
def _make_store(self, options):
218227
"""
219228
Build the resume-state store for --state-file, if requested.
220229
"""
221230
target = options["state_file"]
222231
if not target:
223232
return None
224233
if "://" in target:
225-
return _RemoteStateStore(target, options["checkpoint_every"])
234+
return _RemoteStateStore(
235+
target,
236+
options["checkpoint_every"],
237+
announce=self.stdout.write,
238+
)
226239
return _LocalStateStore(target)
227240

241+
@staticmethod
242+
def _exit_on_sigterm(signum, frame):
243+
# container stops deliver SIGTERM; raise so the finally block
244+
# persists resume state before the process dies
245+
raise SystemExit(143)
246+
228247
def _backfill_profile(self, profile, options) -> bool:
229248
"""
230249
Backfill one profile; return True if its memberships changed.
@@ -249,6 +268,10 @@ def handle(self, *args, **options):
249268
state_store = self._make_store(options)
250269
done = state_store.load() if state_store else set()
251270

271+
previous_sigterm = signal.signal(
272+
signal.SIGTERM, self._exit_on_sigterm
273+
)
274+
252275
try:
253276
for profile in self._build_queryset(options).iterator():
254277
if profile.username in done:
@@ -291,6 +314,7 @@ def handle(self, *args, **options):
291314
if options["sleep"]:
292315
time.sleep(options["sleep"])
293316
finally:
317+
signal.signal(signal.SIGTERM, previous_sigterm)
294318
if state_store:
295319
state_store.close()
296320

knowledge_commons_profiles/newprofile/tests/test_backfill_memberships.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
"""
1111

1212
import json
13+
import os
14+
import signal
1315
import tempfile
16+
import time
1417
from io import StringIO
1518
from pathlib import Path
1619
from unittest.mock import patch
@@ -362,6 +365,70 @@ def flaky(profile):
362365
remote.objects[self.URI].split(), ["clyde"]
363366
)
364367

368+
def test_remote_state_first_record_flushes_immediately(self):
369+
# even with a large checkpoint interval the object must appear
370+
# as soon as the first profile is done, so operators can see
371+
# the run is working and IAM problems surface at profile 1
372+
self._profile_with_role("bonnie")
373+
self._profile_with_role("clyde")
374+
remote = self._fake_remote()
375+
376+
self._call("--state-file", self.URI, "--checkpoint-every", "100")
377+
378+
self.assertEqual(remote.history[0].split(), ["bonnie"])
379+
380+
def test_remote_state_checkpoints_are_announced(self):
381+
self._profile_with_role("bonnie")
382+
remote = self._fake_remote()
383+
384+
out, _ = self._call("--state-file", self.URI)
385+
386+
self.assertIn(self.URI, out)
387+
self.assertTrue(remote.history)
388+
389+
def test_sigterm_flushes_remote_state_and_exits(self):
390+
# container stops deliver SIGTERM; the run must persist its
391+
# progress before exiting instead of dying silently
392+
self._profile_with_role("bonnie")
393+
self._profile_with_role("clyde")
394+
remote = self._fake_remote()
395+
396+
def sentinel(signum, frame):
397+
msg = "SIGTERM was not handled gracefully"
398+
raise AssertionError(msg)
399+
400+
previous = signal.signal(signal.SIGTERM, sentinel)
401+
self.addCleanup(signal.signal, signal.SIGTERM, previous)
402+
403+
real_refresh = (
404+
"knowledge_commons_profiles.rest_api.sync."
405+
"ExternalSync.refresh_local_memberships"
406+
)
407+
original = ExternalSync.refresh_local_memberships
408+
409+
def stopped_mid_run(profile):
410+
if profile.username == "clyde":
411+
os.kill(os.getpid(), signal.SIGTERM)
412+
time.sleep(1) # the handler interrupts this sleep
413+
return original(profile)
414+
415+
with (
416+
patch(real_refresh, side_effect=stopped_mid_run),
417+
self.assertRaises(SystemExit),
418+
):
419+
call_command(
420+
"backfill_memberships",
421+
"--state-file",
422+
self.URI,
423+
"--checkpoint-every",
424+
"100",
425+
"--no-notify",
426+
stdout=StringIO(),
427+
)
428+
429+
# bonnie's progress survived the stop; clyde retries on resume
430+
self.assertEqual(remote.objects[self.URI].split(), ["bonnie"])
431+
365432
def test_remote_state_dry_run_writes_nothing(self):
366433
self._profile_with_role("bonnie")
367434
remote = self._fake_remote()

0 commit comments

Comments
 (0)