Skip to content

Commit 79e4e62

Browse files
authored
Merge branch 'release-1.10.0' into feat/extension-production-install
2 parents 5902ee2 + 90455af commit 79e4e62

2 files changed

Lines changed: 151 additions & 0 deletions

File tree

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""Ensure message ingestion records keep their message FK
2+
3+
Phase: EXPAND
4+
5+
Revision ID: b4c2f8e9a1d3
6+
Revises: mb00a1b2c3d4
7+
Create Date: 2026-05-09 00:00:00.000000
8+
"""
9+
10+
from collections.abc import Sequence
11+
12+
import sqlalchemy as sa
13+
from alembic import op
14+
from langflow.utils import migration
15+
16+
# revision identifiers, used by Alembic.
17+
revision: str = "b4c2f8e9a1d3" # pragma: allowlist secret
18+
down_revision: str | None = "mb00a1b2c3d4" # pragma: allowlist secret
19+
branch_labels: str | Sequence[str] | None = None
20+
depends_on: str | Sequence[str] | None = None
21+
22+
MIR_TABLE = "message_ingestion_record"
23+
MESSAGE_TABLE = "message"
24+
MESSAGE_FK_NAME = "fk_message_ingestion_record_message_id_message"
25+
26+
27+
def _message_fk_exists(conn) -> bool:
28+
inspector = sa.inspect(conn)
29+
for fk in inspector.get_foreign_keys(MIR_TABLE):
30+
options = fk.get("options") or {}
31+
ondelete = (options.get("ondelete") or "").upper()
32+
if (
33+
fk.get("constrained_columns") == ["message_id"]
34+
and fk.get("referred_table") == MESSAGE_TABLE
35+
and fk.get("referred_columns") == ["id"]
36+
and ondelete == "CASCADE"
37+
):
38+
return True
39+
return False
40+
41+
42+
def _constraint_exists(conn, constraint_name: str) -> bool:
43+
inspector = sa.inspect(conn)
44+
return any(fk.get("name") == constraint_name for fk in inspector.get_foreign_keys(MIR_TABLE))
45+
46+
47+
def upgrade() -> None:
48+
conn = op.get_bind()
49+
50+
# This repairs a PostgreSQL startup race where an older idempotent migration can
51+
# replay DROP TABLE "message" CASCADE after the memory-base table already exists.
52+
# That drops only the inbound message_id FK, while the mb00 migration then skips
53+
# recreating the existing message_ingestion_record table.
54+
if conn.dialect.name != "postgresql":
55+
return
56+
57+
if not migration.table_exists(MIR_TABLE, conn) or not migration.table_exists(MESSAGE_TABLE, conn):
58+
return
59+
60+
if _message_fk_exists(conn):
61+
return
62+
63+
op.create_foreign_key(
64+
MESSAGE_FK_NAME,
65+
MIR_TABLE,
66+
MESSAGE_TABLE,
67+
["message_id"],
68+
["id"],
69+
ondelete="CASCADE",
70+
postgresql_not_valid=True,
71+
)
72+
73+
74+
def downgrade() -> None:
75+
conn = op.get_bind()
76+
77+
if conn.dialect.name != "postgresql" or not migration.table_exists(MIR_TABLE, conn):
78+
return
79+
80+
if _constraint_exists(conn, MESSAGE_FK_NAME):
81+
op.drop_constraint(MESSAGE_FK_NAME, MIR_TABLE, type_="foreignkey")

src/backend/tests/unit/alembic/test_migration_execution.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,76 @@ def test_no_phantom_migrations(db_url):
379379
)
380380

381381

382+
def test_message_ingestion_message_fk_repaired(db_url):
383+
"""Repair the Postgres-only state left by concurrent startup migrations."""
384+
if not db_url.startswith("postgresql"):
385+
pytest.skip("PostgreSQL-only migration repair")
386+
387+
alembic_cfg = _make_alembic_cfg(db_url)
388+
command.upgrade(alembic_cfg, "mb00a1b2c3d4")
389+
390+
engine = create_engine(_engine_url(db_url))
391+
try:
392+
with engine.begin() as connection:
393+
constraint_name = connection.execute(
394+
text(
395+
"""
396+
SELECT conname
397+
FROM pg_constraint
398+
WHERE contype = 'f'
399+
AND conrelid = 'message_ingestion_record'::regclass
400+
AND confrelid = 'message'::regclass
401+
AND conkey = ARRAY[
402+
(
403+
SELECT attnum
404+
FROM pg_attribute
405+
WHERE attrelid = 'message_ingestion_record'::regclass
406+
AND attname = 'message_id'
407+
)
408+
]::smallint[]
409+
"""
410+
)
411+
).scalar_one()
412+
connection.execute(text(f'ALTER TABLE message_ingestion_record DROP CONSTRAINT "{constraint_name}"'))
413+
finally:
414+
engine.dispose()
415+
416+
command.upgrade(alembic_cfg, "head")
417+
418+
engine = create_engine(_engine_url(db_url))
419+
try:
420+
with engine.connect() as connection:
421+
restored_fk = connection.execute(
422+
text(
423+
"""
424+
SELECT conname, confdeltype
425+
FROM pg_constraint
426+
WHERE contype = 'f'
427+
AND conrelid = 'message_ingestion_record'::regclass
428+
AND confrelid = 'message'::regclass
429+
AND conkey = ARRAY[
430+
(
431+
SELECT attnum
432+
FROM pg_attribute
433+
WHERE attrelid = 'message_ingestion_record'::regclass
434+
AND attname = 'message_id'
435+
)
436+
]::smallint[]
437+
"""
438+
)
439+
).one_or_none()
440+
assert restored_fk is not None
441+
assert restored_fk.confdeltype == "c"
442+
443+
migration_context = MigrationContext.configure(connection)
444+
diffs = compare_metadata(migration_context, SQLModel.metadata)
445+
finally:
446+
engine.dispose()
447+
448+
significant_diffs = _filter_diffs(diffs, db_url)
449+
assert significant_diffs == []
450+
451+
382452
def test_upgrade_from_main_branch(db_url):
383453
"""Verify that a DB at main's head can upgrade to current head and downgrade back.
384454

0 commit comments

Comments
 (0)