Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Add user_metadata column to ingestion_run

Revision ID: 16a290ab1332
Revises: e728126476a8
Create Date: 2026-04-28 19:00:00.000000

Phase: EXPAND
Safe to rollback: YES (column is NOT NULL but carries a ``'{}'`` server
default; older services that ignore the column keep round-tripping rows just fine).
Services compatible: All versions. New code writes user-supplied tags
here; older code ignores the column. Empty objects (``{}``) are
written when no user metadata is supplied so list endpoints can
treat presence/absence of tags consistently.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB
from langflow.utils import migration

# revision identifiers, used by Alembic.
revision: str = "16a290ab1332"  # pragma: allowlist secret
down_revision: str | None = "e728126476a8"  # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Target table/column for this EXPAND-phase migration.
TABLE_NAME = "ingestion_run"
COLUMN_NAME = "user_metadata"

# Mirror the model's JsonVariant choice — JSONB on Postgres, JSON elsewhere.
JsonVariant = sa.JSON().with_variant(JSONB(), "postgresql")


def upgrade() -> None:
    """Add the NOT NULL ``user_metadata`` column with a ``'{}'`` server default.

    Idempotent: returns early when the table does not exist yet or the
    column is already present, so re-running the migration is harmless.
    """
    bind = op.get_bind()
    if not migration.table_exists(TABLE_NAME, bind):
        return
    if migration.column_exists(TABLE_NAME, COLUMN_NAME, bind):
        return

    # The server default keeps inserts from older services (which don't
    # know about the column) valid despite the NOT NULL constraint.
    new_column = sa.Column(
        COLUMN_NAME,
        JsonVariant,
        nullable=False,
        server_default="{}",
    )
    with op.batch_alter_table(TABLE_NAME, schema=None) as batch_op:
        batch_op.add_column(new_column)


def downgrade() -> None:
    """Drop ``user_metadata`` again; no-op when the table or column is absent."""
    bind = op.get_bind()
    column_present = migration.table_exists(TABLE_NAME, bind) and migration.column_exists(
        TABLE_NAME, COLUMN_NAME, bind
    )
    if column_present:
        with op.batch_alter_table(TABLE_NAME, schema=None) as batch_op:
            batch_op.drop_column(COLUMN_NAME)
7 changes: 7 additions & 0 deletions src/backend/base/langflow/api/utils/ingestion_run_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async def create_run(
job_id: UUID | None,
user_id: UUID | None,
kb_id: UUID | None = None,
user_metadata: dict | None = None,
) -> UUID:
"""Insert a PENDING ``ingestion_run`` row and return its id.

Expand All @@ -48,6 +49,11 @@ async def create_run(
``kb_name`` as the legacy pointer, so a run still records a row
even if the KB doesn't yet have a ``knowledge_base`` DB entry
(backfill will link them on the next list).

``user_metadata`` carries the run-level tags supplied at the API
boundary (already validated by ``parse_user_metadata``). Stored as
its own column so the run-history UI can render the tags without
decoding the per-chunk ``source_metadata`` blobs.
"""
run_id = uuid4()
description = source.describe()
Expand All @@ -66,6 +72,7 @@ async def create_run(
source_config=source_config,
status=IngestionRunStatus.PENDING.value,
items=[],
user_metadata=user_metadata or {},
)
session.add(row)
await session.commit()
Expand Down
23 changes: 17 additions & 6 deletions src/backend/base/langflow/api/utils/kb_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import chromadb
import chromadb.errors
Expand Down Expand Up @@ -515,6 +516,7 @@ async def perform_ingestion(
source_type: str = DEFAULT_INGESTION_SOURCE_TYPE,
source_metadata: dict | None = None,
source: KBIngestionSource | None = None,
per_file_metadata: dict[str, dict] | None = None,
) -> dict[str, object]:
"""Orchestrate the ingestion of content into a knowledge base.

Expand All @@ -541,9 +543,12 @@ async def perform_ingestion(
if not files_data:
msg = "perform_ingestion requires either 'source' or non-empty 'files_data'."
raise ValueError(msg)
source_config: dict[str, Any] = {"files": files_data, "source_name": source_name}
if per_file_metadata:
source_config["per_file_metadata"] = per_file_metadata
source = FileUploadSource(
user_id=current_user.id,
source_config={"files": files_data, "source_name": source_name},
source_config=source_config,
)

try:
Expand All @@ -558,6 +563,7 @@ async def perform_ingestion(
user_id=current_user.id,
job_id=task_job_id,
source_config=source.describe().get("config") or {},
user_metadata=dict(source_metadata or {}),
)
# Link the run to the ``knowledge_base`` row when one exists.
# During the Phase 1.5 rollout some KBs still only exist in
Expand All @@ -567,6 +573,7 @@ async def perform_ingestion(
kb_record_id = kb_record.id if kb_record is not None else None
run_id = await ingestion_run_service.create_run(
kb_name=kb_name,
user_metadata=dict(source_metadata or {}),
source=source,
job_id=task_job_id,
user_id=current_user.id,
Expand Down Expand Up @@ -657,11 +664,15 @@ async def perform_ingestion(
)
continue

# Collapse per-item + run-level metadata into one blob so
# Phase 2 can render either view.
combined_metadata: dict = dict(item.source_metadata or {})
if source_metadata:
combined_metadata.update(source_metadata)
# Collapse run-level + per-item metadata into one blob so
# Phase 2 can render either view. Per-item wins on key
# collision: callers that set both run-level and per-file
# tags expect the file-specific value to override the
# batch default (e.g. ``confidential=true`` on one file
# in an otherwise public batch).
combined_metadata: dict = dict(source_metadata or {})
if item.source_metadata:
combined_metadata.update(item.source_metadata)
item_metadata_tag = json.dumps(combined_metadata) if combined_metadata else encoded_metadata_tag

chunks = text_splitter.split_text(text)
Expand Down
134 changes: 134 additions & 0 deletions src/backend/base/langflow/api/utils/kb_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""User-supplied metadata helpers for Knowledge Base ingestion.

Two helpers:

* :func:`parse_user_metadata` decodes a JSON string from a multipart Form
field, runs the rule set in :func:`validate_user_metadata`, and returns a
dict — or raises :class:`fastapi.HTTPException` with a 422 status so the
rejection surfaces as an inline form-validation error in the UI.
* :func:`parse_per_file_metadata` does the same for the
``per_file_metadata`` field, which carries a ``{filename: {...}}`` map of
per-file overrides. Each inner dict goes through the same validator.

Mirrors the bounds in :mod:`langflow.utils.kb_constants` and is the only
metadata gate at the API boundary — :class:`KBIngestionHelper.perform_ingestion`
trusts that whatever it receives has already been validated.
"""

from __future__ import annotations

import json
import math
from typing import Any

from fastapi import HTTPException

from langflow.utils.kb_constants import (
    KB_METADATA_MAX_ARRAY_LENGTH,
    KB_METADATA_MAX_KEY_LENGTH,
    KB_METADATA_MAX_KEYS,
    KB_METADATA_MAX_VALUE_LENGTH,
    KB_METADATA_RESERVED_KEYS,
)

_KEY_ALLOWED_CHARS = set("abcdefghijklmnopqrstuvwxyz0123456789_")


def _is_valid_key(key: str) -> bool:
    """Return True when *key* is 1..KB_METADATA_MAX_KEY_LENGTH chars of [a-z0-9_]."""
    return 0 < len(key) <= KB_METADATA_MAX_KEY_LENGTH and _KEY_ALLOWED_CHARS.issuperset(key)


def _validate_value(key: str, value: Any) -> None:
    """Validate a single metadata value against the type/size contract.

    Allowed shapes: bool, int, finite float, bounded string, or a bounded
    list of bounded strings. Any violation raises
    :class:`HTTPException` with status 422.
    """
    # bool is a subclass of int, so it is accepted by this check too.
    if isinstance(value, (bool, int)):
        return
    if isinstance(value, float):
        # json.loads accepts the non-standard NaN/Infinity literals by
        # default; such values cannot be re-serialized as strict JSON or
        # stored in a Postgres JSONB column, so reject them up front.
        if not math.isfinite(value):
            msg = f"Metadata value for '{key}' must be a finite number."
            raise HTTPException(status_code=422, detail=msg)
        return
    if isinstance(value, str):
        if len(value) > KB_METADATA_MAX_VALUE_LENGTH:
            msg = f"Metadata value for '{key}' exceeds {KB_METADATA_MAX_VALUE_LENGTH} characters."
            raise HTTPException(status_code=422, detail=msg)
        return
    if isinstance(value, list):
        if len(value) > KB_METADATA_MAX_ARRAY_LENGTH:
            msg = f"Metadata array '{key}' exceeds {KB_METADATA_MAX_ARRAY_LENGTH} items."
            raise HTTPException(status_code=422, detail=msg)
        for entry in value:
            if not isinstance(entry, str):
                msg = f"Metadata array '{key}' must contain only strings."
                raise HTTPException(status_code=422, detail=msg)
            if len(entry) > KB_METADATA_MAX_VALUE_LENGTH:
                msg = f"Metadata array entry under '{key}' exceeds {KB_METADATA_MAX_VALUE_LENGTH} characters."
                raise HTTPException(status_code=422, detail=msg)
        return
    msg = f"Metadata value for '{key}' must be a string, number, bool, or string array; got {type(value).__name__}."
    raise HTTPException(status_code=422, detail=msg)


def validate_user_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
    """Enforce the user-metadata contract on a decoded dict.

    The validated dict is returned unchanged (no copy is made — callers
    may mutate it safely once it has passed). Any violation raises
    :class:`HTTPException` with a 422 status so FastAPI surfaces an
    inline form error.
    """
    if not isinstance(metadata, dict):
        raise HTTPException(status_code=422, detail="Metadata must be a JSON object.")
    if len(metadata) > KB_METADATA_MAX_KEYS:
        detail = f"Metadata exceeds the {KB_METADATA_MAX_KEYS} key limit."
        raise HTTPException(status_code=422, detail=detail)
    for key, value in metadata.items():
        key_ok = isinstance(key, str) and _is_valid_key(key)
        if not key_ok:
            detail = (
                f"Metadata key {key!r} is invalid: must be 1-{KB_METADATA_MAX_KEY_LENGTH} "
                "lowercase alphanumeric or underscore characters."
            )
            raise HTTPException(status_code=422, detail=detail)
        if key in KB_METADATA_RESERVED_KEYS:
            detail = f"Metadata key '{key}' is reserved for ingestion-internal use."
            raise HTTPException(status_code=422, detail=detail)
        _validate_value(key, value)
    return metadata


def parse_user_metadata(raw: str | None) -> dict[str, Any]:
    """Decode and validate the ``metadata`` form field.

    A missing or empty field yields ``{}``; malformed JSON or a contract
    violation raises :class:`HTTPException` with status 422.
    """
    if not raw:
        return {}
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        detail = f"Metadata is not valid JSON: {exc.msg}"
        raise HTTPException(status_code=422, detail=detail) from exc
    return validate_user_metadata(payload)


def parse_per_file_metadata(raw: str | None) -> dict[str, dict[str, Any]]:
    """Decode and validate the ``per_file_metadata`` form field.

    Expected shape: ``{filename: {metadata_dict}, ...}``. Every inner dict
    runs through :func:`validate_user_metadata`, so per-file overrides obey
    the same key/value rules as run-level metadata. Empty/None → ``{}``.
    """
    if not raw:
        return {}
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        detail = f"Per-file metadata is not valid JSON: {exc.msg}"
        raise HTTPException(status_code=422, detail=detail) from exc
    if not isinstance(payload, dict):
        detail = "Per-file metadata must be a JSON object keyed by filename."
        raise HTTPException(status_code=422, detail=detail)
    if len(payload) > KB_METADATA_MAX_KEYS:
        detail = f"Per-file metadata exceeds the {KB_METADATA_MAX_KEYS} file limit."
        raise HTTPException(status_code=422, detail=detail)
    validated: dict[str, dict[str, Any]] = {}
    for filename, file_meta in payload.items():
        if not (isinstance(filename, str) and filename):
            detail = "Per-file metadata keys must be non-empty filename strings."
            raise HTTPException(status_code=422, detail=detail)
        if not isinstance(file_meta, dict):
            detail = f"Per-file metadata for {filename!r} must be a JSON object."
            raise HTTPException(status_code=422, detail=detail)
        validated[filename] = validate_user_metadata(file_meta)
    return validated
Loading
Loading