Skip to content

Commit 8d9c1a3

Browse files
Fix database reset failing on PostgreSQL with orphaned state_id references (#20941)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: alex.s <alex.s@prefect.io> Co-authored-by: alex.s <ajstreed1@gmail.com>
1 parent b5e9c4d commit 8d9c1a3

File tree

4 files changed

+176
-2
lines changed

4 files changed

+176
-2
lines changed

src/prefect/server/database/_migrations/versions/postgresql/2025_06_13_160644_3b86c5ea017a_drop_task_run_state_id_foreign_key_.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
"""
88

9+
import sqlalchemy as sa
910
from alembic import op
1011

1112
# revision identifiers, used by Alembic.
@@ -23,6 +24,26 @@ def upgrade():
2324

2425

2526
def downgrade():
27+
# Null out any task_run.state_id values that reference non-existent
28+
# task_run_state rows. After the FK was dropped (upgrade), the system no
29+
# longer enforces referential integrity, so orphaned references are
30+
# expected. Without this cleanup, re-adding the FK would fail with an
31+
# IntegrityError on PostgreSQL.
32+
connection = op.get_bind()
33+
connection.execute(
34+
sa.text(
35+
"""
36+
UPDATE task_run
37+
SET state_id = NULL
38+
WHERE state_id IS NOT NULL
39+
AND NOT EXISTS (
40+
SELECT 1 FROM task_run_state
41+
WHERE task_run_state.id = task_run.state_id
42+
)
43+
"""
44+
)
45+
)
46+
2647
# Re-add the foreign key constraint
2748
op.create_foreign_key(
2849
"fk_task_run__state_id__task_run_state",

src/prefect/server/database/_migrations/versions/sqlite/2025_06_13_164506_8bb517bae6f9_drop_task_run_state_id_foreign_key_.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
"""
88

9+
import sqlalchemy as sa
910
from alembic import op
1011

1112
# revision identifiers, used by Alembic.
@@ -22,6 +23,26 @@ def upgrade():
2223

2324

2425
def downgrade():
26+
# Null out any task_run.state_id values that reference non-existent
27+
# task_run_state rows. After the FK was dropped (upgrade), the system no
28+
# longer enforces referential integrity, so orphaned references are
29+
# expected. Without this cleanup, re-adding the FK would fail with an
30+
# IntegrityError.
31+
connection = op.get_bind()
32+
connection.execute(
33+
sa.text(
34+
"""
35+
UPDATE task_run
36+
SET state_id = NULL
37+
WHERE state_id IS NOT NULL
38+
AND NOT EXISTS (
39+
SELECT 1 FROM task_run_state
40+
WHERE task_run_state.id = task_run.state_id
41+
)
42+
"""
43+
)
44+
)
45+
2546
# Re-add the foreign key constraint
2647
with op.batch_alter_table("task_run") as batch_op:
2748
batch_op.create_foreign_key(

src/prefect/server/database/interface.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,38 @@ async def create_db(self) -> None:
7777
await self.run_migrations_upgrade()
7878

7979
async def drop_db(self) -> None:
    """Drop the database by removing every table directly.

    Rather than walking the full Alembic downgrade chain in reverse, this
    reflects whatever schema actually exists and drops it wholesale.
    Downgrade steps can fail on real-world data (for example, re-adding a
    foreign key while orphaned references exist), so dropping tables
    directly is both faster and more robust.

    Reflection is used instead of ``Base.metadata.drop_all()`` so that
    tables created by migrations but absent from the ORM metadata (e.g.
    ``deployment_version``, ``alembic_version``) are removed as well.
    """
    engine = await self.engine()
    async with engine.begin() as conn:
        # SQLite enforces FK constraints at drop time; turning them off
        # lets tables go in any order without constraint errors.
        using_sqlite = (
            get_dialect(self.database_config.connection_url).name == "sqlite"
        )
        if using_sqlite:
            await conn.execute(sa.text("PRAGMA foreign_keys = OFF"))

        try:
            # Reflect the live schema so every table is captured, including
            # migration-only tables the ORM does not know about.
            reflected = sa.MetaData()
            await conn.run_sync(reflected.reflect)
            await conn.run_sync(reflected.drop_all)
        finally:
            # Restore FK enforcement even if the drop raised.
            if using_sqlite:
                await conn.execute(sa.text("PRAGMA foreign_keys = ON"))
82112

83113
async def run_migrations_upgrade(self) -> None:
84114
"""Run all upgrade migrations"""

tests/server/database/test_migrations.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,3 +936,105 @@ async def test_migrate_flow_run_notifications_to_automations(db: PrefectDBInterf
936936
]
937937
finally:
938938
await run_sync_in_worker_thread(alembic_upgrade)
939+
940+
941+
async def test_downgrade_with_orphaned_task_run_state_ids(db, flow):
    """
    Tests that migration 3b86c5ea017a can be downgraded even when
    task_run.state_id references a non-existent task_run_state row.

    This is the scenario reported in GitHub issue #20939: after the FK
    constraint was dropped (upgrade), the system no longer enforces
    referential integrity. Orphaned state_id values are expected in
    production. The downgrade must clean them up before re-adding the FK.
    """
    connection_url = PREFECT_API_DATABASE_CONNECTION_URL.value()
    dialect = get_dialect(connection_url)

    if dialect.name == "postgresql":
        # revision just after the FK-drop migration
        fk_drop_revision = "3b86c5ea017a"
        pre_fk_drop_revision = "aa1234567890"
    else:
        fk_drop_revision = "8bb517bae6f9"
        pre_fk_drop_revision = "bb2345678901"

    flow_run_id = uuid4()
    task_run_id = uuid4()
    orphaned_state_id = uuid4()  # Does NOT exist in task_run_state

    try:
        # Downgrade to the FK-drop migration (FK is removed at this point)
        await run_sync_in_worker_thread(alembic_downgrade, revision=fk_drop_revision)

        # Insert a task_run with an orphaned state_id (no corresponding
        # task_run_state row). This simulates real-world data after the FK
        # was dropped. Bound parameters are used instead of f-string
        # interpolation so values are escaped by the driver.
        # NOTE(review): ids are bound as strings — asyncpg coerces str to
        # UUID and SQLite stores them verbatim; confirm against the driver
        # in use if this changes.
        session = await db.session()
        async with session:
            await session.execute(
                sa.text(
                    "INSERT INTO flow_run (id, name, flow_id)"
                    " VALUES (:id, 'test-flow-run', :flow_id);"
                ),
                {"id": str(flow_run_id), "flow_id": str(flow.id)},
            )
            await session.execute(
                sa.text(
                    "INSERT INTO task_run (id, name, task_key, dynamic_key,"
                    " flow_run_id, state_id) VALUES (:id, 'test-task',"
                    " 'test-task', '0', :flow_run_id, :state_id);"
                ),
                {
                    "id": str(task_run_id),
                    "flow_run_id": str(flow_run_id),
                    "state_id": str(orphaned_state_id),
                },
            )
            await session.commit()

        # Downgrade past the FK-drop migration — this re-adds the FK.
        # Before the fix, this would fail with IntegrityError.
        await run_sync_in_worker_thread(
            alembic_downgrade, revision=pre_fk_drop_revision
        )

        # Verify the orphaned state_id was cleaned up (nulled)
        session = await db.session()
        async with session:
            result = (
                await session.execute(
                    sa.text("SELECT state_id FROM task_run WHERE id = :id"),
                    {"id": str(task_run_id)},
                )
            ).scalar()
            assert result is None, "Orphaned state_id should be nulled during downgrade"
    finally:
        await run_sync_in_worker_thread(alembic_upgrade)
1008+
1009+
1010+
async def test_drop_db_removes_all_tables(db):
    """
    Tests that drop_db() successfully removes all tables even when the
    database contains data that would cause downgrade migrations to fail.

    drop_db() uses metadata.drop_all() instead of running the full Alembic
    downgrade chain, making it immune to data-dependent migration failures.
    """
    engine = await db.engine()

    def _current_tables(sync_conn):
        # Runs on the sync connection; lists every table in the database.
        return sa.inspect(sync_conn).get_table_names()

    # Verify tables exist before drop
    async with engine.connect() as conn:
        existing = await conn.run_sync(_current_tables)
        assert len(existing) > 0, "Database should have tables before drop"

    # Drop the database
    await db.drop_db()

    # Verify all ORM tables and alembic_version are gone
    async with engine.connect() as conn:
        table_names = await conn.run_sync(_current_tables)
        assert len(table_names) == 0, (
            f"All tables should be dropped, but found: {table_names}"
        )

    # Recreate the database for other tests
    await db.create_db()

0 commit comments

Comments
 (0)