Skip to content

Commit 0956760

Browse files
committed
Drop APScheduler, convert to native asyncio
1 parent bde327c commit 0956760

9 files changed

Lines changed: 421 additions & 264 deletions

File tree

docs/architecture.md

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,38 @@
55
The sapporo-service is a FastAPI application that accepts WES API requests, prepares a run directory for each workflow execution, and delegates the actual workflow engine invocation to a shell script (`run.sh`). Each workflow engine runs inside its own Docker container, spawned as a sibling container via the host's Docker socket. See [Installation - Volume Mounts](installation.md#volume-mounts-docker-in-docker) for details on the DinD volume mount requirements.
66

77
```text
8-
+--------+ +---------+ +--------+ +---------------------+
9-
| Client | --> | FastAPI | --> | run.py | --> | run.sh (subprocess) |
10-
+--------+ +---------+ +--------+ +---------------------+
11-
|
12-
docker run
13-
|
14-
+------------+
15-
| Engine |
16-
| Container |
17-
+------------+
8+
+-----------+
9+
| Client |
10+
+-----------+
11+
|
12+
HTTP request
13+
|
14+
v
15+
+-----------+
16+
| FastAPI |
17+
+-----------+
18+
|
19+
Python call
20+
|
21+
v
22+
+-----------+
23+
| run.py |
24+
+-----------+
25+
|
26+
subprocess
27+
|
28+
v
29+
+-----------+
30+
| run.sh |
31+
+-----------+
32+
|
33+
docker run
34+
|
35+
v
36+
+-----------+
37+
| Engine |
38+
| Container |
39+
+-----------+
1840
```
1941

2042
The Python side (`run.py`) never calls a workflow engine directly. It prepares the run directory, writes all input files, then forks `run.sh` as a subprocess. All run data is persisted to the filesystem, with a SQLite index for fast listing.
@@ -114,41 +136,23 @@ runs/
114136
| `system_logs.json` | System-level logs |
115137
| `workflow_engine_params.txt` | Engine-specific parameters |
116138

117-
## Orphan Recovery
139+
## Reconciliation
118140

119-
When the sapporo process restarts (e.g., container recreation), any `run.sh` subprocesses from the previous instance are dead. Runs that were in a non-terminal state are now orphans — their `state.txt` still says `RUNNING` or `QUEUED`, but no process is driving them forward.
141+
Detects runs stuck in `RUNNING`/`QUEUED` after a process restart and marks them as `SYSTEM_ERROR`.
120142

121-
At startup, **before** the SQLite index is built, `recover_orphaned_runs()` scans all run directories and transitions orphaned runs to `SYSTEM_ERROR`.
143+
`reconcile_runs()` runs at startup (before `init_db()`) and periodically in the background (at the snapshot interval, default: 30 minutes). For each run in a non-terminal state, it reads `run.pid` and checks process liveness via `os.kill(pid, 0)`:
122144

123-
### Target States
145+
| PID file | Process alive | Action |
146+
|---|---|---|
147+
| Present | Yes | Skip (running normally) |
148+
| Present | No | Set `SYSTEM_ERROR` (reason: "process vanished") |
149+
| Absent | N/A | Set `SYSTEM_ERROR` (reason: "no pid file") |
124150

125-
Runs in the following non-terminal states are recovered:
126-
127-
- `INITIALIZING`
128-
- `QUEUED`
129-
- `RUNNING`
130-
- `PAUSED`
131-
- `PREEMPTED`
132-
- `CANCELING`
133-
- `DELETING`
134-
135-
Runs in terminal states (`COMPLETE`, `EXECUTOR_ERROR`, `SYSTEM_ERROR`, `CANCELED`, `DELETED`) and `UNKNOWN` are left unchanged.
136-
137-
### Recovery Actions
138-
139-
For each orphaned run, the recovery process:
140-
141-
1. Sets `state.txt` to `SYSTEM_ERROR`
142-
2. Writes the current timestamp to `end_time.txt`
143-
3. Appends a descriptive message to `system_logs.json`
144-
145-
### Ordering
146-
147-
`recover_orphaned_runs()` runs before `init_db()` in the application lifespan, so the SQLite index reflects the corrected states from its first build.
151+
Runs in terminal states (`COMPLETE`, `EXECUTOR_ERROR`, `SYSTEM_ERROR`, `CANCELED`, `DELETED`) and `UNKNOWN` are skipped. For each reconciled run, `state.txt` is set to `SYSTEM_ERROR`, the current timestamp is written to `end_time.txt`, and the reason is logged to `system_logs.json`.
148152

149153
## SQLite Index
150154

151-
The SQLite database (`sapporo.db`) is an **index**, not a data store. It is rebuilt at a configurable interval (default: 30 minutes) by scanning the run directories and can be deleted at any time without data loss. It exists solely to make `GET /runs` (list all runs) fast. Individual run queries (`GET /runs/{run_id}`) always read from the filesystem.
155+
The SQLite database (`sapporo.db`) is an **index**, not a data store. It is rebuilt at a configurable interval (default: 30 minutes) by a background asyncio task that scans the run directories, and can be deleted at any time without data loss. It exists solely to make `GET /runs` (list all runs) fast. Individual run queries (`GET /runs/{run_id}`) always read from the filesystem.
152156

153157
## RO-Crate
154158

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ classifiers = [
2626
"Topic :: Scientific/Engineering :: Bio-Informatics",
2727
]
2828
dependencies = [
29-
"apscheduler>=3,<4",
3029
"argon2-cffi>=21,<26",
3130
"cachetools>=5,<8",
3231
"fastapi>=0.100,<1",
@@ -53,6 +52,7 @@ tests = [
5352
"hypothesis",
5453
"mutmut",
5554
"mypy",
55+
"pytest-asyncio",
5656
"pytest-mock",
5757
"pytest-randomly",
5858
"pytest",
@@ -79,6 +79,7 @@ packages = ["sapporo"]
7979

8080
[tool.pytest.ini_options]
8181
addopts = "-v --tb=short --strict-markers"
82+
asyncio_mode = "strict"
8283
testpaths = ["tests/unit"]
8384
markers = [
8485
"slow: marks tests as slow (deselect with '-m \"not slow\"')",

sapporo/app.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import asyncio
12
import logging
23
import logging.config
34
from collections.abc import AsyncGenerator
45
from contextlib import asynccontextmanager
56

67
import uvicorn
7-
from apscheduler.schedulers.background import BackgroundScheduler
88
from fastapi import FastAPI, Request
99
from fastapi.exceptions import RequestValidationError
1010
from fastapi.middleware.cors import CORSMiddleware
@@ -18,7 +18,7 @@
1818
from sapporo.database import init_db
1919
from sapporo.factory import create_executable_wfs, create_service_info
2020
from sapporo.routers import router
21-
from sapporo.run import recover_orphaned_runs, remove_old_runs
21+
from sapporo.run import reconcile_runs, remove_old_runs
2222
from sapporo.schemas import ErrorResponse
2323
from sapporo.utils import mask_sensitive
2424

@@ -158,21 +158,34 @@ def init_app_state() -> None:
158158

159159
@asynccontextmanager
160160
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
161-
recover_orphaned_runs()
161+
reconcile_runs()
162162
init_db()
163163

164164
snapshot_interval = get_config().snapshot_interval
165-
scheduler = BackgroundScheduler()
166-
scheduler.add_job(init_db, "interval", minutes=snapshot_interval)
167-
scheduler.add_job(remove_old_runs, "interval", minutes=snapshot_interval)
168-
scheduler.start()
169-
LOGGER.info("DB snapshot scheduler started")
165+
tasks: set[asyncio.Task[None]] = set()
166+
167+
async def _reconciliation_loop() -> None:
168+
while True:
169+
await asyncio.sleep(snapshot_interval * 60)
170+
await asyncio.to_thread(reconcile_runs)
171+
172+
async def _db_sync_loop() -> None:
173+
while True:
174+
await asyncio.sleep(snapshot_interval * 60)
175+
await asyncio.to_thread(init_db)
176+
await asyncio.to_thread(remove_old_runs)
177+
178+
tasks.add(asyncio.create_task(_reconciliation_loop()))
179+
tasks.add(asyncio.create_task(_db_sync_loop()))
180+
LOGGER.info("Background loops started (interval=%d min)", snapshot_interval)
170181

171182
try:
172183
yield
173184
finally:
174-
scheduler.shutdown()
175-
LOGGER.info("DB snapshot scheduler stopped")
185+
for task in tasks:
186+
task.cancel()
187+
await asyncio.gather(*tasks, return_exceptions=True)
188+
LOGGER.info("Background loops stopped")
176189

177190

178191
def create_app() -> FastAPI:

sapporo/auth.py

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55
import os
66
import re
7-
import time
87
from functools import cache
98
from threading import RLock
109
from typing import Any, Literal
@@ -182,13 +181,24 @@ async def create_access_token(username: str, password: str) -> str:
182181
return await external_create_access_token(username, password)
183182

184183

185-
def decode_token(token: str) -> TokenPayload:
184+
async def decode_token(token: str) -> TokenPayload:
186185
auth_config = get_auth_config()
187186
if auth_config.idp_provider == "sapporo":
188187
payload = spr_decode_token(token)
189188
check_valid_username(payload.sub)
189+
190190
return payload
191-
return external_decode_token(token)
191+
192+
return await external_decode_token(token)
193+
194+
195+
async def resolve_username(token: str | None) -> str | None:
196+
"""Extract username from a token, or return None if no token."""
197+
if token is None:
198+
return None
199+
payload = await decode_token(token)
200+
201+
return extract_username(payload)
192202

193203

194204
def sanitize_username(username: str) -> str:
@@ -365,38 +375,39 @@ def _validate_https_url(url: str, context: str) -> None:
365375
)
366376

367377

368-
def _fetch_once(url: str) -> httpx.Response:
378+
async def _fetch_once(url: str) -> httpx.Response:
369379
"""Execute a single HTTP GET request with timeout."""
370-
with httpx.Client(timeout=HTTPX_TIMEOUT) as client:
371-
res = client.get(url, follow_redirects=True, headers={"User-Agent": user_agent()})
380+
async with httpx.AsyncClient(timeout=HTTPX_TIMEOUT) as client:
381+
res = await client.get(url, follow_redirects=True, headers={"User-Agent": user_agent()})
372382
res.raise_for_status()
383+
373384
return res
374385

375386

376-
def _fetch_with_retry(url: str, context: str) -> httpx.Response:
387+
async def _fetch_with_retry(url: str, context: str) -> httpx.Response:
377388
"""Fetch a URL with retry and exponential backoff."""
378389
last_exc: Exception | None = None
379390
for attempt in range(_MAX_RETRIES):
380391
try:
381-
return _fetch_once(url)
392+
return await _fetch_once(url)
382393
except httpx.HTTPError as exc: # noqa: PERF203
383394
last_exc = exc
384395
LOGGER.warning("Attempt %d/%d to fetch %s failed: %s", attempt + 1, _MAX_RETRIES, context, exc)
385396
if attempt < _MAX_RETRIES - 1:
386-
time.sleep(_RETRY_BASE_DELAY * (2**attempt))
397+
await asyncio.sleep(_RETRY_BASE_DELAY * (2**attempt))
387398
msg = f"Failed to fetch {context} after {_MAX_RETRIES} retries: {last_exc}"
388399
raise HTTPException(status_code=500, detail=msg)
389400

390401

391-
def _fetch_endpoint_metadata_sync() -> ExternalEndpointMetadata:
392-
"""Fetch endpoint metadata synchronously."""
402+
async def _fetch_endpoint_metadata_impl() -> ExternalEndpointMetadata:
403+
"""Fetch endpoint metadata from the IdP."""
393404
auth_config = get_auth_config()
394405
idp_url = auth_config.external_config.idp_url
395406
_validate_https_url(idp_url, "External IdP URL")
396407

397408
well_known_url = f"{idp_url}/.well-known/openid-configuration"
398409
try:
399-
res = _fetch_with_retry(well_known_url, "IdP metadata")
410+
res = await _fetch_with_retry(well_known_url, "IdP metadata")
400411
metadata = ExternalEndpointMetadata.model_validate(res.json())
401412

402413
_validate_https_url(metadata.token_endpoint, "Token endpoint")
@@ -407,21 +418,22 @@ def _fetch_endpoint_metadata_sync() -> ExternalEndpointMetadata:
407418
else:
408419
with _cache_lock:
409420
_metadata_cache["metadata"] = metadata
421+
410422
return metadata
411423

412424

413-
def fetch_endpoint_metadata() -> ExternalEndpointMetadata:
425+
async def fetch_endpoint_metadata() -> ExternalEndpointMetadata:
414426
with _cache_lock:
415427
cached: ExternalEndpointMetadata | None = _metadata_cache.get("metadata")
416428
if cached is not None:
417429
return cached
418430

419-
return _fetch_endpoint_metadata_sync()
431+
return await _fetch_endpoint_metadata_impl()
420432

421433

422434
async def external_create_access_token(username: str, password: str) -> str:
423435
auth_config = get_auth_config()
424-
metadata = await asyncio.to_thread(fetch_endpoint_metadata)
436+
metadata = await fetch_endpoint_metadata()
425437
token_url = metadata.token_endpoint
426438
data: dict[str, str | None] = {
427439
"grant_type": "password",
@@ -444,7 +456,7 @@ async def external_create_access_token(username: str, password: str) -> str:
444456
raise_invalid_credentials()
445457

446458

447-
def external_decode_token(token: str) -> TokenPayload:
459+
async def external_decode_token(token: str) -> TokenPayload:
448460
try:
449461
unverified_header = jwt.get_unverified_header(token)
450462
except jwt.exceptions.DecodeError:
@@ -458,13 +470,13 @@ def external_decode_token(token: str) -> TokenPayload:
458470
if alg not in ALLOWED_ALGORITHMS:
459471
raise_invalid_token()
460472

461-
metadata = fetch_endpoint_metadata()
462-
jwks = fetch_jwks()
473+
metadata = await fetch_endpoint_metadata()
474+
jwks = await fetch_jwks()
463475

464476
jwk_key = next((k.key for k in jwks.keys if k.key_id == kid), None)
465477
if jwk_key is None:
466478
# Key rotation: re-fetch JWKS and try again
467-
jwks = fetch_jwks(force_refresh=True)
479+
jwks = await fetch_jwks(force_refresh=True)
468480
jwk_key = next((k.key for k in jwks.keys if k.key_id == kid), None)
469481
if jwk_key is None:
470482
raise_invalid_token()
@@ -486,25 +498,27 @@ def external_decode_token(token: str) -> TokenPayload:
486498
raise_invalid_token()
487499

488500

489-
def _fetch_jwks_sync(force_refresh: bool = False) -> PyJWKSet:
490-
"""Fetch JWKS synchronously."""
491-
jwks_uri = fetch_endpoint_metadata().jwks_uri
501+
async def _fetch_jwks_impl(force_refresh: bool = False) -> PyJWKSet:
502+
"""Fetch JWKS from the IdP."""
503+
metadata = await fetch_endpoint_metadata()
504+
jwks_uri = metadata.jwks_uri
492505
try:
493-
res = _fetch_with_retry(jwks_uri, "JWKS")
506+
res = await _fetch_with_retry(jwks_uri, "JWKS")
494507
jwk_set = PyJWKSet.from_dict(res.json())
495508
except (ValueError, KeyError):
496509
raise_internal_error("Failed to fetch JWKS from the IdP")
497510
else:
498511
with _cache_lock:
499512
_jwks_cache["jwks"] = jwk_set
513+
500514
return jwk_set
501515

502516

503-
def fetch_jwks(force_refresh: bool = False) -> PyJWKSet:
517+
async def fetch_jwks(force_refresh: bool = False) -> PyJWKSet:
504518
if not force_refresh:
505519
with _cache_lock:
506520
cached: PyJWKSet | None = _jwks_cache.get("jwks")
507521
if cached is not None:
508522
return cached
509523

510-
return _fetch_jwks_sync(force_refresh)
524+
return await _fetch_jwks_impl(force_refresh)

0 commit comments

Comments
 (0)