
Commit c01ffb9

Merge branch 'release-1.10.0' into feat/extension-production-install
2 parents 7a346d4 + 5c823f6 commit c01ffb9

13 files changed

Lines changed: 2708 additions & 57 deletions


Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+"""add_preprocessing_output
+
+Adds:
+- memory_base.preproc_kill_phrase (nullable String) to support LLM gating sentinel.
+- memory_base_preprocessing_output table — one row per preprocessing batch capturing
+  the LLM output, status (processed/ingested/skipped), and the source message-id list
+  so two-phase commit (LLM call → Chroma write) can resume after KB failures.
+
+Phase: EXPAND
+
+Revision ID: mb01b2c3d4e5
+Revises: kb1a2b3c4d5e
+Create Date: 2026-05-01 00:00:00.000000
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+from langflow.utils import migration
+
+revision: str = "mb01b2c3d4e5"  # pragma: allowlist secret
+down_revision: str | None = "kb1a2b3c4d5e"  # pragma: allowlist secret
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+
+    # ------------------------------------------------------------------ #
+    # memory_base.preproc_kill_phrase                                     #
+    # ------------------------------------------------------------------ #
+    with op.batch_alter_table("memory_base", schema=None) as batch_op:
+        if not migration.column_exists("memory_base", "preproc_kill_phrase", conn):
+            batch_op.add_column(sa.Column("preproc_kill_phrase", sa.String(), nullable=True))
+
+    # ------------------------------------------------------------------ #
+    # memory_base_preprocessing_output                                    #
+    # ------------------------------------------------------------------ #
+    if not migration.table_exists("memory_base_preprocessing_output", conn):
+        op.create_table(
+            "memory_base_preprocessing_output",
+            sa.Column("id", sa.Uuid(), nullable=False),
+            sa.Column(
+                "memory_base_id",
+                sa.Uuid(),
+                sa.ForeignKey("memory_base.id", ondelete="CASCADE"),
+                nullable=False,
+            ),
+            sa.Column("session_id", sa.String(), nullable=False),
+            sa.Column(
+                "job_id",
+                sa.Uuid(),
+                sa.ForeignKey("job.job_id", ondelete="SET NULL"),
+                nullable=True,
+            ),
+            sa.Column("status", sa.String(), nullable=False),
+            sa.Column("output_text", sa.Text(), nullable=True),
+            sa.Column("source_message_ids", sa.JSON(), nullable=False),
+            sa.Column("model_used", sa.String(), nullable=False),
+            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+            sa.PrimaryKeyConstraint("id"),
+        )
+        op.create_index(
+            "ix_mbpo_pending",
+            "memory_base_preprocessing_output",
+            ["memory_base_id", "session_id", "status", "created_at"],
+        )
+        op.create_index(
+            "ix_mbpo_listing",
+            "memory_base_preprocessing_output",
+            ["memory_base_id", "session_id", "created_at"],
+        )
+        op.create_index(
+            "ix_mbpo_job_id",
+            "memory_base_preprocessing_output",
+            ["job_id"],
+        )
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+
+    if migration.table_exists("memory_base_preprocessing_output", conn):
+        op.drop_index("ix_mbpo_job_id", table_name="memory_base_preprocessing_output")
+        op.drop_index("ix_mbpo_listing", table_name="memory_base_preprocessing_output")
+        op.drop_index("ix_mbpo_pending", table_name="memory_base_preprocessing_output")
+        op.drop_table("memory_base_preprocessing_output")
+
+    with op.batch_alter_table("memory_base", schema=None) as batch_op:
+        if migration.column_exists("memory_base", "preproc_kill_phrase", conn):
+            batch_op.drop_column("preproc_kill_phrase")
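A note on re-runnability: every DDL statement above is wrapped in an existence check (migration.column_exists / migration.table_exists), so the upgrade tolerates a partially applied schema, which is the point of an EXPAND-phase migration. A hedged sketch of driving the round trip from Alembic's command API, assuming a standard alembic.ini at the project root:

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumption: default Alembic project layout

# Upgrade to this revision. Thanks to the existence guards, a schema where
# the table already exists but the column does not (or vice versa) still
# upgrades cleanly instead of failing on duplicate DDL.
command.upgrade(cfg, "mb01b2c3d4e5")

# Downgrading to the parent revision drops the indexes, the table, and the
# new column, mirroring upgrade() in reverse order.
command.downgrade(cfg, "kb1a2b3c4d5e")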

src/backend/base/langflow/api/utils/kb_helpers.py

Lines changed: 35 additions & 7 deletions
@@ -70,6 +70,35 @@ class IngestionCancelledError(Exception):
     """Custom error for when an ingestion job is cancelled."""


+def chunk_text_for_ingestion(
+    text: str,
+    *,
+    chunk_size: int = 1000,
+    chunk_overlap: int = 100,
+    separator: str | None = None,
+) -> list[str]:
+    r"""Split text into chunks using ``RecursiveCharacterTextSplitter``.
+
+    Single source of truth for chunking config used by every ingestion path —
+    KB file ingestion and Memory Base raw / preprocessed message ingestion.
+    Centralizing this keeps chunk-size / overlap behavior identical so a
+    chunk that fits in one path won't suddenly overflow in another.
+
+    ``separator``: when provided, escaped newlines (``"\\n"``) are unescaped
+    and the value is passed as a single-element ``separators`` list, matching
+    the behavior of ``KBIngestionHelper.perform_ingestion``.
+
+    Returns ``[]`` for empty / whitespace-only input.
+    """
+    if not text or not text.strip():
+        return []
+    splitter_kwargs: dict = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
+    if separator:
+        splitter_kwargs["separators"] = [separator.replace("\\n", "\n")]
+    splitter = RecursiveCharacterTextSplitter(**splitter_kwargs)
+    return splitter.split_text(text)
+
+
 class KBStorageHelper:
     """Helper class for Knowledge Base storage and path management."""
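A quick usage sketch of the new helper; the import path follows the file location above, and the sample strings are illustrative only:

from langflow.api.utils.kb_helpers import chunk_text_for_ingestion

# Whitespace-only input short-circuits to [] before any splitter is built.
assert chunk_text_for_ingestion("   ") == []

# An escaped-newline separator ("\\n") is unescaped and handed to
# RecursiveCharacterTextSplitter as the sole separator, so splits fall
# on real line breaks rather than the splitter's default hierarchy.
chunks = chunk_text_for_ingestion(
    "first line\nsecond line\nthird line",
    chunk_size=20,
    chunk_overlap=0,
    separator="\\n",
)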

@@ -664,12 +693,6 @@ async def perform_ingestion(
         encoded_metadata_tag = json.dumps(source_metadata) if source_metadata else ""
         source_extension_tags: set[str] = set()
         try:
-            splitter_kwargs: dict = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
-            if separator:
-                resolved_separator = separator.replace("\\n", "\n")
-                splitter_kwargs["separators"] = [resolved_separator]
-            text_splitter = RecursiveCharacterTextSplitter(**splitter_kwargs)
-
             embeddings = await KBIngestionHelper.build_embeddings(embedding_provider, embedding_model, current_user)
             backend_type_value = (
                 kb_record.backend_type if kb_record and kb_record.backend_type else BackendType.CHROMA.value

@@ -736,7 +759,12 @@ async def perform_ingestion(
                 combined_metadata.update(item.source_metadata)
                 item_metadata_tag = json.dumps(combined_metadata) if combined_metadata else encoded_metadata_tag

-                chunks = text_splitter.split_text(text)
+                chunks = chunk_text_for_ingestion(
+                    text,
+                    chunk_size=chunk_size,
+                    chunk_overlap=chunk_overlap,
+                    separator=separator,
+                )
                 docs = [
                     Document(
                         page_content=c,

src/backend/base/langflow/api/v1/memories.py

Lines changed: 38 additions & 26 deletions
@@ -28,7 +28,7 @@
 from fastapi_pagination import Page, Params
 from fastapi_pagination.ext.sqlmodel import apaginate
 from pydantic import BaseModel
-from sqlmodel import col, select
+from sqlmodel import select

 from langflow.api.utils import CurrentActiveUser
 from langflow.services.database.models.memory_base.model import (

@@ -38,9 +38,9 @@
     MemoryBaseSessionRead,
     MemoryBaseUpdate,
 )
-from langflow.services.database.models.message.model import MessageTable
 from langflow.services.deps import get_memory_base_service, session_scope
 from langflow.services.jobs import DuplicateJobError
+from langflow.services.memory_base.service import PreprocessingValidationError

 router = APIRouter(tags=["Memories"], prefix="/memories", include_in_schema=False)

@@ -105,6 +105,8 @@ async def create_memory_base(
     except PermissionError as exc:
         # Flow not found or belongs to another user — return 404 to avoid info-leak
         raise HTTPException(status_code=404, detail=str(exc)) from exc
+    except PreprocessingValidationError as exc:
+        raise HTTPException(status_code=422, detail=str(exc)) from exc
     except ValueError as exc:
         raise HTTPException(status_code=409, detail=str(exc)) from exc
     return MemoryBaseRead.model_validate(mb)

@@ -193,37 +195,44 @@ async def list_session_messages(

     Returns 404 if the Memory Base does not belong to the current user.
     """
-    from sqlalchemy import and_
-
-    from langflow.services.database.models.memory_base.model import MessageIngestionRecord
-
+    service = get_memory_base_service()
    async with session_scope() as db:
         mb_stmt = select(MemoryBase).where(MemoryBase.id == memory_base_id).where(MemoryBase.user_id == current_user.id)
         result = await db.exec(mb_stmt)
-        if result.first() is None:
+        mb = result.first()
+        if mb is None:
             raise HTTPException(status_code=404, detail="Memory base not found")

-        # INNER JOIN — only messages that were actually ingested into this MB/session pair.
-        # No extra WHERE filters needed:
-        #   - mir.session_id == session_id in the JOIN guarantees msg.session_id == session_id
-        #     (session_id is denormalized from the message at ingestion time — immutable).
-        #   - flow_id is implicitly correct: ingestion only ever touches messages from mb.flow_id,
-        #     and MB ownership is already verified above.
-        msg_stmt = (
-            select(MessageTable, MessageIngestionRecord)
-            .join(
-                MessageIngestionRecord,
-                and_(
-                    MessageIngestionRecord.message_id == MessageTable.id,
-                    MessageIngestionRecord.memory_base_id == memory_base_id,
-                    MessageIngestionRecord.session_id == session_id,
-                ),
+        if mb.preprocessing:
+            # Preprocessing MBs: the KB holds LLM-distilled output, so the
+            # surface for "what's in the KB" is MemoryBasePreprocessingOutput,
+            # not MessageTable. Project the row into the same response shape
+            # so the API contract is identical from the frontend's perspective.
+            stmt = service.session_preprocessed_outputs_stmt(memory_base_id, session_id)
+            return await apaginate(
+                db,
+                stmt,
+                params=params,
+                transformer=lambda rows: [
+                    MessageReadResponse(
+                        id=row.id,
+                        timestamp=row.created_at,
+                        sender="Machine",
+                        sender_name="Preprocessor",
+                        session_id=row.session_id,
+                        text=row.output_text or "",
+                        content_blocks=[],
+                        job_id=row.job_id,
+                        ingested_at=row.created_at,
+                    )
+                    for row in rows
+                ],
             )
-            .order_by(col(MessageTable.timestamp).asc())
-        )
+
+        stmt = service.session_raw_messages_stmt(memory_base_id, session_id)
         return await apaginate(
             db,
-            msg_stmt,
+            stmt,
             params=params,
             transformer=lambda rows: [
                 MessageReadResponse(

@@ -253,7 +262,10 @@ async def update_memory_base(
     Threshold changes only take effect at the next auto-capture trigger.
     Any already-running ingestion task continues with its original arguments.
     """
-    mb = await get_memory_base_service().update(memory_base_id, user_id=current_user.id, patch=patch)
+    try:
+        mb = await get_memory_base_service().update(memory_base_id, user_id=current_user.id, patch=patch)
+    except PreprocessingValidationError as exc:
+        raise HTTPException(status_code=422, detail=str(exc)) from exc
     if mb is None:
         raise HTTPException(status_code=404, detail="Memory base not found")
     return MemoryBaseRead.model_validate(mb)
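Both statement builders used above live in langflow/services/memory_base/service.py, which this commit view does not include. Judging from the fields the transformer reads (id, created_at, session_id, output_text, job_id) and the ix_mbpo_listing index added by the migration, the preprocessed-output statement plausibly reduces to the sketch below; treat it as an assumption, not the service's actual code:

from uuid import UUID

from sqlmodel import select

from langflow.services.database.models.memory_base.model import MemoryBasePreprocessingOutput


def session_preprocessed_outputs_stmt(memory_base_id: UUID, session_id: str):
    # Assumed shape: every batch row for this MB/session pair in creation
    # order, matching ix_mbpo_listing (memory_base_id, session_id, created_at).
    return (
        select(MemoryBasePreprocessingOutput)
        .where(MemoryBasePreprocessingOutput.memory_base_id == memory_base_id)
        .where(MemoryBasePreprocessingOutput.session_id == session_id)
        .order_by(MemoryBasePreprocessingOutput.created_at)
    )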

src/backend/base/langflow/services/database/models/memory_base/model.py

Lines changed: 74 additions & 3 deletions
@@ -3,7 +3,7 @@

 import sqlalchemy as sa
 from pydantic import model_validator
-from sqlalchemy import Column, DateTime, ForeignKey, Index, UniqueConstraint
+from sqlalchemy import JSON, Column, DateTime, ForeignKey, Index, Text, UniqueConstraint
 from sqlmodel import Field, Relationship, SQLModel


@@ -13,11 +13,11 @@ class MemoryBaseBase(SQLModel):
     user_id: UUID = Field(index=True)
     threshold: int = Field(default=50)
     auto_capture: bool = Field(default=True)
-    # Preprocessing config — accepted in payload but logic deferred to future scope
     embedding_model: str = Field(default="")
     preprocessing: bool = Field(default=False)
     preproc_model: str | None = Field(default=None)
     preproc_instructions: str | None = Field(default=None)
+    preproc_kill_phrase: str | None = Field(default=None)


 class MemoryBase(MemoryBaseBase, table=True):  # type: ignore[call-arg]

@@ -42,10 +42,17 @@ class MemoryBaseCreate(MemoryBaseBase):
     user_id: UUID | None = None  # Derived from auth token in the endpoint; not required in request body

     @model_validator(mode="after")
-    def preproc_model_required_when_preprocessing(self) -> "MemoryBaseCreate":
+    def preprocessing_defaults(self) -> "MemoryBaseCreate":
         if self.preprocessing and not self.preproc_model:
             msg = "preproc_model is required when preprocessing is enabled"
             raise ValueError(msg)
+        # Default the kill phrase so callers that enable preprocessing without
+        # supplying one still get the deterministic gate. Imported lazily so the
+        # model module stays free of service-layer deps.
+        if self.preprocessing and not self.preproc_kill_phrase:
+            from langflow.services.memory_base.preprocessing import DEFAULT_KILL_PHRASE
+
+            self.preproc_kill_phrase = DEFAULT_KILL_PHRASE
         return self

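A hedged sketch of the renamed validator in action. Fields hidden from this diff are omitted (the create model likely requires more, such as a name or flow reference), and the model string "gpt-4o-mini" is purely illustrative:

from langflow.services.database.models.memory_base.model import MemoryBaseCreate
from langflow.services.memory_base.preprocessing import DEFAULT_KILL_PHRASE

# Enabling preprocessing without a model is rejected (surfaced as HTTP 422
# via PreprocessingValidationError in the endpoints above); enabling it
# without a kill phrase silently picks up the default sentinel.
mb = MemoryBaseCreate(preprocessing=True, preproc_model="gpt-4o-mini")
assert mb.preproc_kill_phrase == DEFAULT_KILL_PHRASE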
@@ -56,6 +63,7 @@ class MemoryBaseUpdate(SQLModel):
     preprocessing: bool | None = None
     preproc_model: str | None = None
     preproc_instructions: str | None = None
+    preproc_kill_phrase: str | None = None


 class MemoryBaseRead(MemoryBaseBase):

@@ -197,3 +205,66 @@ class MessageIngestionRecord(SQLModel, table=True):  # type: ignore[call-arg]
     # Denormalized from MessageTable.session_id — immutable, avoids JOIN on the hot query path
     session_id: str = Field(sa_column=Column(sa.String(), nullable=False))
     ingested_at: datetime = Field(sa_column=Column(DateTime(timezone=True), nullable=False))
+
+
+class MemoryBasePreprocessingOutput(SQLModel, table=True):  # type: ignore[call-arg]
+    """One row per preprocessing batch — captures the LLM-distilled output before KB write.
+
+    Status flow:
+    - ``processed`` — LLM produced output; Chroma write pending. Cursor NOT advanced.
+                      The next ingestion job for this session reuses this row and
+                      retries only the Chroma write (no LLM re-invocation).
+    - ``ingested`` — Chroma write confirmed; cursor advanced; visible in get-messages view.
+    - ``skipped`` — LLM emitted the kill phrase; no Chroma write, no output_text,
+                    but cursor advances so the same batch is not re-evaluated.
+    """
+
+    __tablename__ = "memory_base_preprocessing_output"
+    __table_args__ = (
+        Index(
+            "ix_mbpo_pending",
+            "memory_base_id",
+            "session_id",
+            "status",
+            "created_at",
+        ),
+        Index(
+            "ix_mbpo_listing",
+            "memory_base_id",
+            "session_id",
+            "created_at",
+        ),
+        Index("ix_mbpo_job_id", "job_id"),
+    )
+
+    id: UUID = Field(default_factory=uuid4, primary_key=True)
+    memory_base_id: UUID = Field(
+        sa_column=Column(
+            sa.Uuid(),
+            ForeignKey("memory_base.id", ondelete="CASCADE"),
+            nullable=False,
+        )
+    )
+    # Denormalized — immutable for the row's lifetime
+    session_id: str = Field(sa_column=Column(sa.String(), nullable=False))
+    job_id: UUID | None = Field(
+        default=None,
+        sa_column=Column(
+            sa.Uuid(),
+            ForeignKey("job.job_id", ondelete="SET NULL"),
+            nullable=True,
+        ),
+    )
+    status: str = Field(sa_column=Column(sa.String(), nullable=False))
+    output_text: str | None = Field(default=None, sa_column=Column(Text(), nullable=True))
+    # Canonical batch identity — JSON list of message UUIDs as strings.
+    source_message_ids: list = Field(default_factory=list, sa_column=Column(JSON(), nullable=False))
+    model_used: str = Field(sa_column=Column(sa.String(), nullable=False))
+    created_at: datetime = Field(
+        default_factory=lambda: datetime.now(timezone.utc),
+        sa_column=Column(DateTime(timezone=True), nullable=False),
+    )
+    updated_at: datetime = Field(
+        default_factory=lambda: datetime.now(timezone.utc),
+        sa_column=Column(DateTime(timezone=True), nullable=False),
+    )
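The status flow above implies a small resumable state machine around the two external effects. A self-contained sketch of that control flow, where call_llm and write_kb stand in for the real service calls (which are not part of this diff):

from collections.abc import Callable


def ingest_batch(
    pending_output: str | None,
    batch_text: str,
    kill_phrase: str,
    *,
    call_llm: Callable[[str], str],
    write_kb: Callable[[str], None],
) -> str:
    """Return the resulting status: "skipped", "processed", or "ingested"."""
    if pending_output is None:
        output = call_llm(batch_text)  # phase 1: at most one LLM call per batch
        if kill_phrase in output:
            return "skipped"  # cursor advances; nothing is written to the KB
        pending_output = output  # persisted as status="processed" before phase 2
    try:
        write_kb(pending_output)  # phase 2: the only step retried on resume
    except Exception:  # any KB failure leaves the row pending
        return "processed"  # next job reuses the row and retries the write only
    return "ingested"  # cursor advances; row shows up in the get-messages view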
