Skip to content

Commit 2122fd0

Browse files
committed
fix(backend): ensure processing job reconciliation against ScaleODM works well
Also fixes tests. Assisted by: Opus 4.6 LLM
1 parent da83fef commit 2122fd0

4 files changed

Lines changed: 93 additions & 42 deletions

File tree

src/backend/app/projects/project_logic.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async def reconcile_project_level_odm(
131131
132132
Fetches the single project-level ScaleODM task status and reconciles:
133133
- Completed but webhook missed -> enqueue finalize job (reproject + state)
134-
- Failed/canceled -> mark project FAILED
134+
- Failed/canceled -> enqueue finalize job (captures error + cleanup)
135135
- Not found (age > threshold) -> mark project FAILED
136136
- Otherwise -> report current status
137137
@@ -250,19 +250,25 @@ async def reconcile_project_level_odm(
250250
}
251251

252252
if status_code in (30, 50):
253-
# Failed or canceled - mark project FAILED.
254-
try:
255-
await update_processing_status(
256-
db, project.id, ImageProcessingStatus.FAILED
257-
)
258-
log.warning(
259-
"Reconciled project-level ODM failure: project={} odm_uuid={} status={}",
260-
project.id,
261-
project_odm_uuid,
262-
status_label,
263-
)
264-
except Exception as e:
265-
log.error("Failed to reconcile project ODM failure: {}", e)
253+
# Failed or canceled - enqueue finalize to capture error message + cleanup.
254+
scaleodm_endpoint = (
255+
getattr(project, "odm_endpoint_used", None) or settings.ODM_ENDPOINT
256+
)
257+
await redis_pool.enqueue_job(
258+
"finalize_scaleodm_task",
259+
scaleodm_url=scaleodm_endpoint,
260+
dtm_project_id=str(project.id),
261+
odm_task_uuid=project_odm_uuid,
262+
odm_status_code=30, # normalize 50 (canceled) → 30 (failed)
263+
_job_id=f"odm-assets:project:{project.id}",
264+
_queue_name="default_queue",
265+
)
266+
log.warning(
267+
"Reconciling project-level ODM failure: project={} odm_uuid={} status={}",
268+
project.id,
269+
project_odm_uuid,
270+
status_label,
271+
)
266272

267273
display_label = "Failed (canceled)" if status_code == 50 else status_label
268274
task_entry = project_schemas.OdmQueueTask(

src/backend/app/projects/project_routes.py

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ async def odm_webhook_for_processing_whole_project(
558558
if not odm_task_id or not status:
559559
raise HTTPException(status_code=400, detail="Invalid webhook payload")
560560

561-
if status["code"] in {30, 40}:
561+
if status["code"] in {30, 40, 50}:
562562
# Use the endpoint persisted when processing started so finalize jobs
563563
# target the correct server even if config has changed since.
564564
scaleodm_url = await _get_project_odm_endpoint(db, dtm_project_id)
@@ -567,7 +567,7 @@ async def odm_webhook_for_processing_whole_project(
567567
scaleodm_url=scaleodm_url,
568568
dtm_project_id=str(dtm_project_id),
569569
odm_task_uuid=odm_task_id,
570-
odm_status_code=status["code"],
570+
odm_status_code=30 if status["code"] == 50 else status["code"],
571571
_job_id=f"odm-assets:project:{dtm_project_id}",
572572
_queue_name="default_queue",
573573
)
@@ -612,6 +612,24 @@ async def odm_webhook_for_processing_a_single_task(
612612
status["code"],
613613
)
614614

615+
# Reject stale webhooks from a previous ScaleODM submission so they
616+
# can't corrupt state after a user re-submits a failed task.
617+
async with db.cursor(row_factory=dict_row) as _cur:
618+
await _cur.execute(
619+
"SELECT odm_task_uuid FROM tasks WHERE id = %(tid)s AND project_id = %(pid)s",
620+
{"tid": str(dtm_task_id), "pid": str(dtm_project_id)},
621+
)
622+
_row = await _cur.fetchone()
623+
current_odm_uuid = _row["odm_task_uuid"] if _row else None
624+
if current_odm_uuid and odm_task_id != str(current_odm_uuid):
625+
log.warning(
626+
"ODM webhook for task {}: payload uuid {} does not match current odm_task_uuid {} - ignoring stale webhook",
627+
dtm_task_id,
628+
odm_task_id,
629+
current_odm_uuid,
630+
)
631+
return {"message": "Webhook ignored (stale)", "odm_task_id": odm_task_id}
632+
615633
if status["code"] == 40:
616634
await redis_pool.enqueue_job(
617635
"finalize_scaleodm_task",
@@ -626,7 +644,7 @@ async def odm_webhook_for_processing_a_single_task(
626644
_queue_name="default_queue",
627645
)
628646

629-
elif status["code"] == 30 and state_value != State.IMAGE_PROCESSING_FAILED:
647+
elif status["code"] in {30, 50} and state_value != State.IMAGE_PROCESSING_FAILED:
630648
# Finalize handler records the failure and clears the ScaleODM row.
631649
# We don't pre-flip state here so the finalize job can pull the real
632650
# errorMessage from ScaleODM and write a more descriptive comment.
@@ -1384,29 +1402,32 @@ async def fetch_task_info(
13841402
odm_uuid,
13851403
)
13861404
else:
1387-
# Failed or canceled - just flip the state
1388-
try:
1389-
await task_logic.update_task_state_system(
1390-
db,
1391-
project.id,
1392-
dtm_task_id,
1393-
f"Reconciled: ScaleODM reports {status_label}",
1394-
State.IMAGE_PROCESSING_STARTED,
1395-
State.IMAGE_PROCESSING_FAILED,
1396-
timestamp(),
1397-
)
1398-
reconciled.append(
1399-
f"Task {task_index} ({dtm_task_id}): STARTED -> FAILED ({status_label})"
1400-
)
1401-
log.warning(
1402-
"Reconciled stuck task: project={} dtm_task={} odm_uuid={} odm_status={}",
1403-
project.id,
1404-
dtm_task_id,
1405-
odm_uuid,
1406-
status_label,
1407-
)
1408-
except Exception as e:
1409-
log.error("Failed to reconcile task {}: {}", dtm_task_id, e)
1405+
# Failed or canceled - enqueue finalize to capture the real
1406+
# error message from ScaleODM and clean up the ScaleODM row.
1407+
# Direct update_task_state_system here would skip cleanup and
1408+
# cause the subsequent ScaleODM webhook to be silently ignored.
1409+
await redis_pool.enqueue_job(
1410+
"finalize_scaleodm_task",
1411+
scaleodm_url=odm_url,
1412+
dtm_project_id=str(project.id),
1413+
odm_task_uuid=odm_uuid,
1414+
state_name=State.IMAGE_PROCESSING_STARTED.name,
1415+
message=f"Reconciled: ScaleODM reports {status_label}",
1416+
dtm_task_id=str(dtm_task_id),
1417+
odm_status_code=30, # normalize 50 (canceled) → 30 (failed)
1418+
_job_id=f"odm-assets:task:{dtm_task_id}",
1419+
_queue_name="default_queue",
1420+
)
1421+
reconciled.append(
1422+
f"Task {task_index} ({dtm_task_id}): STARTED -> FAILED ({status_label}, enqueued finalize)"
1423+
)
1424+
log.warning(
1425+
"Reconciling failed/canceled task: project={} dtm_task={} odm_uuid={} odm_status={}",
1426+
project.id,
1427+
dtm_task_id,
1428+
odm_uuid,
1429+
status_label,
1430+
)
14101431
elif fetch_failed:
14111432
# Could not reach ScaleODM for this task (timeout, network error).
14121433
# Do NOT reconcile - the task may still be running. Show it as

src/backend/tests/test_project_processing.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,32 @@
1010
from app.arq import tasks as arq_tasks
1111

1212

13+
class _FakeCursor:
14+
"""Minimal async context-manager cursor; fetchone() returns None by default."""
15+
16+
def __init__(self, fetchone_result=None):
17+
self._fetchone_result = fetchone_result
18+
self.executed = []
19+
20+
async def execute(self, query, params=None):
21+
self.executed.append({"query": query, "params": params})
22+
23+
async def fetchone(self):
24+
return self._fetchone_result
25+
26+
async def __aenter__(self):
27+
return self
28+
29+
async def __aexit__(self, exc_type, exc, tb):
30+
return False
31+
32+
1333
class _FakeConn:
14-
def __init__(self):
34+
def __init__(self, cursor_fetchone=None):
1535
self.commit_calls = 0
1636
self.rollback_calls = 0
1737
self.executed = []
38+
self._cursor_fetchone = cursor_fetchone
1839

1940
async def execute(self, query, params=None):
2041
self.executed.append({"query": query, "params": params})
@@ -25,6 +46,9 @@ async def commit(self):
2546
async def rollback(self):
2647
self.rollback_calls += 1
2748

49+
def cursor(self, **kwargs):
50+
return _FakeCursor(self._cursor_fetchone)
51+
2852

2953
class _FakePoolConnection:
3054
def __init__(self, conn):

tasks/start

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ all:
3636
# ScaleODM runs inside a local Talos k8s cluster (itself Docker containers).
3737
# Workflow pods need to reach DroneTM's S3 (RustFS), which is published on
3838
# the Docker host at port 9000. The reachable address from inside those pods
39-
# is the Docker network's gateway IP not localhost.
39+
# is the Docker network's gateway IP - not localhost.
4040
#
4141
# We only auto-detect when SCALEODM_S3_ENDPOINT is unset/empty, so an
4242
# explicit value in .env always wins.

0 commit comments

Comments
 (0)