22
33import os .path
44import subprocess
5- from typing import Dict , List , Tuple
5+ from typing import List
66
77from six .moves .urllib .parse import urlparse
88
9- from .globus import globus_transfer
9+ from .globus import cleanup_completed_transfers , globus_transfer
1010from .settings import get_db_filename , logger
1111from .utils import run_command , ts_utc
1212
1313prev_transfers : List [str ] = list ()
1414curr_transfers : List [str ] = list ()
15- # Global tracking for non-blocking transfers
16- pending_cleanup : Dict [str , Tuple [str , str ]] = {} # task_id -> (file_path, timestamp)
17-
18-
19- def cleanup_completed_transfers ():
20- """
21- Check status of previously submitted non-blocking transfers and delete
22- files for any that have completed successfully.
23- """
24- from .globus import transfer_client
25-
26- global pending_cleanup
27-
28- if not pending_cleanup :
29- return
30-
31- logger .debug (
32- f"{ ts_utc ()} : Checking { len (pending_cleanup )} pending transfers for cleanup"
33- )
34-
35- completed_tasks = []
36-
37- for task_id , (file_path , submit_time ) in pending_cleanup .items ():
38- try :
39- # Check transfer status
40- task = transfer_client .get_task (task_id )
41- task_status = task ["status" ]
42-
43- if task_status == "SUCCEEDED" :
44- # Transfer succeeded - delete the file
45- if os .path .exists (file_path ):
46- logger .info (
47- f"{ ts_utc ()} : Deleting successfully transferred file { file_path } "
48- )
49- os .remove (file_path )
50- else :
51- logger .warning (f"{ ts_utc ()} : File { file_path } already missing" )
52- completed_tasks .append (task_id )
53-
54- elif task_status in ["FAILED" , "CANCELED" ]:
55- # Transfer failed - keep the file but stop tracking
56- logger .warning (
57- f"{ ts_utc ()} : Transfer { task_id } failed/canceled, keeping file { file_path } "
58- )
59- completed_tasks .append (task_id )
60-
61- elif task_status in ["ACTIVE" , "PENDING" ]:
62- # Still in progress - keep tracking
63- logger .debug (f"{ ts_utc ()} : Transfer { task_id } still { task_status } " )
64-
65- else :
66- # Unknown status - log and remove from tracking
67- logger .warning (
68- f"{ ts_utc ()} : Unknown transfer status { task_status } for { task_id } "
69- )
70- completed_tasks .append (task_id )
71-
72- except Exception as e :
73- # Error checking status - remove from tracking to avoid infinite retries
74- logger .error (f"Error checking transfer status for { task_id } : { e } " )
75- completed_tasks .append (task_id )
76-
77- # Remove completed/failed transfers from tracking
78- for task_id in completed_tasks :
79- del pending_cleanup [task_id ]
80-
81- if completed_tasks :
82- logger .debug (
83- f"{ ts_utc ()} : Cleaned up { len (completed_tasks )} completed transfers"
84- )
8515
8616
8717# C901 'hpss_transfer' is too complex (19)
@@ -96,7 +26,6 @@ def hpss_transfer( # noqa: C901
9626):
9727 global prev_transfers
9828 global curr_transfers
99- global pending_cleanup
10029
10130 # NEW: Clean up any completed transfers from previous calls
10231 if non_blocking and transfer_type == "put" :
@@ -198,7 +127,7 @@ def hpss_transfer( # noqa: C901
198127 # NEW: Track non-blocking transfers for later cleanup
199128 if non_blocking and transfer_type == "put" and not keep :
200129 # Get the current task_id from globus module
201- from .globus import task_id
130+ from .globus import pending_cleanup , task_id
202131
203132 if task_id :
204133 pending_cleanup [task_id ] = (file_path , ts_utc ())
0 commit comments