Skip to content

Commit c3c6a3c

Browse files
committed
Update build worker to track QueueJob status
The worker now looks up the QueueJob by arq job ID and transitions it through in_progress, completed, or failed states alongside the build.
1 parent 9a0656b commit c3c6a3c

File tree

1 file changed

+31
-2
lines changed

1 file changed

+31
-2
lines changed

src/docverse/worker/functions/build_processing.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from docverse.storage.build_store import BuildStore
2525
from docverse.storage.objectstore import ObjectStore
2626
from docverse.storage.organization_store import OrganizationStore
27+
from docverse.storage.queue_job_store import QueueJobStore
2728

2829
#: Maximum number of concurrent upload tasks.
2930
_UPLOAD_CONCURRENCY = 50
@@ -68,8 +69,9 @@ async def build_processing(
6869
)
6970
build_store = BuildStore(session=session, logger=logger)
7071
org_store = OrganizationStore(session=session, logger=logger)
72+
queue_job_store = QueueJobStore(session=session, logger=logger)
7173

72-
# Phase 1: Read-only transaction to load metadata
74+
# Phase 1: Load metadata and mark QueueJob as in_progress
7375
async with session.begin():
7476
build = await build_store.get_by_id(build_id)
7577
if build is None:
@@ -90,6 +92,8 @@ async def build_processing(
9092
org_id=org_id, service_label=service_label
9193
)
9294

95+
queue_job_id = await _start_queue_job(ctx, queue_job_store)
96+
9397
# Phase 2: Upload files and mark build complete
9498
try:
9599
async with object_store, session.begin():
@@ -100,20 +104,45 @@ async def build_processing(
100104
logger=logger,
101105
)
102106
except Exception:
103-
# Phase 3: Mark build as failed in a separate transaction
107+
# Phase 3a: Mark build and queue job as failed
104108
logger.exception("Build processing failed")
105109
async with session.begin():
106110
build_service = factory.create_build_service()
107111
await build_service.fail(build_id=build_id)
112+
if queue_job_id is not None:
113+
await queue_job_store.fail(queue_job_id)
108114
return "failed"
109115
else:
116+
# Phase 3b: Mark queue job as complete
117+
if queue_job_id is not None:
118+
async with session.begin():
119+
await queue_job_store.complete(queue_job_id)
110120
logger.info("Build processing completed")
111121
return "completed"
112122

113123
msg = "No database session available"
114124
raise RuntimeError(msg)
115125

116126

127+
async def _start_queue_job(
128+
ctx: dict[str, Any],
129+
queue_job_store: QueueJobStore,
130+
) -> int | None:
131+
"""Look up and start the QueueJob for this arq job.
132+
133+
Returns the queue job's internal ID, or ``None`` if no matching
134+
QueueJob exists.
135+
"""
136+
arq_job_id: str | None = ctx.get("job_id")
137+
if arq_job_id is None:
138+
return None
139+
queue_job = await queue_job_store.get_by_backend_job_id(arq_job_id)
140+
if queue_job is None:
141+
return None
142+
await queue_job_store.start(queue_job.id)
143+
return queue_job.id
144+
145+
117146
async def _process_build(
118147
*,
119148
object_store: ObjectStore,

0 commit comments

Comments
 (0)