Skip to content

Commit e9de611

Browse files
committed
Tar files not yet deleted
1 parent dafa4df commit e9de611

File tree

7 files changed

+275
-199
lines changed

7 files changed

+275
-199
lines changed

zstash/create.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010

1111
from six.moves.urllib.parse import urlparse
1212

13-
from .globus import GlobusTransferCollection, globus_activate, globus_finalize
13+
from .globus import globus_activate, globus_finalize
1414
from .hpss import hpss_put
1515
from .hpss_utils import add_files
1616
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
17+
from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection
1718
from .utils import (
1819
create_tars_table,
1920
get_files_to_archive,
@@ -89,16 +90,23 @@ def create():
8990

9091
# Create and set up the database
9192
logger.debug(f"{ts_utc()}: Calling create_database()")
92-
failures: List[str] = create_database(cache, args, gtc=gtc)
93+
htc: HPSSTransferCollection = HPSSTransferCollection()
94+
failures: List[str] = create_database(cache, args, gtc=gtc, htc=htc)
9395

9496
# Transfer to HPSS. Always keep a local copy.
9597
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
9698
hpss_put(
97-
hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True, gtc=gtc
99+
hpss,
100+
get_db_filename(cache),
101+
cache,
102+
keep=args.keep,
103+
is_index=True,
104+
gtc=gtc,
105+
# htc=htc,
98106
)
99107

100108
logger.debug(f"{ts_utc()}: calling globus_finalize()")
101-
globus_finalize(gtc, non_blocking=args.non_blocking)
109+
globus_finalize(gtc, htc, non_blocking=args.non_blocking)
102110

103111
if len(failures) > 0:
104112
# List the failures
@@ -208,7 +216,10 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
208216

209217

210218
def create_database(
211-
cache: str, args: argparse.Namespace, gtc: Optional[GlobusTransferCollection]
219+
cache: str,
220+
args: argparse.Namespace,
221+
gtc: Optional[GlobusTransferCollection],
222+
htc: Optional[HPSSTransferCollection],
212223
) -> List[str]:
213224
# Create new database
214225
logger.debug(f"{ts_utc()}:Creating index database")
@@ -284,6 +295,7 @@ def create_database(
284295
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
285296
force_database_corruption=args.for_developers_force_database_corruption,
286297
gtc=gtc,
298+
htc=htc,
287299
)
288300
except FileNotFoundError as e:
289301
if args.follow_symlinks:

zstash/globus.py

Lines changed: 63 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,31 @@
33
import sys
44
from typing import List, Optional
55

6-
from globus_sdk import TransferAPIError, TransferClient, TransferData
6+
from globus_sdk import TransferAPIError, TransferData
77
from globus_sdk.response import GlobusHTTPResponse
8-
from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
98
from six.moves.urllib.parse import urlparse
109

1110
from .globus_utils import (
1211
HPSS_ENDPOINT_MAP,
1312
check_state_files,
13+
file_exists,
1414
get_local_endpoint_id,
1515
get_transfer_client_with_auth,
16+
globus_block_wait,
17+
globus_wait,
1618
set_up_TransferData,
1719
submit_transfer_with_checks,
1820
)
1921
from .settings import logger
22+
from .transfer_tracking import (
23+
GlobusTransfer,
24+
GlobusTransferCollection,
25+
HPSSTransferCollection,
26+
delete_transferred_files,
27+
)
2028
from .utils import ts_utc
2129

2230

23-
class GlobusTransfer(object):
24-
def __init__(self):
25-
self.transfer_data: Optional[TransferData] = None
26-
self.task_id: Optional[str] = None
27-
# https://docs.globus.org/api/transfer/task/#task_fields
28-
# ACTIVE, SUCCEEDED, FAILED, INACTIVE
29-
self.task_status: Optional[str] = None
30-
31-
32-
class GlobusTransferCollection(object):
33-
def __init__(self):
34-
# Attributes common to all the transfers
35-
self.remote_endpoint: Optional[str] = None
36-
self.local_endpoint: Optional[str] = None
37-
self.transfer_client: Optional[TransferClient] = None
38-
self.archive_directory_listing: Optional[IterableTransferResponse] = None
39-
40-
self.transfers: List[GlobusTransfer] = (
41-
[]
42-
) # TODO: Replace with collections.deque?
43-
self.cumulative_tarfiles_pushed: int = 0
44-
45-
def get_most_recent_transfer(self) -> Optional[GlobusTransfer]:
46-
return self.transfers[-1] if self.transfers else None
47-
48-
4931
def globus_activate(
5032
hpss: str, gtc: Optional[GlobusTransferCollection] = None
5133
) -> Optional[GlobusTransferCollection]:
@@ -72,13 +54,6 @@ def globus_activate(
7254
return gtc
7355

7456

75-
def file_exists(archive_directory_listing, name: str) -> bool:
76-
for entry in archive_directory_listing:
77-
if entry.get("name") == name:
78-
return True
79-
return False
80-
81-
8257
# C901 'globus_transfer' is too complex (20)
8358
def globus_transfer( # noqa: C901
8459
gtc: Optional[GlobusTransferCollection],
@@ -168,6 +143,9 @@ def globus_transfer( # noqa: C901
168143
logger.info(
169144
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
170145
)
146+
# Nullify the submitted transfer data structure so that a new one will be created on next call.
147+
transfer_data = None
148+
171149
new_transfer = GlobusTransfer()
172150
new_transfer.transfer_data = transfer_data
173151
new_transfer.task_id = task_id
@@ -196,7 +174,6 @@ def globus_transfer( # noqa: C901
196174
transfer_client=gtc.transfer_client,
197175
task_id=new_mrt.task_id,
198176
wait_timeout=7200,
199-
polling_interval=10,
200177
max_retries=5,
201178
)
202179
else:
@@ -208,146 +185,67 @@ def globus_transfer( # noqa: C901
208185
globus_wait(gtc.transfer_client, new_mrt.task_id)
209186

210187

211-
def globus_block_wait(
212-
transfer_client: TransferClient,
213-
task_id: str,
214-
wait_timeout: int,
215-
polling_interval: int,
216-
max_retries: int,
217-
):
218-
219-
# poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting aftert 5*2 = 10 hours
220-
logger.info(
221-
f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
222-
)
223-
task_status = "UNKNOWN"
224-
retry_count = 0
225-
while retry_count < max_retries:
226-
try:
227-
# Wait for the task to complete
228-
logger.info(
229-
f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}"
230-
)
231-
transfer_client.task_wait(
232-
task_id, timeout=wait_timeout, polling_interval=10
233-
)
234-
logger.info(f"{ts_utc()}: done with wait")
235-
except Exception as e:
236-
logger.error(f"Unexpected Exception: {e}")
237-
else:
238-
curr_task = transfer_client.get_task(task_id)
239-
task_status = curr_task["status"]
240-
if task_status == "SUCCEEDED":
241-
break
242-
finally:
243-
retry_count += 1
244-
logger.info(
245-
f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
246-
)
247-
248-
if retry_count == max_retries:
249-
logger.info(
250-
f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
251-
)
252-
task_status = "EXHAUSTED_TIMEOUT_RETRIES"
253-
254-
logger.info(
255-
f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
256-
)
257-
258-
return task_status
259-
260-
261-
def globus_wait(transfer_client: TransferClient, task_id: str):
262-
try:
263-
"""
264-
A Globus transfer job (task) can be in one of the three states:
265-
ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a
266-
status of the transfer job (task) from the Globus Transfer service,
267-
with 20 second timeout limit. If the task is ACTIVE after time runs
268-
out 'task_wait' returns False, and True otherwise.
269-
"""
270-
while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
271-
pass
272-
"""
273-
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
274-
Check if the transfer SUCCEEDED or FAILED.
275-
"""
276-
task = transfer_client.get_task(task_id)
277-
if task["status"] == "SUCCEEDED":
278-
src_ep = task["source_endpoint_id"]
279-
dst_ep = task["destination_endpoint_id"]
280-
label = task["label"]
281-
logger.info(
282-
"Globus transfer {}, from {} to {}: {} succeeded".format(
283-
task_id, src_ep, dst_ep, label
284-
)
285-
)
286-
else:
287-
logger.error("Transfer FAILED")
288-
except TransferAPIError as e:
289-
if e.code == "NoCredException":
290-
logger.error(
291-
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
292-
e.message
293-
)
294-
)
295-
else:
296-
logger.error(e)
297-
sys.exit(1)
298-
except Exception as e:
299-
logger.error("Exception: {}".format(e))
300-
sys.exit(1)
301-
302-
303188
def globus_finalize(
304-
gtc: Optional[GlobusTransferCollection], non_blocking: bool = False
189+
gtc: Optional[GlobusTransferCollection],
190+
htc: HPSSTransferCollection,
191+
non_blocking: bool = False,
305192
):
306193
last_task_id = None
307194

308195
if gtc is None:
309196
logger.warning("No GlobusTransferCollection object provided for finalization")
310197
return
311198

312-
transfer = gtc.get_most_recent_transfer()
313-
314-
if transfer and transfer.transfer_data:
315-
# DEBUG: review accumulated items in TransferData
316-
logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
317-
attribs = transfer.transfer_data.__dict__
318-
for item in attribs["data"]["DATA"]:
319-
if item["DATA_TYPE"] == "transfer_item":
320-
gtc.cumulative_tarfiles_pushed += 1
321-
print(
322-
f" (finalize) PUSHING ({gtc.cumulative_tarfiles_pushed}) source item: {item['source_path']}",
323-
flush=True,
324-
)
199+
transfer: Optional[GlobusTransfer] = gtc.get_most_recent_transfer()
200+
if transfer:
201+
if transfer.transfer_data:
202+
# DEBUG: review accumulated items in TransferData
203+
logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
204+
attribs = transfer.transfer_data.__dict__
205+
for item in attribs["data"]["DATA"]:
206+
if item["DATA_TYPE"] == "transfer_item":
207+
gtc.cumulative_tarfiles_pushed += 1
208+
print(
209+
f" (finalize) PUSHING ({gtc.cumulative_tarfiles_pushed}) source item: {item['source_path']}",
210+
flush=True,
211+
)
325212

326-
# SUBMIT new transfer here
327-
logger.info(
328-
f"{ts_utc()}: DIVING: Submit Transfer for {transfer.transfer_data['label']}"
329-
)
330-
try:
331-
last_task = submit_transfer_with_checks(
332-
gtc.transfer_client, transfer.transfer_data
213+
# SUBMIT new transfer here
214+
logger.info(
215+
f"{ts_utc()}: DIVING: Submit Transfer for {transfer.transfer_data['label']}"
333216
)
334-
last_task_id = last_task.get("task_id")
335-
except TransferAPIError as e:
336-
if e.code == "NoCredException":
337-
logger.error(
338-
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
339-
e.message
340-
)
217+
try:
218+
last_task = submit_transfer_with_checks(
219+
gtc.transfer_client, transfer.transfer_data
341220
)
342-
else:
343-
logger.error(e)
344-
sys.exit(1)
345-
except Exception as e:
346-
logger.error("Exception: {}".format(e))
347-
sys.exit(1)
221+
last_task_id = last_task.get("task_id")
222+
except TransferAPIError as e:
223+
if e.code == "NoCredException":
224+
logger.error(
225+
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
226+
e.message
227+
)
228+
)
229+
else:
230+
logger.error(e)
231+
sys.exit(1)
232+
except Exception as e:
233+
logger.error("Exception: {}".format(e))
234+
sys.exit(1)
348235

349-
if not non_blocking:
350-
if transfer and transfer.task_id:
351-
globus_wait(gtc.transfer_client, transfer.task_id)
236+
if not non_blocking:
237+
if transfer and transfer.task_id:
238+
globus_wait(gtc.transfer_client, transfer.task_id)
352239
if last_task_id:
353240
globus_wait(gtc.transfer_client, last_task_id)
241+
242+
# TODO: figure out how to end!
243+
# new_mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer()
244+
# if new_mrt and new_mrt.task_id:
245+
# new_mrt.task_status = globus_block_wait(
246+
# transfer_client=gtc.transfer_client,
247+
# task_id=new_mrt.task_id,
248+
# wait_timeout=7200,
249+
# max_retries=5,
250+
# )
251+
delete_transferred_files(htc)

0 commit comments

Comments
 (0)