Skip to content

Commit d42741a

Browse files
committed
fix: harden workflow archive DB retries
1 parent f3df2b8 commit d42741a

5 files changed

Lines changed: 533 additions & 54 deletions

File tree

api/commands/retention.py

Lines changed: 125 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
import datetime
22
import logging
33
import time
4+
from collections.abc import Callable
45
from typing import TypedDict
56

67
import click
78
import sqlalchemy as sa
9+
from sqlalchemy.orm import Session, sessionmaker
810

911
from extensions.ext_database import db
1012
from libs.datetime_utils import naive_utc_now
1113
from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs
1214
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
1315
from services.retention.conversation.messages_clean_service import MessagesCleanService
1416
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
17+
from services.retention.workflow_run.db_retry import run_with_db_retry
1518
from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition
1619
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
1720

@@ -35,6 +38,12 @@ class WorkflowRunArchiveTenantPlan(TypedDict):
3538
unpaid_tenant_ids: list[str]
3639

3740

41+
class WorkflowRunArchivePrefixStats(TypedDict):
42+
tenant_ids: list[str]
43+
workflow_runs: int
44+
workflow_node_executions: int
45+
46+
3847
def _normalize_utc_datetime(value: datetime.datetime) -> datetime.datetime:
3948
if value.tzinfo is None:
4049
return value.replace(tzinfo=datetime.UTC)
@@ -57,6 +66,7 @@ def _parse_tenant_prefixes(prefixes: str | None) -> list[str]:
5766

5867

5968
def _get_archive_candidate_tenant_ids_by_prefix(
69+
session: Session,
6070
prefix: str,
6171
*,
6272
start_from: datetime.datetime | None,
@@ -75,7 +85,7 @@ def _get_archive_candidate_tenant_ids_by_prefix(
7585
if start_from is not None:
7686
conditions.append(WorkflowRun.created_at >= start_from)
7787

78-
tenant_ids = db.session.scalars(
88+
tenant_ids = session.scalars(
7989
sa.select(WorkflowRun.tenant_id).where(*conditions).distinct().order_by(WorkflowRun.tenant_id)
8090
).all()
8191
return list(tenant_ids)
@@ -102,8 +112,80 @@ def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[lis
102112
return paid_tenant_ids, unpaid_tenant_ids
103113

104114

115+
def _run_archive_command_db_retry[T](operation_name: str, operation: Callable[[], T]) -> T:
116+
return run_with_db_retry(operation_name, operation, logger=logger)
117+
118+
119+
def _get_archive_candidate_tenant_ids_with_retry(
120+
session_maker: sessionmaker[Session],
121+
prefix: str,
122+
*,
123+
start_from: datetime.datetime | None,
124+
end_before: datetime.datetime,
125+
) -> list[str]:
126+
def fetch_tenant_ids() -> list[str]:
127+
with session_maker() as session:
128+
return _get_archive_candidate_tenant_ids_by_prefix(
129+
session,
130+
prefix,
131+
start_from=start_from,
132+
end_before=end_before,
133+
)
134+
135+
return _run_archive_command_db_retry(f"workflow archive tenant resolve for prefix {prefix}", fetch_tenant_ids)
136+
137+
138+
def _get_archive_plan_prefix_stats(
139+
session_maker: sessionmaker[Session],
140+
prefix: str,
141+
*,
142+
start_from: datetime.datetime | None,
143+
end_before: datetime.datetime,
144+
) -> WorkflowRunArchivePrefixStats:
145+
from graphon.enums import WorkflowExecutionStatus
146+
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
147+
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
148+
149+
def fetch_prefix_stats() -> WorkflowRunArchivePrefixStats:
150+
with session_maker() as session:
151+
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
152+
session,
153+
prefix,
154+
start_from=start_from,
155+
end_before=end_before,
156+
)
157+
run_conditions = [
158+
WorkflowRun.created_at < end_before,
159+
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
160+
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
161+
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
162+
]
163+
if start_from is not None:
164+
run_conditions.append(WorkflowRun.created_at >= start_from)
165+
workflow_runs = (
166+
session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
167+
)
168+
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
169+
workflow_node_executions = (
170+
session.scalar(
171+
sa.select(sa.func.count())
172+
.select_from(WorkflowNodeExecutionModel)
173+
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
174+
)
175+
or 0
176+
)
177+
return WorkflowRunArchivePrefixStats(
178+
tenant_ids=tenant_ids,
179+
workflow_runs=workflow_runs,
180+
workflow_node_executions=workflow_node_executions,
181+
)
182+
183+
return _run_archive_command_db_retry(f"workflow archive plan for prefix {prefix}", fetch_prefix_stats)
184+
185+
105186
def _resolve_archive_tenant_ids_from_plan(
106187
*,
188+
session_maker: sessionmaker[Session],
107189
tenant_ids: str | None,
108190
tenant_prefixes: list[str],
109191
start_from: datetime.datetime | None,
@@ -122,7 +204,8 @@ def _resolve_archive_tenant_ids_from_plan(
122204
requested_tenant_ids = []
123205
for prefix in tenant_prefixes:
124206
requested_tenant_ids.extend(
125-
_get_archive_candidate_tenant_ids_by_prefix(
207+
_get_archive_candidate_tenant_ids_with_retry(
208+
session_maker,
126209
prefix,
127210
start_from=start_from,
128211
end_before=end_before,
@@ -143,6 +226,24 @@ def _resolve_archive_tenant_ids_from_plan(
143226
)
144227

145228

229+
def _safe_remove_scoped_session(context: str) -> None:
230+
try:
231+
db.session.remove()
232+
except Exception:
233+
logger.warning("Ignoring DB scoped-session cleanup error after %s", context, exc_info=True)
234+
registry = getattr(db.session, "registry", None)
235+
clear_registry = getattr(registry, "clear", None)
236+
if callable(clear_registry):
237+
try:
238+
clear_registry()
239+
except Exception:
240+
logger.warning("Ignoring DB scoped-session registry cleanup error after %s", context, exc_info=True)
241+
try:
242+
db.engine.dispose()
243+
except Exception:
244+
logger.warning("Ignoring DB engine dispose error after %s", context, exc_info=True)
245+
246+
146247
def _resolve_archive_time_range(
147248
*,
148249
before_days: int,
@@ -349,10 +450,6 @@ def archive_workflow_runs_plan(
349450
supported workflow types, and the requested created_at window. V2 bundle archive
350451
does not maintain per-run archive logs, so this plan reports source-table volume.
351452
"""
352-
from graphon.enums import WorkflowExecutionStatus
353-
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
354-
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
355-
356453
before_days, start_from, end_before = _resolve_archive_time_range(
357454
before_days=before_days,
358455
from_days_ago=from_days_ago,
@@ -364,37 +461,25 @@ def archive_workflow_runs_plan(
364461
if include_archived:
365462
click.echo(click.style("--include-archived is a no-op for V2 bundle archive plans.", fg="yellow"))
366463

464+
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
367465
rows: list[WorkflowRunArchivePlanRow] = []
368466
for prefix in _HEX_PREFIXES:
369-
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
370-
prefix,
371-
start_from=start_from,
372-
end_before=plan_end_before,
373-
)
467+
try:
468+
prefix_stats = _get_archive_plan_prefix_stats(
469+
session_maker,
470+
prefix,
471+
start_from=start_from,
472+
end_before=plan_end_before,
473+
)
474+
except Exception as exc:
475+
logger.exception("Failed to build workflow archive plan for prefix %s", prefix)
476+
raise click.ClickException(f"Failed to build workflow archive plan for prefix {prefix}.") from exc
477+
tenant_ids = prefix_stats["tenant_ids"]
478+
workflow_runs = prefix_stats["workflow_runs"]
479+
workflow_node_executions = prefix_stats["workflow_node_executions"]
374480
total_tenants = len(tenant_ids)
375481
paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(tenant_ids)
376482

377-
run_conditions = [
378-
WorkflowRun.created_at < plan_end_before,
379-
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
380-
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
381-
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
382-
]
383-
if start_from is not None:
384-
run_conditions.append(WorkflowRun.created_at >= start_from)
385-
workflow_runs = (
386-
db.session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
387-
)
388-
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
389-
workflow_node_executions = (
390-
db.session.scalar(
391-
sa.select(sa.func.count())
392-
.select_from(WorkflowNodeExecutionModel)
393-
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
394-
)
395-
or 0
396-
)
397-
398483
rows.append(
399484
WorkflowRunArchivePlanRow(
400485
tenant_prefix=prefix,
@@ -574,17 +659,18 @@ def archive_workflow_runs(
574659
)
575660
)
576661

662+
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
577663
try:
578664
tenant_plan = _resolve_archive_tenant_ids_from_plan(
665+
session_maker=session_maker,
579666
tenant_ids=tenant_ids,
580667
tenant_prefixes=parsed_tenant_prefixes,
581668
start_from=start_from,
582669
end_before=plan_end_before,
583670
)
584-
except Exception:
671+
except Exception as exc:
585672
logger.exception("Failed to resolve workflow archive tenant plan")
586-
click.echo(click.style("Failed to resolve workflow archive tenant plan.", fg="red"))
587-
return
673+
raise click.ClickException("Failed to resolve workflow archive tenant plan.") from exc
588674

589675
planned_tenant_ids = tenant_plan["archive_tenant_ids"]
590676
planned_paid_tenant_ids = tenant_plan["paid_tenant_ids"] if planned_tenant_ids is not None else None
@@ -616,7 +702,10 @@ def archive_workflow_runs(
616702
dry_run=dry_run,
617703
delete_after_archive=delete_after_archive,
618704
)
619-
summary = archiver.run()
705+
try:
706+
summary = archiver.run()
707+
finally:
708+
_safe_remove_scoped_session("archive workflow run command")
620709
click.echo(
621710
click.style(
622711
f"Summary: processed={summary.total_runs_processed}, archived={summary.runs_archived}, "

0 commit comments

Comments
 (0)