diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4281d1a2..49e49ac0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -53,7 +53,7 @@ repos:
   # Requires opt-in: `pre-commit install --hook-type commit-msg`
   # (the default `pre-commit install` only wires the pre-commit stage).
   - repo: https://github.com/compilerla/conventional-pre-commit
-    rev: v3.6.0
+    rev: v4.4.0
     hooks:
       - id: conventional-pre-commit
         stages: [commit-msg]
diff --git a/backend/syft_space/alembic/versions/758674b03ba7_drop_legacy_file__columns_from_.py b/backend/syft_space/alembic/versions/758674b03ba7_drop_legacy_file__columns_from_.py
new file mode 100644
index 00000000..d6ec0141
--- /dev/null
+++ b/backend/syft_space/alembic/versions/758674b03ba7_drop_legacy_file__columns_from_.py
@@ -0,0 +1,162 @@
+"""drop legacy file_* columns from ingestion_jobs
+
+Revision ID: 758674b03ba7
+Revises: f0a1b2c3d4e5
+Create Date: 2026-06-05 17:05:10.229827
+
+Completes the source-agnostic ``ingestion_jobs`` re-key. The file-shaped
+columns (``file_path`` / ``file_name`` / ``file_size`` /
+``file_mtime_ns``) and the inline ``(dataset_id, file_path)`` UNIQUE
+constraint are removed; ``external_id`` and ``fingerprint`` take their
+place and become NOT NULL. The new uniqueness invariant is
+``(dataset_id, external_id)`` enforced via a UNIQUE INDEX (SQLite
+can't ALTER TABLE ADD CONSTRAINT).
+
+Rows that still carry NULL in ``external_id`` / ``fingerprint`` are
+backfilled before the NOT NULL rebuild so the upgrade cannot abort on
+pre-transition data.
+
+DB engine: SQLite.
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+import sqlmodel
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "758674b03ba7"
+down_revision: str | None = "f0a1b2c3d4e5"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    """Re-key ``ingestion_jobs`` on ``(dataset_id, external_id)``.
+
+    Backfills NULL ``external_id`` / ``fingerprint`` values, rebuilds the
+    table without the legacy ``file_*`` columns, then adds the unique index.
+    """
+    # Both columns were nullable until now, and batch mode copies every row
+    # into a table that declares them NOT NULL, so one leftover NULL would
+    # abort the whole upgrade. ``file_path`` is what dual-write stored as
+    # ``external_id``. The empty fingerprint is a placeholder: presumably it
+    # never equals a real source fingerprint, so such rows are re-queued the
+    # next time the source reports them -- TODO confirm that is acceptable.
+    op.execute(
+        """
+        UPDATE ingestion_jobs
+        SET external_id = file_path
+        WHERE external_id IS NULL
+        """
+    )
+    op.execute(
+        """
+        UPDATE ingestion_jobs
+        SET fingerprint = ''
+        WHERE fingerprint IS NULL
+        """
+    )
+
+    # The companion non-unique index is dropped first; batch mode below
+    # recreates the table without the legacy columns, which also clears
+    # the inline UNIQUE (dataset_id, file_path) constraint.
+    op.drop_index("idx_ingestion_job_dataset_external", table_name="ingestion_jobs")
+
+    with op.batch_alter_table("ingestion_jobs") as batch_op:
+        batch_op.drop_column("file_path")
+        batch_op.drop_column("file_name")
+        batch_op.drop_column("file_size")
+        batch_op.drop_column("file_mtime_ns")
+        batch_op.alter_column(
+            "external_id",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=False,
+        )
+        batch_op.alter_column(
+            "fingerprint",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=False,
+        )
+
+    op.create_index(
+        "uq_ingestion_job_dataset_external",
+        "ingestion_jobs",
+        ["dataset_id", "external_id"],
+        unique=True,
+    )
+
+
+def downgrade() -> None:
+    """Restore the legacy ``file_*`` columns and their uniqueness rule.
+
+    Lossy: the original size/mtime values are gone, so zeros are written,
+    and ``file_name`` receives the full ``external_id``.
+    """
+    op.drop_index("uq_ingestion_job_dataset_external", table_name="ingestion_jobs")
+
+    # Re-add the legacy columns nullable so backfill can populate before
+    # batch mode recreates the table with NOT NULL + inline UNIQUE.
+    with op.batch_alter_table("ingestion_jobs") as batch_op:
+        batch_op.add_column(
+            sa.Column("file_path", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
+        )
+        batch_op.add_column(
+            sa.Column("file_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
+        )
+        batch_op.add_column(sa.Column("file_size", sa.Integer(), nullable=True))
+        batch_op.add_column(sa.Column("file_mtime_ns", sa.Integer(), nullable=True))
+
+    # Best-effort backfill: copy external_id into file_path / file_name; the
+    # original size/mtime are unrecoverable, so zeros stand in.
+    op.execute(
+        """
+        UPDATE ingestion_jobs
+        SET file_path = external_id,
+            file_name = external_id,
+            file_size = 0,
+            file_mtime_ns = 0
+        WHERE file_path IS NULL
+        """
+    )
+
+    with op.batch_alter_table(
+        "ingestion_jobs",
+        table_args=[
+            sa.UniqueConstraint(
+                "dataset_id", "file_path", name="uq_ingestion_job_dataset_file"
+            )
+        ],
+    ) as batch_op:
+        batch_op.alter_column(
+            "file_path",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=False,
+        )
+        batch_op.alter_column(
+            "file_name",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=False,
+        )
+        batch_op.alter_column("file_size", existing_type=sa.Integer(), nullable=False)
+        batch_op.alter_column(
+            "file_mtime_ns", existing_type=sa.Integer(), nullable=False
+        )
+        batch_op.alter_column(
+            "external_id",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=True,
+        )
+        batch_op.alter_column(
+            "fingerprint",
+            existing_type=sqlmodel.sql.sqltypes.AutoString(),
+            nullable=True,
+        )
+
+    op.create_index(
+        "idx_ingestion_job_dataset_external",
+        "ingestion_jobs",
+        ["dataset_id", "external_id"],
+        unique=False,
+    )
diff --git a/backend/syft_space/components/ingestion/entities.py b/backend/syft_space/components/ingestion/entities.py
index 83b7855a..ca61986f 100644
--- a/backend/syft_space/components/ingestion/entities.py
+++ b/backend/syft_space/components/ingestion/entities.py
@@ -31,19 +31,11 @@ class IngestionJob(SQLModel, table=True):
     __tablename__ = "ingestion_jobs"
 
     __table_args__ = (
-        # Deprecated uniqueness on (dataset_id, file_path). Will be
-        # replaced by (dataset_id, external_id) once the legacy file_*
-        # columns are dropped.
UniqueConstraint( - "dataset_id", "file_path", name="uq_ingestion_job_dataset_file" + "dataset_id", "external_id", name="uq_ingestion_job_dataset_external" ), - # Index for efficient tenant-scoped queries Index("idx_ingestion_job_tenant_status", "tenant_id", "status"), - # Index for dataset lookups Index("idx_ingestion_job_dataset_id", "dataset_id"), - # Source-agnostic lookup path. Non-unique during the dual-write - # transition; promoted to UNIQUE once file_path is dropped. - Index("idx_ingestion_job_dataset_external", "dataset_id", "external_id"), ) id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) @@ -62,29 +54,19 @@ class IngestionJob(SQLModel, table=True): description="Dataset this job belongs to", ) - # Source-agnostic identifiers. - # ``external_id`` is the source-unique opaque key (filesystem path, - # WP post id, RSS guid, S3 key, ...). ``fingerprint`` is the - # source-controlled change-detection token compared as an opaque - # string. Both are nullable during the dual-write transition; they - # will become NOT NULL once the legacy file_* columns are dropped. - external_id: str | None = Field( - default=None, + # Source-agnostic identifiers. ``external_id`` is the source-unique + # opaque key (filesystem path, WP post id, RSS guid, S3 key, ...). + # ``fingerprint`` is the source-controlled change-detection token + # compared as an opaque string. + external_id: str = Field( + ..., index=True, description="Source-unique identifier (path, post id, guid, ...)", ) - fingerprint: str | None = Field( - default=None, - description="Source-defined change-detection token (opaque)", + fingerprint: str = Field( + ..., description="Source-defined change-detection token (opaque)" ) - # Deprecated file_*-specific columns. Populated by dual-write - # alongside external_id/fingerprint until they're dropped. 
- file_path: str = Field(..., description="Absolute path to the file") - file_name: str = Field(..., description="File name (basename)") - file_size: int = Field(..., description="File size in bytes") - file_mtime_ns: int = Field(..., description="File modification time in nanoseconds") - # Status tracking status: str = Field( default=IngestionJobStatus.PENDING.value, diff --git a/backend/syft_space/components/ingestion/manager.py b/backend/syft_space/components/ingestion/manager.py index 5d89ba88..3b949bca 100644 --- a/backend/syft_space/components/ingestion/manager.py +++ b/backend/syft_space/components/ingestion/manager.py @@ -12,8 +12,6 @@ """ import asyncio -import json -from pathlib import Path as SyncPath from typing import TYPE_CHECKING from uuid import UUID @@ -67,13 +65,21 @@ def _build_dataset_type(self, dataset: Dataset): return dataset_type_cls(dataset.configuration) def _has_source(self, dataset: Dataset) -> bool: - """Whether this dataset's binding exposes a ``BaseSource`` instance.""" + """Whether this dataset's binding exposes an active ``BaseSource``. + + ``NoOpSource`` instances (used by externally-fed bindings like + remote Weaviate) are skipped — spawning a per-dataset task to + iterate an empty change stream is wasted bookkeeping. 
+ """ try: dataset_type = self._build_dataset_type(dataset) except Exception as e: logger.warning(f"Cannot build dataset_type for {dataset.id}: {e}") return False - return getattr(dataset_type, "source", None) is not None + source = getattr(dataset_type, "source", None) + if source is None: + return False + return not getattr(source, "IS_NOOP", False) # ------------------------------------------------------------------------- # Lifecycle @@ -267,38 +273,15 @@ async def _handle_source_event( ) return - file_size, file_mtime_ns = self._derive_legacy_file_fields(event.fingerprint) await self._ingestion_repository.upsert_by_external_id( tenant_id=tenant_id, dataset_id=dataset_id, external_id=event.external_id, fingerprint=event.fingerprint, - # Deprecated dual-write — will be removed with the legacy columns. - file_path=event.external_id, - file_name=SyncPath(event.external_id).name, - file_size=file_size, - file_mtime_ns=file_mtime_ns, ) if self._job_signal is not None: self._job_signal.set() - @staticmethod - def _derive_legacy_file_fields(fingerprint: str | None) -> tuple[int, int]: - """Decode the LocalFileSource fingerprint into ``(size, mtime_ns)``. - - Transitional dual-write helper. ``fingerprint`` is the canonical - change-detection token; the deprecated ``file_size`` / - ``file_mtime_ns`` columns still need values until they're - dropped. Non-decodable fingerprints (non-filesystem sources) - get zeros — those rows never reach chunking, since their - dataset_type is the consumer. 
- """ - try: - data = json.loads(fingerprint) if fingerprint else {} - except (json.JSONDecodeError, TypeError): - data = {} - return int(data.get("size", 0)), int(data.get("mtime_ns", 0)) - # ------------------------------------------------------------------------- # Job processing # ------------------------------------------------------------------------- @@ -365,7 +348,7 @@ async def _process_single_job(self, job: IngestionJob) -> None: return source, dataset_type = resolved - external_id = job.external_id or job.file_path + external_id = job.external_id try: # Fingerprint-drift check via the source. Opaque string compare — # if the item changed since the job was queued, re-upsert with @@ -375,19 +358,11 @@ async def _process_single_job(self, job: IngestionJob) -> None: current_fp = source.fingerprint(external_id) if current_fp != job.fingerprint: logger.info(f"Item changed during processing: {external_id}") - file_size, file_mtime_ns = self._derive_legacy_file_fields( - current_fp - ) await self._ingestion_repository.upsert_by_external_id( tenant_id=job.tenant_id, dataset_id=job.dataset_id, external_id=external_id, fingerprint=current_fp, - # Deprecated dual-write — will be removed with the legacy columns. - file_path=external_id, - file_name=job.file_name, - file_size=file_size, - file_mtime_ns=file_mtime_ns, ) await self._ingestion_repository.update_status( job.id, diff --git a/backend/syft_space/components/ingestion/repository.py b/backend/syft_space/components/ingestion/repository.py index a8751c0d..893816c6 100644 --- a/backend/syft_space/components/ingestion/repository.py +++ b/backend/syft_space/components/ingestion/repository.py @@ -39,11 +39,7 @@ async def upsert_by_external_id( tenant_id: UUID, dataset_id: UUID, external_id: str, - fingerprint: str | None, - file_path: str, - file_name: str, - file_size: int, - file_mtime_ns: int, + fingerprint: str, ) -> IngestionJob: """Create or update an ingestion job by source-unique id. 
@@ -55,11 +51,6 @@ async def upsert_by_external_id( The fingerprint comparison is an opaque string equality check — sources define the format (see ``BaseSource.fingerprint``); the repository treats it as a blob. - - ``file_path`` / ``file_name`` / ``file_size`` / ``file_mtime_ns`` are - deprecated columns dual-written alongside ``external_id`` / - ``fingerprint`` and derived by the caller from the source event. - They will be dropped once nothing reads from them. """ async with self.db.get_session() as session: result = await session.exec( @@ -83,9 +74,6 @@ async def upsert_by_external_id( return existing existing.fingerprint = fingerprint - # Deprecated dual-write — will be removed with the legacy columns. - existing.file_size = file_size - existing.file_mtime_ns = file_mtime_ns existing.status = IngestionJobStatus.PENDING.value existing.error_message = None existing.updated_at = now @@ -103,11 +91,6 @@ async def upsert_by_external_id( dataset_id=dataset_id, external_id=external_id, fingerprint=fingerprint, - # Deprecated dual-write — will be removed with the legacy columns. 
- file_path=file_path, - file_name=file_name, - file_size=file_size, - file_mtime_ns=file_mtime_ns, status=IngestionJobStatus.PENDING.value, created_at=now, updated_at=now, diff --git a/backend/syft_space/components/ingestion/schemas.py b/backend/syft_space/components/ingestion/schemas.py index 20eac20c..5f21d52b 100644 --- a/backend/syft_space/components/ingestion/schemas.py +++ b/backend/syft_space/components/ingestion/schemas.py @@ -10,15 +10,12 @@ class IngestionJobResponse(BaseModel): """Response model for a single ingestion job.""" id: UUID = Field(..., description="Job UUID") - external_id: str | None = Field( - None, description="Source-unique identifier (path, post id, guid, ...)" + external_id: str = Field( + ..., description="Source-unique identifier (path, post id, guid, ...)" ) - fingerprint: str | None = Field( - None, description="Source-defined change-detection token (opaque)" + fingerprint: str = Field( + ..., description="Source-defined change-detection token (opaque)" ) - file_path: str = Field(..., description="Absolute file path") - file_name: str = Field(..., description="File name") - file_size: int = Field(..., description="File size in bytes") status: str = Field(..., description="Job status") error_message: str | None = Field(None, description="Error message if failed") retry_count: int = Field(..., description="Number of retry attempts") diff --git a/backend/syft_space/components/sources/noop_source.py b/backend/syft_space/components/sources/noop_source.py index a4a0ef86..1113d4fd 100644 --- a/backend/syft_space/components/sources/noop_source.py +++ b/backend/syft_space/components/sources/noop_source.py @@ -20,9 +20,15 @@ class NoOpSource: - """No-op source. Lists nothing, fetches nothing, emits no events.""" + """No-op source. Lists nothing, fetches nothing, emits no events. 
+ + ``IS_NOOP`` is read by the ingestion manager to skip spawning an + empty per-dataset task — bindings whose vector store is fed + externally don't need an ingest pipeline at all. + """ NAME = "noop" + IS_NOOP = True def __init__(self, config: dict[str, Any]) -> None: self.raw_config = config diff --git a/frontend/src/api/types/index.ts b/frontend/src/api/types/index.ts index 47bd699a..fdeeadec 100644 --- a/frontend/src/api/types/index.ts +++ b/frontend/src/api/types/index.ts @@ -109,9 +109,8 @@ export interface DatasetTypeInfoResponse { // Ingestion API types export interface IngestionJobResponse { id: string - file_path: string - file_name: string - file_size: number + external_id: string + fingerprint: string status: string error_message?: string retry_count: number diff --git a/frontend/src/pages/DatasetDetailPage.vue b/frontend/src/pages/DatasetDetailPage.vue index 674043dd..12480206 100644 --- a/frontend/src/pages/DatasetDetailPage.vue +++ b/frontend/src/pages/DatasetDetailPage.vue @@ -433,11 +433,10 @@
-

{{ job.file_name }}

+

+ {{ job.external_id.split('/').pop() || job.external_id }} +

-

- {{ formatFileSize(job.file_size) }} -

@@ -883,14 +882,6 @@ const retryFailedJobs = async () => {
   }
 }
 
-const formatFileSize = (bytes: number): string => {
-  if (bytes === 0) return '0 B'
-  const k = 1024
-  const sizes = ['B', 'KB', 'MB', 'GB']
-  const i = Math.floor(Math.log(bytes) / Math.log(k))
-  return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i]
-}
-
 const getJobStatusBadgeVariant = (status: string) => {
   switch (status) {
     case 'completed':
diff --git a/frontend/src/pages/EndpointDetailPage.vue b/frontend/src/pages/EndpointDetailPage.vue
index b3749da4..a430d31c 100644
--- a/frontend/src/pages/EndpointDetailPage.vue
+++ b/frontend/src/pages/EndpointDetailPage.vue
@@ -1110,7 +1110,7 @@ const watchedPathStats = computed(() => {
   }
   for (const job of ingestionJobs.value?.jobs ?? []) {
     for (const [path, entry] of stats) {
-      if (job.file_path.startsWith(path)) {
+      if (job.external_id.startsWith(path)) {
         entry.fileCount += 1
         entry.counts[job.status] = (entry.counts[job.status] || 0) + 1
       }
@@ -1145,6 +1145,53 @@ const getWatchedPaths = computed(() => {
   })
 })
 
+// Summarise the ingestion jobs whose external_id starts with `watchedPath`.
+// Returns the number of matching jobs plus one roll-up status, chosen by
+// priority: in_progress > pending > failed > completed; 'unknown' when no
+// jobs are loaded or none match. Matching is a plain string-prefix test,
+// the same one watchedPathStats uses.
+const getStatsForPath = (watchedPath: string) => {
+  if (!ingestionJobs.value?.jobs) {
+    return { fileCount: 0, status: 'unknown' }
+  }
+
+  // Filter jobs that start with the watched path
+  const pathJobs = ingestionJobs.value.jobs.filter((job) =>
+    job.external_id.startsWith(watchedPath),
+  )
+
+  if (pathJobs.length === 0) {
+    return { fileCount: 0, status: 'unknown' }
+  }
+
+  // Count jobs by status
+  const statusCounts = pathJobs.reduce(
+    (counts, job) => {
+      counts[job.status] = (counts[job.status] || 0) + 1
+      return counts
+    },
+    {} as Record<string, number>,
+  )
+
+  // Determine overall status based on priority
+  let status = 'unknown'
+  if ((statusCounts['in_progress'] ?? 0) > 0) {
+    status = 'processing'
+  } else if ((statusCounts['pending'] ?? 0) > 0) {
+    status = 'queued'
+  } else if ((statusCounts['failed'] ?? 0) > 0) {
+    status = 'errored'
+  } else if ((statusCounts['completed'] ?? 0) > 0) {
+    status = 'indexed'
+  }
+
+  return {
+    fileCount: pathJobs.length,
+    status,
+  }
+}
+
+// Fetch all ingestion jobs across all pages
 const fetchAllIngestionJobs = async (datasetId: string) => {
   const allJobs = []
   let offset = 0