11import datetime
22import logging
33import time
4+ from collections .abc import Callable
45from typing import TypedDict
56
67import click
78import sqlalchemy as sa
9+ from sqlalchemy .orm import Session , sessionmaker
810
911from extensions .ext_database import db
1012from libs .datetime_utils import naive_utc_now
1113from services .clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs
1214from services .retention .conversation .messages_clean_policy import create_message_clean_policy
1315from services .retention .conversation .messages_clean_service import MessagesCleanService
1416from services .retention .workflow_run .clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
17+ from services .retention .workflow_run .db_retry import run_with_db_retry
1518from services .retention .workflow_run .tenant_prefix import tenant_prefix_condition
1619from tasks .remove_app_and_related_data_task import delete_draft_variables_batch
1720
@@ -35,6 +38,12 @@ class WorkflowRunArchiveTenantPlan(TypedDict):
3538 unpaid_tenant_ids : list [str ]
3639
3740
class WorkflowRunArchivePrefixStats(TypedDict):
    """Volume figures collected for one tenant-id prefix of an archive plan."""

    # Tenant ids returned by the candidate-tenant lookup for the prefix.
    tenant_ids: list[str]
    # Number of workflow run rows counted for the prefix.
    workflow_runs: int
    # Number of workflow node execution rows counted for the prefix.
    workflow_node_executions: int
45+
46+
3847def _normalize_utc_datetime (value : datetime .datetime ) -> datetime .datetime :
3948 if value .tzinfo is None :
4049 return value .replace (tzinfo = datetime .UTC )
@@ -57,6 +66,7 @@ def _parse_tenant_prefixes(prefixes: str | None) -> list[str]:
5766
5867
5968def _get_archive_candidate_tenant_ids_by_prefix (
69+ session : Session ,
6070 prefix : str ,
6171 * ,
6272 start_from : datetime .datetime | None ,
@@ -75,7 +85,7 @@ def _get_archive_candidate_tenant_ids_by_prefix(
7585 if start_from is not None :
7686 conditions .append (WorkflowRun .created_at >= start_from )
7787
78- tenant_ids = db . session .scalars (
88+ tenant_ids = session .scalars (
7989 sa .select (WorkflowRun .tenant_id ).where (* conditions ).distinct ().order_by (WorkflowRun .tenant_id )
8090 ).all ()
8191 return list (tenant_ids )
@@ -102,8 +112,80 @@ def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[lis
102112 return paid_tenant_ids , unpaid_tenant_ids
103113
104114
115+ def _run_archive_command_db_retry [T ](operation_name : str , operation : Callable [[], T ]) -> T :
116+ return run_with_db_retry (operation_name , operation , logger = logger )
117+
118+
def _get_archive_candidate_tenant_ids_with_retry(
    session_maker: sessionmaker[Session],
    prefix: str,
    *,
    start_from: datetime.datetime | None,
    end_before: datetime.datetime,
) -> list[str]:
    """Look up archive-candidate tenant ids for ``prefix`` via the DB retry wrapper.

    Args:
        session_maker: Factory used to open a new session on every invocation
            of the lookup callable.
        prefix: Tenant-id prefix forwarded to the candidate lookup.
        start_from: Optional bound forwarded to the lookup as ``start_from``.
        end_before: Bound forwarded to the lookup as ``end_before``.

    Returns:
        The tenant ids produced by ``_get_archive_candidate_tenant_ids_by_prefix``.
    """

    def _lookup() -> list[str]:
        # The session is opened inside the callable so that each call made by
        # the retry wrapper works with its own session.
        with session_maker() as scoped:
            candidates = _get_archive_candidate_tenant_ids_by_prefix(
                scoped,
                prefix,
                start_from=start_from,
                end_before=end_before,
            )
        return candidates

    operation_name = f"workflow archive tenant resolve for prefix {prefix}"
    return _run_archive_command_db_retry(operation_name, _lookup)
136+
137+
def _get_archive_plan_prefix_stats(
    session_maker: sessionmaker[Session],
    prefix: str,
    *,
    start_from: datetime.datetime | None,
    end_before: datetime.datetime,
) -> WorkflowRunArchivePrefixStats:
    """Gather tenant ids and row counts for one tenant-id prefix.

    All three queries (candidate tenants, workflow run count, node execution
    count) run inside a single session opened by the callable that is handed to
    the DB retry wrapper.

    Args:
        session_maker: Factory used to open the session for the queries.
        prefix: Tenant-id prefix used in the tenant lookup and the run filters.
        start_from: Optional inclusive lower bound on ``WorkflowRun.created_at``.
        end_before: Exclusive upper bound on ``WorkflowRun.created_at``.

    Returns:
        A ``WorkflowRunArchivePrefixStats`` with the tenant ids and both counts;
        a count query that yields a falsy value is reported as ``0``.
    """
    from graphon.enums import WorkflowExecutionStatus
    from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
    from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver

    def _collect() -> WorkflowRunArchivePrefixStats:
        with session_maker() as scoped:
            candidate_tenants = _get_archive_candidate_tenant_ids_by_prefix(
                scoped,
                prefix,
                start_from=start_from,
                end_before=end_before,
            )

            # Filters shared by the run count and the node-execution join.
            filters = [
                WorkflowRun.created_at < end_before,
                WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
                WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
                tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
            ]
            if start_from is not None:
                filters.append(WorkflowRun.created_at >= start_from)

            run_count_stmt = sa.select(sa.func.count()).select_from(WorkflowRun).where(*filters)
            run_total = scoped.scalar(run_count_stmt) or 0

            # Node executions are counted by joining against the ids of the
            # runs selected by the same filters.
            matching_runs = sa.select(WorkflowRun.id).where(*filters).subquery()
            node_count_stmt = (
                sa.select(sa.func.count())
                .select_from(WorkflowNodeExecutionModel)
                .join(matching_runs, WorkflowNodeExecutionModel.workflow_run_id == matching_runs.c.id)
            )
            node_total = scoped.scalar(node_count_stmt) or 0

            return WorkflowRunArchivePrefixStats(
                tenant_ids=candidate_tenants,
                workflow_runs=run_total,
                workflow_node_executions=node_total,
            )

    operation_name = f"workflow archive plan for prefix {prefix}"
    return _run_archive_command_db_retry(operation_name, _collect)
184+
185+
105186def _resolve_archive_tenant_ids_from_plan (
106187 * ,
188+ session_maker : sessionmaker [Session ],
107189 tenant_ids : str | None ,
108190 tenant_prefixes : list [str ],
109191 start_from : datetime .datetime | None ,
@@ -122,7 +204,8 @@ def _resolve_archive_tenant_ids_from_plan(
122204 requested_tenant_ids = []
123205 for prefix in tenant_prefixes :
124206 requested_tenant_ids .extend (
125- _get_archive_candidate_tenant_ids_by_prefix (
207+ _get_archive_candidate_tenant_ids_with_retry (
208+ session_maker ,
126209 prefix ,
127210 start_from = start_from ,
128211 end_before = end_before ,
@@ -143,6 +226,24 @@ def _resolve_archive_tenant_ids_from_plan(
143226 )
144227
145228
def _safe_remove_scoped_session(context: str) -> None:
    """Best-effort cleanup of the scoped ``db.session``.

    Calls ``db.session.remove()``. Exceptions raised by the cleanup calls in
    this function are logged as warnings (with traceback) instead of being
    propagated.

    Args:
        context: Short description of the caller, interpolated into the
            warning messages.
    """
    # NOTE(review): block nesting was reconstructed from a whitespace-flattened
    # source; confirm that the registry clear and engine dispose below belong
    # to the failure path of ``db.session.remove()``.
    try:
        db.session.remove()
    except Exception:
        logger.warning("Ignoring DB scoped-session cleanup error after %s", context, exc_info=True)
        # Fall back to clearing the scoped-session registry directly, when the
        # session object exposes one with a callable ``clear``.
        registry = getattr(db.session, "registry", None)
        clear_registry = getattr(registry, "clear", None)
        if callable(clear_registry):
            try:
                clear_registry()
            except Exception:
                logger.warning("Ignoring DB scoped-session registry cleanup error after %s", context, exc_info=True)
        # Finally dispose the engine; a failure here is logged as well.
        try:
            db.engine.dispose()
        except Exception:
            logger.warning("Ignoring DB engine dispose error after %s", context, exc_info=True)
245+
246+
146247def _resolve_archive_time_range (
147248 * ,
148249 before_days : int ,
@@ -349,10 +450,6 @@ def archive_workflow_runs_plan(
349450 supported workflow types, and the requested created_at window. V2 bundle archive
350451 does not maintain per-run archive logs, so this plan reports source-table volume.
351452 """
352- from graphon .enums import WorkflowExecutionStatus
353- from models .workflow import WorkflowNodeExecutionModel , WorkflowRun
354- from services .retention .workflow_run .archive_paid_plan_workflow_run import WorkflowRunArchiver
355-
356453 before_days , start_from , end_before = _resolve_archive_time_range (
357454 before_days = before_days ,
358455 from_days_ago = from_days_ago ,
@@ -364,37 +461,25 @@ def archive_workflow_runs_plan(
364461 if include_archived :
365462 click .echo (click .style ("--include-archived is a no-op for V2 bundle archive plans." , fg = "yellow" ))
366463
464+ session_maker = sessionmaker (bind = db .engine , expire_on_commit = False )
367465 rows : list [WorkflowRunArchivePlanRow ] = []
368466 for prefix in _HEX_PREFIXES :
369- tenant_ids = _get_archive_candidate_tenant_ids_by_prefix (
370- prefix ,
371- start_from = start_from ,
372- end_before = plan_end_before ,
373- )
467+ try :
468+ prefix_stats = _get_archive_plan_prefix_stats (
469+ session_maker ,
470+ prefix ,
471+ start_from = start_from ,
472+ end_before = plan_end_before ,
473+ )
474+ except Exception as exc :
475+ logger .exception ("Failed to build workflow archive plan for prefix %s" , prefix )
476+ raise click .ClickException (f"Failed to build workflow archive plan for prefix { prefix } ." ) from exc
477+ tenant_ids = prefix_stats ["tenant_ids" ]
478+ workflow_runs = prefix_stats ["workflow_runs" ]
479+ workflow_node_executions = prefix_stats ["workflow_node_executions" ]
374480 total_tenants = len (tenant_ids )
375481 paid_tenant_ids , unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids (tenant_ids )
376482
377- run_conditions = [
378- WorkflowRun .created_at < plan_end_before ,
379- WorkflowRun .status .in_ (WorkflowExecutionStatus .ended_values ()),
380- WorkflowRun .type .in_ (WorkflowRunArchiver .ARCHIVED_TYPE ),
381- tenant_prefix_condition (WorkflowRun .tenant_id , prefix ),
382- ]
383- if start_from is not None :
384- run_conditions .append (WorkflowRun .created_at >= start_from )
385- workflow_runs = (
386- db .session .scalar (sa .select (sa .func .count ()).select_from (WorkflowRun ).where (* run_conditions )) or 0
387- )
388- candidate_runs = sa .select (WorkflowRun .id ).where (* run_conditions ).subquery ()
389- workflow_node_executions = (
390- db .session .scalar (
391- sa .select (sa .func .count ())
392- .select_from (WorkflowNodeExecutionModel )
393- .join (candidate_runs , WorkflowNodeExecutionModel .workflow_run_id == candidate_runs .c .id )
394- )
395- or 0
396- )
397-
398483 rows .append (
399484 WorkflowRunArchivePlanRow (
400485 tenant_prefix = prefix ,
@@ -574,17 +659,18 @@ def archive_workflow_runs(
574659 )
575660 )
576661
662+ session_maker = sessionmaker (bind = db .engine , expire_on_commit = False )
577663 try :
578664 tenant_plan = _resolve_archive_tenant_ids_from_plan (
665+ session_maker = session_maker ,
579666 tenant_ids = tenant_ids ,
580667 tenant_prefixes = parsed_tenant_prefixes ,
581668 start_from = start_from ,
582669 end_before = plan_end_before ,
583670 )
584- except Exception :
671+ except Exception as exc :
585672 logger .exception ("Failed to resolve workflow archive tenant plan" )
586- click .echo (click .style ("Failed to resolve workflow archive tenant plan." , fg = "red" ))
587- return
673+ raise click .ClickException ("Failed to resolve workflow archive tenant plan." ) from exc
588674
589675 planned_tenant_ids = tenant_plan ["archive_tenant_ids" ]
590676 planned_paid_tenant_ids = tenant_plan ["paid_tenant_ids" ] if planned_tenant_ids is not None else None
@@ -616,7 +702,10 @@ def archive_workflow_runs(
616702 dry_run = dry_run ,
617703 delete_after_archive = delete_after_archive ,
618704 )
619- summary = archiver .run ()
705+ try :
706+ summary = archiver .run ()
707+ finally :
708+ _safe_remove_scoped_session ("archive workflow run command" )
620709 click .echo (
621710 click .style (
622711 f"Summary: processed={ summary .total_runs_processed } , archived={ summary .runs_archived } , "
0 commit comments