feat(api): Add import job REST API and service layer#34
feat(api): Add import job REST API and service layer#34
Conversation
Add REST API endpoints and business logic for GEDCOM import job management:
API Endpoints (6):
- POST /api/import-jobs - Create job with file upload
- GET /api/import-jobs - List user's jobs (paginated, filterable)
- GET /api/import-jobs/{job_id} - Get job detail with stages
- POST /api/import-jobs/{job_id}/pause - Pause running job
- POST /api/import-jobs/{job_id}/resume - Resume paused job
- DELETE /api/import-jobs/{job_id} - Cancel job and delete files
Service Layer:
- create_import_job() - Upload GEDCOM, transition UPLOADED → QUEUED
- get_job_with_stages() - Fetch job with pipeline stages
- list_user_jobs() - Paginated job list with filtering
- pause_job() / resume_job() - Job lifecycle management
- cancel_job() - Delete job and cleanup storage
- claim_next_job() - Worker job claiming (lease-based)
- complete_stage() / fail_job() - Stage execution updates
Database Changes:
- Add order field to ImportJobStage for pipeline ordering
- Change ImportJob.user_id from UUID to string (for Google JWT subject IDs)
Tests:
- 17 comprehensive endpoint tests (100% passing)
- Coverage: routes 72%, service layer 26% (worker functions untested)
File Storage:
- Pattern: /data/gedcom/{user_id}/{job_id}/original.ged
- user_id is string (Google subject), job_id is UUID
State Machine:
- Jobs: UPLOADED → QUEUED → IN_PROGRESS → PAUSED/COMPLETED/FAILED/CANCELLED
- Stages: PENDING → IN_PROGRESS → COMPLETED/FAILED/RETRYING
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (46.32%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 19 files with indirect coverage changes 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds an import-jobs feature to the FastAPI backend, introducing REST endpoints and a service layer to manage GEDCOM import job lifecycle (create/list/detail/pause/resume/delete) and worker-side job claiming helpers.
Changes:
- Introduces
/api/import-jobsREST routes with file upload handling and job lifecycle actions. - Adds
import_pipelineservice functions for job creation, listing, cancellation, and worker helpers (claim/heartbeat/stage transitions). - Extends DB models/settings to support stage ordering and configurable GEDCOM storage path, plus adds endpoint tests.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| apps/api/src/api/routes/import_jobs.py | New REST API router + response models for import job management |
| apps/api/src/api/services/import_pipeline.py | New service layer implementing job lifecycle and worker helper operations |
| apps/api/src/api/database.py | Schema updates: ImportJob.user_id string and ImportJobStage.order field |
| apps/api/src/api/settings.py | Adds gedcom_storage_path configuration |
| apps/api/src/api/app.py | Registers the new import-jobs router under /api |
| apps/api/src/api/services/init.py | Adds services package marker/docstring |
| apps/api/tests/test_import_jobs_routes.py | New endpoint tests for all import-job routes |
| apps/api/pyproject.toml | Adds python-multipart dependency for multipart file uploads |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Get total count | ||
| count_result = await db.execute(query) | ||
| total = len(list(count_result.scalars().all())) | ||
|
|
There was a problem hiding this comment.
list_user_jobs() computes total by executing the full query and doing len(all()), which loads every job row into memory and does extra work on every request. Use a proper SELECT COUNT(*) (e.g., via func.count() on a subquery) so pagination remains fast as the table grows.
There was a problem hiding this comment.
✅ Fixed - Changed to use SELECT COUNT(*) subquery:
# Now uses efficient count query
count_stmt = select(func.count()).select_from(stmt.subquery())
count_result = await db.execute(count_stmt)
total = count_result.scalar_one()This avoids loading all rows into memory and keeps pagination fast as the table grows.
| heartbeat_at=job.claimed_at.isoformat() if job.claimed_at else None, | ||
| stages=stage_responses, | ||
| created_at=job.created_at.isoformat(), | ||
| updated_at=job.created_at.isoformat(), # TODO: Add updated_at to model |
There was a problem hiding this comment.
ImportJobResponse.updated_at is populated with job.created_at even though the ImportJob model doesn't have an updated_at field. This makes the API response misleading for clients. Prefer setting updated_at=None until the DB model has a real updated_at column, or add/update the column and populate it correctly.
| updated_at=job.created_at.isoformat(), # TODO: Add updated_at to model | |
| updated_at=None, # TODO: Add updated_at to model |
There was a problem hiding this comment.
✅ Fixed - Changed to updated_at=None with TODO:
updated_at=None, # TODO: Add updated_at to modelThis prevents misleading clients until we add a real updated_at column to the database model.
| app.include_router(auth.router, prefix='/auth') | ||
| app.include_router(health.router, prefix='/health') | ||
| app.include_router(import_jobs.router, prefix='/api') | ||
| app.include_router(user.router, prefix='/user') |
There was a problem hiding this comment.
Including import_jobs introduces a DELETE /api/import-jobs/{job_id} endpoint, but the CORS middleware is configured with allow_methods=['GET','POST'] (see above in this file). Browser clients will fail preflight for DELETE/OPTIONS. Update CORS allow_methods to include DELETE (and OPTIONS) or use ['*'] if appropriate.
There was a problem hiding this comment.
✅ Fixed - Added DELETE and OPTIONS to CORS:
allow_methods=['GET', 'POST', 'DELETE', 'OPTIONS']Browser clients can now successfully complete preflight checks for DELETE operations.
| id: UUID = Field(default_factory=uuid4, primary_key=True) | ||
| user_id: UUID = Field(foreign_key='app_users.id', index=True) | ||
| # user_id stores Google subject ID directly (not FK to app_users) | ||
| # because auth uses JWT validation, not database user lookups | ||
| user_id: str = Field(index=True) | ||
| source_type: str # gedcom | wikitree-export | manual |
There was a problem hiding this comment.
Changing ImportJob.user_id to str is a schema-breaking change. Since the app uses SQLModel.metadata.create_all() (which won't alter existing tables), existing DBs won't be upgraded automatically and may fail at runtime. Consider adding migrations (e.g., Alembic) or documenting/handling required schema upgrades.
There was a problem hiding this comment.
✅ Documented - Added comprehensive migration documentation:
# SCHEMA CHANGE: user_id changed from UUID to str (Google subject IDs)
# Migration required for existing deployments:
# 1. For SQLite: Create new table with correct schema, copy data
# 2. For PostgreSQL: ALTER TABLE import_jobs ALTER COLUMN user_id TYPE TEXT;
# TODO: Add Alembic migration in future PRThe change is necessary to support Google JWT subject IDs like "google|108291827364728364" which are not UUIDs.
| import_job_id: UUID = Field(foreign_key='import_jobs.id', index=True) | ||
| stage_name: str # parse | normalize | search | match | review | ||
| stage_name: str # validate | parse | normalize | search | match | review | ||
| order: int # Pipeline execution order (0=validate, 1=parse, etc.) |
There was a problem hiding this comment.
Adding the non-null ImportJobStage.order column is a schema-breaking change. With SQLModel.metadata.create_all() (no migrations), existing DBs won't get this column automatically, and any queries ordering by ImportJobStage.order will fail. Consider adding a migration/backfill or making the column nullable with a backfill path.
| order: int # Pipeline execution order (0=validate, 1=parse, etc.) | |
| order: int | None = None # Nullable for compatibility with existing DBs until backfilled |
There was a problem hiding this comment.
✅ Fixed - Made field nullable for backward compatibility:
order: int | None = None # Nullable for compatibility with existing DBs until backfilledExisting databases without this column will continue to work. Added TODO for future backfill migration.
| job = ImportJob( | ||
| user_id=test_user.userid, | ||
| source_type='gedcom', | ||
| original_filename='test.ged', | ||
| stored_path=f'{test_user.userid}/job/original.ged', |
There was a problem hiding this comment.
test_delete_job_success doesn't create any ImportJobStage rows for the job. Since delete should also remove stages (and may fail under FK enforcement in Postgres), add at least one stage and assert the delete succeeds and stages are removed to catch regressions.
There was a problem hiding this comment.
✅ Fixed - Enhanced test to create and verify stage deletion:
# Create 2 stages to verify cascade delete
stage1 = ImportJobStage(...)
stage2 = ImportJobStage(...)
db.add_all([stage1, stage2])
await db.commit()
# After delete, verify stages are removed
result = await db.execute(select(ImportJobStage).where(...))
assert result.scalars().all() == []The test now creates 2 stages and asserts they are deleted along with the job.
| # Load stages | ||
| stages_result = await db.execute( | ||
| select(ImportJobStage) | ||
| .where(ImportJobStage.import_job_id == job.id) | ||
| .order_by(ImportJobStage.id) | ||
| ) | ||
| job.stages = list(stages_result.scalars().all()) # pyrefly: ignore[assignment] | ||
|
|
There was a problem hiding this comment.
Stages are loaded with .order_by(ImportJobStage.id) and then assigned via job.stages = .... This risks incorrect stage sequencing (should use ImportJobStage.order) and may raise at runtime because ImportJob doesn't define a stages field. Prefer returning stages separately or defining an ORM relationship.
| # Load stages | |
| stages_result = await db.execute( | |
| select(ImportJobStage) | |
| .where(ImportJobStage.import_job_id == job.id) | |
| .order_by(ImportJobStage.id) | |
| ) | |
| job.stages = list(stages_result.scalars().all()) # pyrefly: ignore[assignment] |
There was a problem hiding this comment.
✅ Fixed - Changed to order by order field and return tuple:
# Order by the order field, not id
.order_by(ImportJobStage.order) # pyrefly: ignore
# Return job and stages as tuple to avoid assignment error
return job, stagesStages are now correctly sequenced by the order field. The function returns a tuple to avoid Pydantic field assignment errors since ImportJob doesn't define a stages field.
| job_id: UUID, | ||
| stage_name: str, | ||
| records_processed: int, | ||
| checkpoint_data: dict | None = None, | ||
| ) -> None: |
There was a problem hiding this comment.
update_stage_progress() accepts records_processed but never uses it, and currently only updates checkpoint_json. Either remove the unused parameter or persist progress (e.g., increment a counter / set timestamps) so worker-side callers get the behavior implied by the function name and docstring.
There was a problem hiding this comment.
✅ Fixed - Removed unused parameter:
async def update_stage_progress(
db: AsyncSession,
job_id: UUID,
stage_name: str,
checkpoint_json: dict[str, Any] | None = None,
) -> None:Removed the records_processed parameter since it wasn't being persisted. The function name and behavior now match - it only updates the checkpoint JSON.
| offset: Pagination offset | ||
| status_filter: Optional status filter | ||
| (pending/running/paused/completed/failed) | ||
|
|
||
| Returns: |
There was a problem hiding this comment.
The docs for status_filter mention values like pending/running, but the implementation maps via ImportJobStatus[status_filter.upper()] (e.g., queued, in_progress, paused, ...). Either update the documented allowed values or add explicit mapping so pending/running work as advertised.
There was a problem hiding this comment.
✅ Fixed - Updated docstring to use actual enum values:
@param status_filter: Filter by status (queued, in_progress, paused, completed, failed, cancelled)The documentation now lists the actual ImportJobStatus enum values that the API accepts.
| limit: Maximum jobs to return (1-100) | ||
| offset: Pagination offset | ||
| status: Optional status filter (pending/running/paused/completed/failed) | ||
|
|
There was a problem hiding this comment.
The status query param docstring lists pending/running/..., but the backend currently expects enum-like values such as queued and in_progress. Align the documented allowed values with what the service actually accepts, or map pending->queued / running->in_progress so clients can use the documented filter values.
There was a problem hiding this comment.
✅ Fixed - Updated docstring to match actual enum values:
status: str | None = Query(
None,
description='Filter by status: queued, in_progress, paused, completed, failed, cancelled'
)The parameter description now lists the exact ImportJobStatus enum values that the service layer expects.
Performance improvements: - Use COUNT(*) for pagination instead of loading all rows into memory - Stream GEDCOM files to disk (8KB chunks) to avoid memory doubling - Compute SHA256 hash incrementally during upload Bug fixes: - Fix get_current_stage() to use .first() instead of scalar_one_or_none() - Fix cancel_job() to properly DELETE stages instead of SELECT - Order stages by order field instead of id API improvements: - Add DELETE and OPTIONS to CORS allowed methods - Set updated_at to None until model has the field - Change logging from INFO to DEBUG for _job_to_response() - Update status filter docs to match actual enum values Schema compatibility: - Make ImportJobStage.order nullable for backward compatibility - Add migration TODOs for schema-breaking changes - Document that SQLModel.metadata.create_all() doesn't auto-upgrade Testing: - Add stage creation/deletion verification to test_delete_job_success - Verify cascade delete works correctly Code cleanup: - Remove unused records_processed parameter from update_stage_progress()
PR Review Feedback AddressedAll 14 review comments have been fixed in commit Performance Improvements✅ COUNT(*) pagination - Changed ✅ Streaming file uploads - Changed Bug Fixes✅ get_current_stage() MultipleResultsFound fix - Changed from ✅ cancel_job() stage deletion - Fixed to use API Improvements✅ CORS methods - Added ✅ updated_at field - Set to ✅ Logging level - Changed ✅ Status filter documentation - Updated docstrings to use actual enum values: Schema Compatibility✅ Nullable order field - Made ✅ user_id schema change docs - Added comprehensive comment explaining that changing Testing✅ Stage deletion test - Enhanced Code Cleanup✅ Unused parameter removed - Removed All tests passing: ✅ 17/17 tests pass with these changes. All review feedback has been addressed. Ready for re-review! |
Overview
Implements REST API endpoints and business logic for GEDCOM import job management. This is the first deliverable of PR #6 as outlined in the import job implementation plan.
API Endpoints (6)
/api/import-jobs- Create job with file upload/api/import-jobs- List user's jobs (paginated, filterable)/api/import-jobs/{job_id}- Get job detail with stages/api/import-jobs/{job_id}/pause- Pause running job/api/import-jobs/{job_id}/resume- Resume paused job/api/import-jobs/{job_id}- Cancel job and delete filesService Layer
New
import_pipeline.pywith 11 functions:create_import_job()- Upload GEDCOM, transition UPLOADED → QUEUEDget_job_with_stages()- Fetch job with pipeline stageslist_user_jobs()- Paginated job list with filteringpause_job()/resume_job()- Job lifecycle managementcancel_job()- Delete job and cleanup storageclaim_next_job()- Worker job claiming (lease-based)complete_stage()/fail_job()- Stage execution updatesget_next_stage()/heartbeat_job()- Worker helpersDatabase Changes
order: intfield toImportJobStagefor pipeline orderingImportJob.user_idfrom UUID to string (for Google JWT subject IDs)WikiTreeConnectionpattern for authenticationFile Storage
/data/gedcom/{user_id}/{job_id}/original.geduser_idis string (Google subject),job_idis UUIDState Machine
Jobs: UPLOADED → QUEUED → IN_PROGRESS → PAUSED/COMPLETED/FAILED/CANCELLED
Stages: PENDING → IN_PROGRESS → COMPLETED/FAILED/RETRYING
Testing
✅ 17/17 tests passing (100% success rate)
Note: Lower service coverage expected - functions like
claim_next_job(),complete_stage(), andfail_job()are called by the worker process (not yet implemented), not the API routes.Quality Checks
Files Changed
New:
apps/api/src/api/routes/import_jobs.py(113 lines)apps/api/src/api/services/import_pipeline.py(154 lines)apps/api/tests/test_import_jobs_routes.py(481 lines)Modified:
apps/api/src/api/database.py- Add ImportJobStage.order fieldapps/api/src/api/app.py- Register import_jobs routerapps/api/src/api/settings.py- Add gedcom_storage_path settingNext Steps
This establishes the API foundation. Follow-up PRs will implement: