|
| 1 | +"""Persistent record of a single KB ingestion run. |
| 2 | +
|
| 3 | +One row per invocation of ``KBIngestionHelper.perform_ingestion``. |
| 4 | +Captures the aggregate counters and per-item outcomes produced by a |
| 5 | +``KBIngestionSource`` so the Phase 2 visibility UI can drill into a run |
| 6 | +without re-walking the vector store. |
| 7 | +
|
| 8 | +Design notes: |
| 9 | +
|
| 10 | +* ``job_id`` links back to the existing ``job`` table (one-to-one). |
| 11 | + Nullable so component-path ingestions (which don't go through the |
| 12 | + job service today) can still record a run. |
| 13 | +* ``source_config`` stores source-specific configuration *minus* |
| 14 | + credentials — cloud connectors in Phase 3 should reference variables |
| 15 | + by name, not embed secret values here. |
| 16 | +* ``items`` is a JSON array of per-file outcomes. Chosen over a |
| 17 | + separate ``ingestion_run_item`` table because the typical run size |
| 18 | + (tens to hundreds of files) fits comfortably in a single row, and |
| 19 | + Phase 2's drill-down reads the full list anyway. If future phases |
| 20 | + need sharded per-item queries, we expand (Phase 1+N) by adding a |
| 21 | + dedicated table in a later EXPAND migration. |
| 22 | +* ``status`` is a Python-side string so we don't need a second enum |
| 23 | + migration when new outcomes are added; the allow-list is enforced in |
| 24 | + the application layer. |
| 25 | +""" |
| 26 | + |
| 27 | +from datetime import datetime, timezone |
| 28 | +from enum import Enum |
| 29 | +from typing import Any |
| 30 | +from uuid import UUID, uuid4 |
| 31 | + |
| 32 | +import sqlalchemy as sa |
| 33 | +from sqlalchemy import JSON, BigInteger, CheckConstraint, Column, DateTime, ForeignKey, func |
| 34 | +from sqlalchemy.dialects.postgresql import JSONB |
| 35 | +from sqlmodel import Field, SQLModel |
| 36 | + |
# Dialect-aware JSON column type: plain ``JSON`` on every backend, upgraded
# to ``JSONB`` when the dialect is Postgres (binary storage, key dedup, and
# GIN-indexability on the ``items`` column). Must stay identical to the
# variant declared in migration ``72df732be86b``.
JsonVariant = JSON().with_variant(JSONB(), "postgresql")
| 41 | + |
| 42 | +# Status allow-list — mirrors ``IngestionRunStatus``. Keep in sync |
| 43 | +# with migration ``72df732be86b``. |
| 44 | +_RUN_STATUS_VALUES = ("pending", "running", "succeeded", "partial", "failed", "cancelled") |
| 45 | + |
| 46 | + |
class IngestionRunStatus(str, Enum):
    """Outcome of a single ingestion run, as surfaced to the UI.

    Deliberately kept separate from ``JobStatus``: job status tracks
    scheduling state (queued / in_progress / completed / ...), whereas
    this enum answers the ingestion question — did every file land?
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    # Run completed, but at least one item failed to ingest.
    PARTIAL = "partial"
    FAILED = "failed"
    CANCELLED = "cancelled"
| 61 | + |
| 62 | + |
class IngestionRunBase(SQLModel):
    """Shared column definitions for one KB ingestion run.

    A row captures a single ``KBIngestionHelper.perform_ingestion``
    invocation: the target KB, source configuration, aggregate counters,
    and the per-item ``items`` payload. The concrete table is declared by
    ``IngestionRun`` below.
    """

    # Surrogate primary key, generated client-side so rows can be built
    # before the first flush.
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # One-to-one link back to the ``job`` table. Nullable so
    # component-path ingestions (which don't go through the job service
    # today) can still record a run.
    job_id: UUID | None = Field(default=None, index=True, nullable=True)
    # ``kb_name`` is the legacy pointer (kept through an
    # expand-contract transition); ``kb_id`` is the new FK.
    kb_name: str = Field(index=True, nullable=False)
    # ``kb_id`` is the FK; ``ON DELETE SET NULL`` is set at the DB
    # layer so KB deletion does not orphan run history — rows remain
    # visible with a null kb_id. The physical FK constraint is
    # created by migration ``e728126476a8``; this ``sa_column`` makes
    # the ORM aware of it so relationships resolve correctly.
    kb_id: UUID | None = Field(
        default=None,
        sa_column=Column(
            sa.Uuid(),
            ForeignKey("knowledge_base.id", ondelete="SET NULL"),
            index=True,
            nullable=True,
        ),
    )
    # User who triggered the run; nullable (presumably for system-initiated
    # runs — TODO confirm against callers).
    user_id: UUID | None = Field(default=None, index=True, nullable=True)

    # Discriminator for the ``KBIngestionSource`` implementation used.
    source_type: str = Field(index=True, nullable=False)
    # Source-specific configuration *minus* credentials — connectors
    # should reference secrets by name, never embed values here (see
    # module docstring).
    source_config: dict[str, Any] = Field(
        default_factory=dict,
        sa_column=Column(JsonVariant, nullable=False),
    )

    # Plain string rather than a DB enum so new outcomes don't need an
    # enum migration; the allow-list is enforced by the CHECK constraint
    # on the table and by the application layer.
    status: str = Field(default=IngestionRunStatus.PENDING.value, index=True, nullable=False)
    # Run-level failure detail; per-item errors live in ``items``.
    error_message: str | None = Field(default=None, nullable=True)

    # Aggregate per-item counters: total_items = succeeded + failed + skipped
    # (invariant maintained by the writer, not the schema).
    total_items: int = Field(default=0, nullable=False)
    succeeded: int = Field(default=0, nullable=False)
    failed: int = Field(default=0, nullable=False)
    skipped: int = Field(default=0, nullable=False)
    # BigInteger: cumulative byte count of an ingestion run can
    # exceed the int32 ~2GB cap when ingesting large cloud blobs.
    # Matches ``knowledge_base.size_bytes``.
    total_bytes: int = Field(
        default=0,
        sa_column=Column(BigInteger, nullable=False, server_default="0"),
    )
    # Number of vector-store chunks produced across the whole run.
    chunks_created: int = Field(default=0, nullable=False)

    # JSON array of per-file outcome dicts. Kept inline (no child table)
    # because typical runs are tens-to-hundreds of files and the UI reads
    # the full list anyway; see module docstring for the expand plan.
    items: list[dict[str, Any]] = Field(
        default_factory=list,
        sa_column=Column(JsonVariant, nullable=False),
    )

    # User-supplied run-level metadata (tags, categories, custom fields).
    # Persisted here so the run-history UI can render the tags applied to
    # the batch without scanning chunk-level ``source_metadata`` blobs.
    # Per-file overrides are NOT stored here — they live on each chunk's
    # ``source_metadata`` and on the per-item entries in ``items``.
    user_metadata: dict[str, Any] = Field(
        default_factory=dict,
        sa_column=Column(JsonVariant, nullable=False, server_default="{}"),
    )

    # ``server_default=func.now()`` keeps ``started_at`` populated on
    # raw-SQL inserts. Indexed because list endpoints order by this
    # column — without the index, large run histories sequential-scan.
    started_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        sa_column=Column(DateTime(timezone=True), nullable=False, server_default=func.now(), index=True),
    )
    # Null while the run is still pending/running.
    finished_at: datetime | None = Field(
        default=None,
        sa_column=Column(DateTime(timezone=True), nullable=True),
    )
| 133 | + |
| 134 | + |
class IngestionRun(IngestionRunBase, table=True):  # type: ignore[call-arg]
    """Concrete ``ingestion_run`` table.

    Adds the DB-side CHECK constraint so the status allow-list is
    enforced even for writes that bypass the application layer.
    """

    __tablename__ = "ingestion_run"
    # Render the quoted allow-list from ``_RUN_STATUS_VALUES`` so the SQL
    # constraint and the Python-side list cannot drift independently.
    __table_args__ = (
        CheckConstraint(
            "status IN ({})".format(", ".join("'{}'".format(v) for v in _RUN_STATUS_VALUES)),
            name="ck_ingestion_run_status",
        ),
    )
0 commit comments