22
33import os .path
44import subprocess
5- from typing import List
5+ from typing import Dict , List , Tuple
66
77from six .moves .urllib .parse import urlparse
88
1212
1313prev_transfers : List [str ] = list ()
1414curr_transfers : List [str ] = list ()
15+ # Global tracking for non-blocking transfers
16+ pending_cleanup : Dict [str , Tuple [str , str ]] = {} # task_id -> (file_path, timestamp)
1517
1618
17- def hpss_transfer (
19+ def cleanup_completed_transfers ():
20+ """
21+ Check status of previously submitted non-blocking transfers and delete
22+ files for any that have completed successfully.
23+ """
24+ from .globus import transfer_client
25+
26+ global pending_cleanup
27+
28+ if not pending_cleanup :
29+ return
30+
31+ logger .debug (
32+ f"{ ts_utc ()} : Checking { len (pending_cleanup )} pending transfers for cleanup"
33+ )
34+
35+ completed_tasks = []
36+
37+ for task_id , (file_path , submit_time ) in pending_cleanup .items ():
38+ try :
39+ # Check transfer status
40+ task = transfer_client .get_task (task_id )
41+ task_status = task ["status" ]
42+
43+ if task_status == "SUCCEEDED" :
44+ # Transfer succeeded - delete the file
45+ if os .path .exists (file_path ):
46+ logger .info (
47+ f"{ ts_utc ()} : Deleting successfully transferred file { file_path } "
48+ )
49+ os .remove (file_path )
50+ else :
51+ logger .warning (f"{ ts_utc ()} : File { file_path } already missing" )
52+ completed_tasks .append (task_id )
53+
54+ elif task_status in ["FAILED" , "CANCELED" ]:
55+ # Transfer failed - keep the file but stop tracking
56+ logger .warning (
57+ f"{ ts_utc ()} : Transfer { task_id } failed/canceled, keeping file { file_path } "
58+ )
59+ completed_tasks .append (task_id )
60+
61+ elif task_status in ["ACTIVE" , "PENDING" ]:
62+ # Still in progress - keep tracking
63+ logger .debug (f"{ ts_utc ()} : Transfer { task_id } still { task_status } " )
64+
65+ else :
66+ # Unknown status - log and remove from tracking
67+ logger .warning (
68+ f"{ ts_utc ()} : Unknown transfer status { task_status } for { task_id } "
69+ )
70+ completed_tasks .append (task_id )
71+
72+ except Exception as e :
73+ # Error checking status - remove from tracking to avoid infinite retries
74+ logger .error (f"Error checking transfer status for { task_id } : { e } " )
75+ completed_tasks .append (task_id )
76+
77+ # Remove completed/failed transfers from tracking
78+ for task_id in completed_tasks :
79+ del pending_cleanup [task_id ]
80+
81+ if completed_tasks :
82+ logger .debug (
83+ f"{ ts_utc ()} : Cleaned up { len (completed_tasks )} completed transfers"
84+ )
85+
86+
87+ # C901 'hpss_transfer' is too complex (19)
88+ def hpss_transfer ( # noqa: C901
1889 hpss : str ,
1990 file_path : str ,
2091 transfer_type : str ,
@@ -25,6 +96,11 @@ def hpss_transfer(
2596):
2697 global prev_transfers
2798 global curr_transfers
99+ global pending_cleanup
100+
101+ # NEW: Clean up any completed transfers from previous calls
102+ if non_blocking and transfer_type == "put" :
103+ cleanup_completed_transfers ()
28104
29105 logger .info (
30106 f"{ ts_utc ()} : in hpss_transfer, prev_transfers is starting as { prev_transfers } "
@@ -118,6 +194,17 @@ def hpss_transfer(
118194 # or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but
119195 # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer
120196 # return a tuple (task_id, status).
197+
198+ # NEW: Track non-blocking transfers for later cleanup
199+ if non_blocking and transfer_type == "put" and not keep :
200+ # Get the current task_id from globus module
201+ from .globus import task_id
202+
203+ if task_id :
204+ pending_cleanup [task_id ] = (file_path , ts_utc ())
205+ logger .debug (
206+ f"{ ts_utc ()} : Added { file_path } to pending cleanup for task { task_id } "
207+ )
121208 else :
122209 # Transfer file using `hsi`
123210 command : str = 'hsi -q "cd {}; {} {}"' .format (hpss , transfer_command , name )
@@ -130,19 +217,26 @@ def hpss_transfer(
130217
131218 if transfer_type == "put" :
132219 if not keep :
133- if (scheme != "globus" ) or (globus_status == "SUCCEEDED" ):
134- # Note: This is intended to fulfill the default removal of successfully-transfered
135- # tar files when keep=False, irrespective of non-blocking status
220+ if scheme != "globus" :
221+ # HSI transfers - delete immediately as before
136222 logger .debug (
137- f"{ ts_utc ()} : deleting transfered files { prev_transfers } "
223+ f"{ ts_utc ()} : deleting transferred files { prev_transfers } "
138224 )
139225 for src_path in prev_transfers :
140226 os .remove (src_path )
141227 prev_transfers = curr_transfers
142228 curr_transfers = list ()
143- logger .info (
144- f"{ ts_utc ()} : prev_transfers has been set to { prev_transfers } "
145- )
229+ elif not non_blocking :
230+ # Blocking Globus transfers - delete immediately if succeeded
231+ if globus_status == "SUCCEEDED" :
232+ logger .debug (
233+ f"{ ts_utc ()} : deleting transferred files { prev_transfers } "
234+ )
235+ for src_path in prev_transfers :
236+ os .remove (src_path )
237+ prev_transfers = curr_transfers
238+ curr_transfers = list ()
239+ # else: non-blocking Globus - cleanup handled by pending_cleanup mechanism
146240
147241
148242def hpss_put (
0 commit comments