Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ repos:
# Requires opt-in: `pre-commit install --hook-type commit-msg`
# (the default `pre-commit install` only wires the pre-commit stage).
- repo: https://github.com/compilerla/conventional-pre-commit
rev: v3.6.0
rev: v4.4.0
hooks:
- id: conventional-pre-commit
stages: [commit-msg]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""drop legacy file_* columns from ingestion_jobs

Revision ID: 758674b03ba7
Revises: f0a1b2c3d4e5
Create Date: 2026-06-05 17:05:10.229827

Completes the source-agnostic ``ingestion_jobs`` re-key. The file-shaped
columns (``file_path`` / ``file_name`` / ``file_size`` /
``file_mtime_ns``) and the inline ``(dataset_id, file_path)`` UNIQUE
constraint are removed; ``external_id`` and ``fingerprint`` take their
place and become NOT NULL. The new uniqueness invariant is
``(dataset_id, external_id)`` enforced via a UNIQUE INDEX (SQLite
can't ALTER TABLE ADD CONSTRAINT).

DB engine: SQLite.
"""

from collections.abc import Sequence

import sqlalchemy as sa
import sqlmodel
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "758674b03ba7"
down_revision: str | None = "f0a1b2c3d4e5"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
# The companion non-unique index is dropped first; batch mode below
# recreates the table without the legacy columns, which also clears
# the inline UNIQUE (dataset_id, file_path) constraint.
op.drop_index("idx_ingestion_job_dataset_external", table_name="ingestion_jobs")

with op.batch_alter_table("ingestion_jobs") as batch_op:
batch_op.drop_column("file_path")
batch_op.drop_column("file_name")
batch_op.drop_column("file_size")
batch_op.drop_column("file_mtime_ns")
batch_op.alter_column(
"external_id",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
)
batch_op.alter_column(
"fingerprint",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
)

op.create_index(
"uq_ingestion_job_dataset_external",
"ingestion_jobs",
["dataset_id", "external_id"],
unique=True,
)


def downgrade() -> None:
op.drop_index("uq_ingestion_job_dataset_external", table_name="ingestion_jobs")

# Re-add the legacy columns nullable so backfill can populate before
# batch mode recreates the table with NOT NULL + inline UNIQUE.
with op.batch_alter_table("ingestion_jobs") as batch_op:
batch_op.add_column(
sa.Column("file_path", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
)
batch_op.add_column(
sa.Column("file_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
)
batch_op.add_column(sa.Column("file_size", sa.Integer(), nullable=True))
batch_op.add_column(sa.Column("file_mtime_ns", sa.Integer(), nullable=True))

# Best-effort backfill: copy external_id into file_path / file_name; the
# original size/mtime are unrecoverable, so zeros stand in.
op.execute(
"""
UPDATE ingestion_jobs
SET file_path = external_id,
file_name = external_id,
file_size = 0,
file_mtime_ns = 0
WHERE file_path IS NULL
"""
)

with op.batch_alter_table(
"ingestion_jobs",
table_args=[
sa.UniqueConstraint(
"dataset_id", "file_path", name="uq_ingestion_job_dataset_file"
)
],
) as batch_op:
batch_op.alter_column(
"file_path",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
)
batch_op.alter_column(
"file_name",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
)
batch_op.alter_column("file_size", existing_type=sa.Integer(), nullable=False)
batch_op.alter_column(
"file_mtime_ns", existing_type=sa.Integer(), nullable=False
)
batch_op.alter_column(
"external_id",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
)
batch_op.alter_column(
"fingerprint",
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
)

op.create_index(
"idx_ingestion_job_dataset_external",
"ingestion_jobs",
["dataset_id", "external_id"],
unique=False,
)
36 changes: 9 additions & 27 deletions backend/syft_space/components/ingestion/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,11 @@ class IngestionJob(SQLModel, table=True):

__tablename__ = "ingestion_jobs"
__table_args__ = (
# Deprecated uniqueness on (dataset_id, file_path). Will be
# replaced by (dataset_id, external_id) once the legacy file_*
# columns are dropped.
UniqueConstraint(
"dataset_id", "file_path", name="uq_ingestion_job_dataset_file"
"dataset_id", "external_id", name="uq_ingestion_job_dataset_external"
),
# Index for efficient tenant-scoped queries
Index("idx_ingestion_job_tenant_status", "tenant_id", "status"),
# Index for dataset lookups
Index("idx_ingestion_job_dataset_id", "dataset_id"),
# Source-agnostic lookup path. Non-unique during the dual-write
# transition; promoted to UNIQUE once file_path is dropped.
Index("idx_ingestion_job_dataset_external", "dataset_id", "external_id"),
)

id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
Expand All @@ -62,29 +54,19 @@ class IngestionJob(SQLModel, table=True):
description="Dataset this job belongs to",
)

# Source-agnostic identifiers.
# ``external_id`` is the source-unique opaque key (filesystem path,
# WP post id, RSS guid, S3 key, ...). ``fingerprint`` is the
# source-controlled change-detection token compared as an opaque
# string. Both are nullable during the dual-write transition; they
# will become NOT NULL once the legacy file_* columns are dropped.
external_id: str | None = Field(
default=None,
# Source-agnostic identifiers. ``external_id`` is the source-unique
# opaque key (filesystem path, WP post id, RSS guid, S3 key, ...).
# ``fingerprint`` is the source-controlled change-detection token
# compared as an opaque string.
external_id: str = Field(
...,
index=True,
description="Source-unique identifier (path, post id, guid, ...)",
)
fingerprint: str | None = Field(
default=None,
description="Source-defined change-detection token (opaque)",
fingerprint: str = Field(
..., description="Source-defined change-detection token (opaque)"
)

# Deprecated file_*-specific columns. Populated by dual-write
# alongside external_id/fingerprint until they're dropped.
file_path: str = Field(..., description="Absolute path to the file")
file_name: str = Field(..., description="File name (basename)")
file_size: int = Field(..., description="File size in bytes")
file_mtime_ns: int = Field(..., description="File modification time in nanoseconds")

# Status tracking
status: str = Field(
default=IngestionJobStatus.PENDING.value,
Expand Down
47 changes: 11 additions & 36 deletions backend/syft_space/components/ingestion/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
"""

import asyncio
import json
from pathlib import Path as SyncPath
from typing import TYPE_CHECKING
from uuid import UUID

Expand Down Expand Up @@ -67,13 +65,21 @@ def _build_dataset_type(self, dataset: Dataset):
return dataset_type_cls(dataset.configuration)

def _has_source(self, dataset: Dataset) -> bool:
"""Whether this dataset's binding exposes a ``BaseSource`` instance."""
"""Whether this dataset's binding exposes an active ``BaseSource``.

``NoOpSource`` instances (used by externally-fed bindings like
remote Weaviate) are skipped — spawning a per-dataset task to
iterate an empty change stream is wasted bookkeeping.
"""
try:
dataset_type = self._build_dataset_type(dataset)
except Exception as e:
logger.warning(f"Cannot build dataset_type for {dataset.id}: {e}")
return False
return getattr(dataset_type, "source", None) is not None
source = getattr(dataset_type, "source", None)
if source is None:
return False
return not getattr(source, "IS_NOOP", False)

# -------------------------------------------------------------------------
# Lifecycle
Expand Down Expand Up @@ -267,38 +273,15 @@ async def _handle_source_event(
)
return

file_size, file_mtime_ns = self._derive_legacy_file_fields(event.fingerprint)
await self._ingestion_repository.upsert_by_external_id(
tenant_id=tenant_id,
dataset_id=dataset_id,
external_id=event.external_id,
fingerprint=event.fingerprint,
# Deprecated dual-write — will be removed with the legacy columns.
file_path=event.external_id,
file_name=SyncPath(event.external_id).name,
file_size=file_size,
file_mtime_ns=file_mtime_ns,
)
if self._job_signal is not None:
self._job_signal.set()

@staticmethod
def _derive_legacy_file_fields(fingerprint: str | None) -> tuple[int, int]:
"""Decode the LocalFileSource fingerprint into ``(size, mtime_ns)``.

Transitional dual-write helper. ``fingerprint`` is the canonical
change-detection token; the deprecated ``file_size`` /
``file_mtime_ns`` columns still need values until they're
dropped. Non-decodable fingerprints (non-filesystem sources)
get zeros — those rows never reach chunking, since their
dataset_type is the consumer.
"""
try:
data = json.loads(fingerprint) if fingerprint else {}
except (json.JSONDecodeError, TypeError):
data = {}
return int(data.get("size", 0)), int(data.get("mtime_ns", 0))

# -------------------------------------------------------------------------
# Job processing
# -------------------------------------------------------------------------
Expand Down Expand Up @@ -365,7 +348,7 @@ async def _process_single_job(self, job: IngestionJob) -> None:
return
source, dataset_type = resolved

external_id = job.external_id or job.file_path
external_id = job.external_id
try:
# Fingerprint-drift check via the source. Opaque string compare —
# if the item changed since the job was queued, re-upsert with
Expand All @@ -375,19 +358,11 @@ async def _process_single_job(self, job: IngestionJob) -> None:
current_fp = source.fingerprint(external_id)
if current_fp != job.fingerprint:
logger.info(f"Item changed during processing: {external_id}")
file_size, file_mtime_ns = self._derive_legacy_file_fields(
current_fp
)
await self._ingestion_repository.upsert_by_external_id(
tenant_id=job.tenant_id,
dataset_id=job.dataset_id,
external_id=external_id,
fingerprint=current_fp,
# Deprecated dual-write — will be removed with the legacy columns.
file_path=external_id,
file_name=job.file_name,
file_size=file_size,
file_mtime_ns=file_mtime_ns,
)
await self._ingestion_repository.update_status(
job.id,
Expand Down
19 changes: 1 addition & 18 deletions backend/syft_space/components/ingestion/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ async def upsert_by_external_id(
tenant_id: UUID,
dataset_id: UUID,
external_id: str,
fingerprint: str | None,
file_path: str,
file_name: str,
file_size: int,
file_mtime_ns: int,
fingerprint: str,
) -> IngestionJob:
"""Create or update an ingestion job by source-unique id.

Expand All @@ -55,11 +51,6 @@ async def upsert_by_external_id(
The fingerprint comparison is an opaque string equality check —
sources define the format (see ``BaseSource.fingerprint``); the
repository treats it as a blob.

``file_path`` / ``file_name`` / ``file_size`` / ``file_mtime_ns`` are
deprecated columns dual-written alongside ``external_id`` /
``fingerprint`` and derived by the caller from the source event.
They will be dropped once nothing reads from them.
"""
async with self.db.get_session() as session:
result = await session.exec(
Expand All @@ -83,9 +74,6 @@ async def upsert_by_external_id(
return existing

existing.fingerprint = fingerprint
# Deprecated dual-write — will be removed with the legacy columns.
existing.file_size = file_size
existing.file_mtime_ns = file_mtime_ns
existing.status = IngestionJobStatus.PENDING.value
existing.error_message = None
existing.updated_at = now
Expand All @@ -103,11 +91,6 @@ async def upsert_by_external_id(
dataset_id=dataset_id,
external_id=external_id,
fingerprint=fingerprint,
# Deprecated dual-write — will be removed with the legacy columns.
file_path=file_path,
file_name=file_name,
file_size=file_size,
file_mtime_ns=file_mtime_ns,
status=IngestionJobStatus.PENDING.value,
created_at=now,
updated_at=now,
Expand Down
11 changes: 4 additions & 7 deletions backend/syft_space/components/ingestion/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ class IngestionJobResponse(BaseModel):
"""Response model for a single ingestion job."""

id: UUID = Field(..., description="Job UUID")
external_id: str | None = Field(
None, description="Source-unique identifier (path, post id, guid, ...)"
external_id: str = Field(
..., description="Source-unique identifier (path, post id, guid, ...)"
)
fingerprint: str | None = Field(
None, description="Source-defined change-detection token (opaque)"
fingerprint: str = Field(
..., description="Source-defined change-detection token (opaque)"
)
file_path: str = Field(..., description="Absolute file path")
file_name: str = Field(..., description="File name")
file_size: int = Field(..., description="File size in bytes")
status: str = Field(..., description="Job status")
error_message: str | None = Field(None, description="Error message if failed")
retry_count: int = Field(..., description="Number of retry attempts")
Expand Down
8 changes: 7 additions & 1 deletion backend/syft_space/components/sources/noop_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@


class NoOpSource:
"""No-op source. Lists nothing, fetches nothing, emits no events."""
"""No-op source. Lists nothing, fetches nothing, emits no events.

``IS_NOOP`` is read by the ingestion manager to skip spawning an
empty per-dataset task — bindings whose vector store is fed
externally don't need an ingest pipeline at all.
"""

NAME = "noop"
IS_NOOP = True

def __init__(self, config: dict[str, Any]) -> None:
self.raw_config = config
Expand Down
5 changes: 2 additions & 3 deletions frontend/src/api/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ export interface DatasetTypeInfoResponse {
// Ingestion API types
export interface IngestionJobResponse {
id: string
file_path: string
file_name: string
file_size: number
external_id: string
fingerprint: string
status: string
error_message?: string
retry_count: number
Expand Down
Loading
Loading