Skip to content

Commit eb1e003

Browse files
committed
Fix circular import but tars still get deleted
1 parent 209d59f commit eb1e003

File tree

2 files changed

+76
-77
lines changed

2 files changed

+76
-77
lines changed

zstash/globus.py

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from __future__ import absolute_import, print_function
22

3+
import os
34
import sys
4-
from typing import List, Optional
5+
from typing import Dict, List, Optional, Tuple
56

67
from globus_sdk import TransferAPIError, TransferClient, TransferData
78
from globus_sdk.response import GlobusHTTPResponse
@@ -16,7 +17,6 @@
1617
set_up_TransferData,
1718
submit_transfer_with_checks,
1819
)
19-
from .hpss import cleanup_completed_transfers
2020
from .settings import logger
2121
from .utils import ts_utc
2222

@@ -26,6 +26,76 @@
2626
transfer_data: TransferData = None
2727
task_id = None
2828
archive_directory_listing: IterableTransferResponse = None
29+
# Global tracking for non-blocking transfers
30+
pending_cleanup: Dict[str, Tuple[str, str]] = {} # task_id -> (file_path, timestamp)
31+
32+
33+
def cleanup_completed_transfers():
34+
"""
35+
Check status of previously submitted non-blocking transfers and delete
36+
files for any that have completed successfully.
37+
"""
38+
from .globus import transfer_client
39+
40+
global pending_cleanup
41+
42+
if not pending_cleanup:
43+
return
44+
45+
logger.debug(
46+
f"{ts_utc()}: Checking {len(pending_cleanup)} pending transfers for cleanup"
47+
)
48+
49+
completed_tasks = []
50+
51+
for task_id, (file_path, submit_time) in pending_cleanup.items():
52+
try:
53+
# Check transfer status
54+
task = transfer_client.get_task(task_id)
55+
task_status = task["status"]
56+
57+
if task_status == "SUCCEEDED":
58+
# Transfer succeeded - delete the file
59+
if os.path.exists(file_path):
60+
logger.info(
61+
f"{ts_utc()}: Deleting successfully transferred file {file_path}"
62+
)
63+
os.remove(file_path)
64+
else:
65+
logger.warning(f"{ts_utc()}: File {file_path} already missing")
66+
completed_tasks.append(task_id)
67+
68+
elif task_status in ["FAILED", "CANCELED"]:
69+
# Transfer failed - keep the file but stop tracking
70+
logger.warning(
71+
f"{ts_utc()}: Transfer {task_id} failed/canceled, keeping file {file_path}"
72+
)
73+
completed_tasks.append(task_id)
74+
75+
elif task_status in ["ACTIVE", "PENDING"]:
76+
# Still in progress - keep tracking
77+
logger.debug(f"{ts_utc()}: Transfer {task_id} still {task_status}")
78+
79+
else:
80+
# Unknown status - log and remove from tracking
81+
logger.warning(
82+
f"{ts_utc()}: Unknown transfer status {task_status} for {task_id}"
83+
)
84+
completed_tasks.append(task_id)
85+
86+
except Exception as e:
87+
# Error checking status - remove from tracking to avoid infinite retries
88+
logger.error(f"Error checking transfer status for {task_id}: {e}")
89+
completed_tasks.append(task_id)
90+
91+
# Remove completed/failed transfers from tracking
92+
for task_id in completed_tasks:
93+
del pending_cleanup[task_id]
94+
95+
if completed_tasks:
96+
logger.debug(
97+
f"{ts_utc()}: Cleaned up {len(completed_tasks)} completed transfers"
98+
)
2999

30100

31101
def globus_activate(hpss: str):
@@ -293,6 +363,7 @@ def globus_finalize(non_blocking: bool = False):
293363
global transfer_data
294364
global task_id
295365
global global_variable_tarfiles_pushed
366+
global pending_cleanup
296367

297368
last_task_id = None
298369

@@ -343,7 +414,6 @@ def globus_finalize(non_blocking: bool = False):
343414
# 1. Wait for them (defeating the non-blocking purpose)
344415
# 2. Leave them for manual cleanup
345416
# 3. Log a warning
346-
from .hpss import pending_cleanup
347417

348418
if pending_cleanup:
349419
logger.warning(

zstash/hpss.py

Lines changed: 3 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -2,86 +2,16 @@
22

33
import os.path
44
import subprocess
5-
from typing import Dict, List, Tuple
5+
from typing import List
66

77
from six.moves.urllib.parse import urlparse
88

9-
from .globus import globus_transfer
9+
from .globus import cleanup_completed_transfers, globus_transfer
1010
from .settings import get_db_filename, logger
1111
from .utils import run_command, ts_utc
1212

1313
prev_transfers: List[str] = list()
1414
curr_transfers: List[str] = list()
15-
# Global tracking for non-blocking transfers
16-
pending_cleanup: Dict[str, Tuple[str, str]] = {} # task_id -> (file_path, timestamp)
17-
18-
19-
def cleanup_completed_transfers():
20-
"""
21-
Check status of previously submitted non-blocking transfers and delete
22-
files for any that have completed successfully.
23-
"""
24-
from .globus import transfer_client
25-
26-
global pending_cleanup
27-
28-
if not pending_cleanup:
29-
return
30-
31-
logger.debug(
32-
f"{ts_utc()}: Checking {len(pending_cleanup)} pending transfers for cleanup"
33-
)
34-
35-
completed_tasks = []
36-
37-
for task_id, (file_path, submit_time) in pending_cleanup.items():
38-
try:
39-
# Check transfer status
40-
task = transfer_client.get_task(task_id)
41-
task_status = task["status"]
42-
43-
if task_status == "SUCCEEDED":
44-
# Transfer succeeded - delete the file
45-
if os.path.exists(file_path):
46-
logger.info(
47-
f"{ts_utc()}: Deleting successfully transferred file {file_path}"
48-
)
49-
os.remove(file_path)
50-
else:
51-
logger.warning(f"{ts_utc()}: File {file_path} already missing")
52-
completed_tasks.append(task_id)
53-
54-
elif task_status in ["FAILED", "CANCELED"]:
55-
# Transfer failed - keep the file but stop tracking
56-
logger.warning(
57-
f"{ts_utc()}: Transfer {task_id} failed/canceled, keeping file {file_path}"
58-
)
59-
completed_tasks.append(task_id)
60-
61-
elif task_status in ["ACTIVE", "PENDING"]:
62-
# Still in progress - keep tracking
63-
logger.debug(f"{ts_utc()}: Transfer {task_id} still {task_status}")
64-
65-
else:
66-
# Unknown status - log and remove from tracking
67-
logger.warning(
68-
f"{ts_utc()}: Unknown transfer status {task_status} for {task_id}"
69-
)
70-
completed_tasks.append(task_id)
71-
72-
except Exception as e:
73-
# Error checking status - remove from tracking to avoid infinite retries
74-
logger.error(f"Error checking transfer status for {task_id}: {e}")
75-
completed_tasks.append(task_id)
76-
77-
# Remove completed/failed transfers from tracking
78-
for task_id in completed_tasks:
79-
del pending_cleanup[task_id]
80-
81-
if completed_tasks:
82-
logger.debug(
83-
f"{ts_utc()}: Cleaned up {len(completed_tasks)} completed transfers"
84-
)
8515

8616

8717
# C901 'hpss_transfer' is too complex (19)
@@ -96,7 +26,6 @@ def hpss_transfer( # noqa: C901
9626
):
9727
global prev_transfers
9828
global curr_transfers
99-
global pending_cleanup
10029

10130
# NEW: Clean up any completed transfers from previous calls
10231
if non_blocking and transfer_type == "put":
@@ -198,7 +127,7 @@ def hpss_transfer( # noqa: C901
198127
# NEW: Track non-blocking transfers for later cleanup
199128
if non_blocking and transfer_type == "put" and not keep:
200129
# Get the current task_id from globus module
201-
from .globus import task_id
130+
from .globus import pending_cleanup, task_id
202131

203132
if task_id:
204133
pending_cleanup[task_id] = (file_path, ts_utc())

0 commit comments

Comments
 (0)