diff --git a/conda/dev.yml b/conda/dev.yml
index a70d6164..5a19a9c4 100644
--- a/conda/dev.yml
+++ b/conda/dev.yml
@@ -8,8 +8,7 @@ dependencies:
   - pip=22.2.2
   - python=3.9.13
   - six=1.16.0
-  - globus-sdk=3.2.1
-  - fair-research-login=0.2.6
+  - globus-sdk=3.15.0
   # Developer Tools
   # =================
   # If versions are updated, also update 'rev' in `.pre-commit.config.yaml`
diff --git a/tests/scripts/globus_auth.bash b/tests/scripts/globus_auth.bash
new file mode 100755
index 00000000..b785bc8c
--- /dev/null
+++ b/tests/scripts/globus_auth.bash
@@ -0,0 +1,125 @@
+setup()
+{
+    echo "##########################################################################################################"
+    local case_name="${1}"
+    local src_dir="${2}"
+    echo "Testing: ${case_name}"
+    full_dir="${src_dir}/${case_name}"
+    rm -rf ${full_dir}
+    mkdir -p ${full_dir}
+    cd ${full_dir}
+
+    mkdir zstash_demo
+    mkdir zstash_demo/empty_dir
+    mkdir zstash_demo/dir
+    echo -n '' > zstash_demo/file_empty.txt
+    echo 'file0 stuff' > zstash_demo/dir/file0.txt
+}
+
+check_log_has()
+{
+    local expected_grep="${1}"
+    local log_file="${2}"
+    grep "${expected_grep}" ${log_file}
+    if [ $? != 0 ]; then
+        echo "Expected grep '${expected_grep}' not found in ${log_file}. Test failed."
+        exit 2
+    fi
+}
+
+check_log_does_not_have()
+{
+    local not_expected_grep="${1}"
+    local log_file="${2}"
+    grep "${not_expected_grep}" ${log_file}
+    if [ $? == 0 ]; then
+        echo "Not-expected grep '${not_expected_grep}' was found in ${log_file}. Test failed."
+        exit 2
+    fi
+}
+
+run_test_cases()
+{
+    # This script requires user input and thus cannot be run automatically as part of a test suite.
+
+    # To start fresh with Globus:
+    # 1. Log into endpoints (LCRC Improv DTN, NERSC Perlmutter) at globus.org: File Manager > Add the endpoints in the "Collection" fields
+    # 2. Rescind all consents: https://auth.globus.org/v2/web/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all
+
+    # Before each run:
+    # Perlmutter:
+    # cd /global/homes/f/forsyth/zstash/tests/
+    # rm -rf test_globus_auth_try1 # Or just change $DST_DIR to a new directory
+    #
+    # Chrysalis:
+    # cd ~/ez/zstash/
+    # conda activate
+    # pre-commit run --all-files
+    # python -m pip install .
+    # cd tests/scripts
+    # ./globus_auth.bash
+
+    PERLMUTTER_ENDPOINT=6bdc7956-fc0f-4ad2-989c-7aa5ee643a79
+
+    SRC_DIR=/lcrc/group/e3sm/ac.forsyth2/zstash_testing/test_globus_auth # Chrysalis
+    DST_DIR=globus://${PERLMUTTER_ENDPOINT}/global/homes/f/forsyth/zstash/tests/test_globus_auth_try5
+
+    GLOBUS_CFG=/home/ac.forsyth2/.globus-native-apps.cfg
+    INI_PATH=/home/ac.forsyth2/.zstash.ini
+    TOKEN_FILE=/home/ac.forsyth2/.zstash_globus_tokens.json
+
+    # Start fresh
+    rm -rf ${GLOBUS_CFG}
+    rm -rf ${INI_PATH}
+    rm -rf ${TOKEN_FILE}
+
+    echo "Running globus_auth test"
+    echo "Exit codes: 0 -- success, 1 -- zstash failed, 2 -- grep check failed"
+
+    case_name="run1"
+    setup ${case_name} "${SRC_DIR}"
+    # Expecting to see exactly 1 authentication prompt
+    zstash create --hpss=${DST_DIR}/${case_name} zstash_demo 2>&1 | tee ${case_name}.log
+    if [ ${PIPESTATUS[0]} != 0 ]; then
+        echo "${case_name} failed. Check ${case_name}.log for details. Cannot continue."
+        exit 1
+    fi
+    echo "${case_name} completed successfully. Checking ${case_name}.log now."
+    # From check_state_files
+    check_log_does_not_have "WARNING: Globus CFG ${GLOBUS_CFG} exists. This may be left over from earlier versions of zstash, and may cause issues. Consider deleting." ${case_name}.log
+    check_log_has "INFO: ${INI_PATH} does NOT exist. This means we won't be able to read the local endpoint ID from it." ${case_name}.log
+    check_log_has "INFO: Token file ${TOKEN_FILE} does NOT exist. This means we won't be able to load tokens from it." ${case_name}.log
+    # From get_local_endpoint_id
+    check_log_has "INFO: Writing to empty ${INI_PATH}" ${case_name}.log
+    check_log_has "INFO: Setting local_endpoint_id based on FQDN chrlogin2.lcrc.anl.gov:" ${case_name}.log
+    # From get_transfer_client_with_auth
+    check_log_has "INFO: No stored tokens found - starting authentication" ${case_name}.log
+    check_log_has "Please go to this URL and login:" ${case_name}.log # Our one expected authentication prompt
+    # From save_tokens
+    check_log_has "INFO: Tokens saved successfully" ${case_name}.log
+
+
+    case_name="run2"
+    setup ${case_name} "${SRC_DIR}"
+    # Expecting to see exactly 0 authentication prompts
+    zstash create --hpss=${DST_DIR}/${case_name} zstash_demo 2>&1 | tee ${case_name}.log
+    if [ ${PIPESTATUS[0]} != 0 ]; then
+        echo "${case_name} failed. Check ${case_name}.log for details. Cannot continue."
+        exit 1
+    fi
+    echo "${case_name} completed successfully. Checking ${case_name}.log now."
+    # From check_state_files
+    check_log_does_not_have "WARNING: Globus CFG ${GLOBUS_CFG} exists. This may be left over from earlier versions of zstash, and may cause issues. Consider deleting." ${case_name}.log
+    check_log_has "INFO: ${INI_PATH} exists. We can try to read the local endpoint ID from it." ${case_name}.log # Differs from run1
+    check_log_has "INFO: Token file ${TOKEN_FILE} exists. We can try to load tokens from it." ${case_name}.log # Differs from run1
+    # From get_local_endpoint_id
+    check_log_has "INFO: Setting local_endpoint_id based on ${INI_PATH}" ${case_name}.log # Differs from run1
+    check_log_has "INFO: Setting local_endpoint_id based on FQDN chrlogin2.lcrc.anl.gov:" ${case_name}.log
+    # From get_transfer_client_with_auth
+    check_log_has "INFO: Found stored refresh token - using it" ${case_name}.log # Differs from run1
+    check_log_does_not_have "Please go to this URL and login:" ${case_name}.log # There should be no login prompts for run2!
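+    # A possible tightening (hypothetical; not used by the checks above): assert
+    # exact match counts, so a duplicated login prompt would also fail. E.g.:
+    #   check_log_count()
+    #   {
+    #       local expected_count="${1}"; local expected_grep="${2}"; local log_file="${3}"
+    #       if [ "$(grep -c "${expected_grep}" ${log_file})" != "${expected_count}" ]; then
+    #           echo "Wrong match count for '${expected_grep}' in ${log_file}. Test failed."
+    #           exit 2
+    #       fi
+    #   }
+    #   check_log_count 1 "Please go to this URL and login:" run1.log  # exactly one prompt in run1
+    #   check_log_count 0 "Please go to this URL and login:" run2.log  # and none in run2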
+ # From save_tokens + check_log_does_not_have "INFO: Tokens saved successfully" ${case_name}.log # Differs from run1 +} + +run_test_cases diff --git a/zstash/create.py b/zstash/create.py index b502f1e6..d8f7079d 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -6,7 +6,7 @@ import os.path import sqlite3 import sys -from typing import Any, List, Tuple +from typing import Any, List, Optional, Tuple from six.moves.urllib.parse import urlparse @@ -14,6 +14,7 @@ from .hpss import hpss_put from .hpss_utils import add_files from .settings import DEFAULT_CACHE, config, get_db_filename, logger +from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection from .utils import ( create_tars_table, get_files_to_archive, @@ -52,12 +53,13 @@ def create(): logger.error(input_path_error_str) raise NotADirectoryError(input_path_error_str) + gtc: Optional[GlobusTransferCollection] = None if hpss != "none": url = urlparse(hpss) if url.scheme == "globus": # identify globus endpoints - logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)") - globus_activate(hpss) + logger.debug(f"{ts_utc()}:Calling globus_activate()") + gtc = globus_activate(hpss) else: # config.hpss is not "none", so we need to # create target HPSS directory @@ -88,14 +90,23 @@ def create(): # Create and set up the database logger.debug(f"{ts_utc()}: Calling create_database()") - failures: List[str] = create_database(cache, args) + htc: HPSSTransferCollection = HPSSTransferCollection() + failures: List[str] = create_database(cache, args, gtc=gtc, htc=htc) # Transfer to HPSS. Always keep a local copy. logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) + hpss_put( + hpss, + get_db_filename(cache), + cache, + keep=args.keep, + is_index=True, + gtc=gtc, + # htc=htc, + ) logger.debug(f"{ts_utc()}: calling globus_finalize()") - globus_finalize(non_blocking=args.non_blocking) + globus_finalize(gtc, htc, non_blocking=args.non_blocking) if len(failures) > 0: # List the failures @@ -204,7 +215,12 @@ def setup_create() -> Tuple[str, argparse.Namespace]: return cache, args -def create_database(cache: str, args: argparse.Namespace) -> List[str]: +def create_database( + cache: str, + args: argparse.Namespace, + gtc: Optional[GlobusTransferCollection], + htc: Optional[HPSSTransferCollection], +) -> List[str]: # Create new database logger.debug(f"{ts_utc()}:Creating index database") if os.path.exists(get_db_filename(cache)): @@ -263,26 +279,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: files: List[str] = get_files_to_archive(cache, args.include, args.exclude) failures: List[str] - if args.follow_symlinks: - try: - # Add files to archive - failures = add_files( - cur, - con, - -1, - files, - cache, - args.keep, - args.follow_symlinks, - skip_tars_md5=args.no_tars_md5, - non_blocking=args.non_blocking, - error_on_duplicate_tar=args.error_on_duplicate_tar, - overwrite_duplicate_tars=args.overwrite_duplicate_tars, - force_database_corruption=args.for_developers_force_database_corruption, - ) - except FileNotFoundError: - raise Exception("Archive creation failed due to broken symlink.") - else: + try: # Add files to archive failures = add_files( cur, @@ -297,7 +294,14 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: error_on_duplicate_tar=args.error_on_duplicate_tar, overwrite_duplicate_tars=args.overwrite_duplicate_tars, 
force_database_corruption=args.for_developers_force_database_corruption, + gtc=gtc, + htc=htc, ) + except FileNotFoundError as e: + if args.follow_symlinks: + raise Exception("Archive creation failed due to broken symlink.") + else: + raise e # Close database con.commit() diff --git a/zstash/globus.py b/zstash/globus.py index 84c12cdf..aded9ed4 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -1,191 +1,81 @@ from __future__ import absolute_import, print_function -import configparser -import os -import os.path -import re -import socket import sys +from typing import List, Optional -from fair_research_login.client import NativeClient -from globus_sdk import TransferAPIError, TransferClient, TransferData -from globus_sdk.services.transfer.response.iterable import IterableTransferResponse +from globus_sdk import TransferAPIError, TransferData +from globus_sdk.response import GlobusHTTPResponse from six.moves.urllib.parse import urlparse +from .globus_utils import ( + HPSS_ENDPOINT_MAP, + check_state_files, + file_exists, + get_local_endpoint_id, + get_transfer_client_with_auth, + globus_block_wait, + globus_wait, + set_up_TransferData, + submit_transfer_with_checks, +) from .settings import logger +from .transfer_tracking import ( + GlobusTransfer, + GlobusTransferCollection, + HPSSTransferCollection, + delete_transferred_files, +) from .utils import ts_utc -hpss_endpoint_map = { - "ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec", - "NERSC": "9cd89cfd-6d04-11e5-ba46-22000b92c6ec", -} - -# This is used if the `globus_endpoint_uuid` is not set in `~/.zstash.ini` -regex_endpoint_map = { - r"theta.*\.alcf\.anl\.gov": "08925f04-569f-11e7-bef8-22000b9a448b", - r"blueslogin.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", - r"chrlogin.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", - r"b\d+\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", - r"chr.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", - r"compy.*\.pnl\.gov": "68fbd2fa-83d7-11e9-8e63-029d279f7e24", - r"perlmutter.*\.nersc\.gov": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79", -} - -remote_endpoint = None -local_endpoint = None -transfer_client: TransferClient = None -transfer_data: TransferData = None -task_id = None -archive_directory_listing: IterableTransferResponse = None - - -def check_endpoint_version_5(ep_id): - output = transfer_client.get_endpoint(ep_id) - version = output.get("gcs_version", "0.0") - if output["gcs_version"] is None: - return False - elif int(version.split(".")[0]) >= 5: - return True - return False - - -def submit_transfer_with_checks(transfer_data): - try: - task = transfer_client.submit_transfer(transfer_data) - except TransferAPIError as err: - if err.info.consent_required: - scopes = "urn:globus:auth:scope:transfer.api.globus.org:all[" - for ep_id in [remote_endpoint, local_endpoint]: - if check_endpoint_version_5(ep_id): - scopes += f" *https://auth.globus.org/scopes/{ep_id}/data_access" - scopes += " ]" - native_client = NativeClient( - client_id="6c1629cf-446c-49e7-af95-323c6412397f", app_name="Zstash" - ) - native_client.login(requested_scopes=scopes) - # Quit here and tell user to re-try - print( - "Consents added, please re-run the previous command to start transfer" - ) - sys.exit(0) - else: - raise err - return task - - -def globus_activate(hpss: str): - """ - Read the local globus endpoint UUID from ~/.zstash.ini. 
- If the ini file does not exist, create an ini file with empty values, - and try to find the local endpoint UUID based on the FQDN - """ - global transfer_client - global local_endpoint - global remote_endpoint +def globus_activate( + hpss: str, gtc: Optional[GlobusTransferCollection] = None +) -> Optional[GlobusTransferCollection]: url = urlparse(hpss) if url.scheme != "globus": - return - remote_endpoint = url.netloc - - ini_path = os.path.expanduser("~/.zstash.ini") - ini = configparser.ConfigParser() - if ini.read(ini_path): - if "local" in ini.sections(): - local_endpoint = ini["local"].get("globus_endpoint_uuid") - else: - ini["local"] = {"globus_endpoint_uuid": ""} - try: - with open(ini_path, "w") as f: - ini.write(f) - except Exception as e: - logger.error(e) - sys.exit(1) - if not local_endpoint: - fqdn = socket.getfqdn() - if re.fullmatch(r"n.*\.local", fqdn) and os.getenv("HOSTNAME", "NA").startswith( - "compy" - ): - fqdn = "compy.pnl.gov" - for pattern in regex_endpoint_map.keys(): - if re.fullmatch(pattern, fqdn): - local_endpoint = regex_endpoint_map.get(pattern) - break - # FQDN is not set on Perlmutter at NERSC - if not local_endpoint: - nersc_hostname = os.environ.get("NERSC_HOST") - if nersc_hostname and ( - nersc_hostname == "perlmutter" or nersc_hostname == "unknown" - ): - local_endpoint = regex_endpoint_map.get(r"perlmutter.*\.nersc\.gov") - if not local_endpoint: - logger.error( - "{} does not have the local Globus endpoint set nor could one be found in regex_endpoint_map.".format( - ini_path - ) - ) - sys.exit(1) - - if remote_endpoint.upper() in hpss_endpoint_map.keys(): - remote_endpoint = hpss_endpoint_map.get(remote_endpoint.upper()) - - native_client = NativeClient( - client_id="6c1629cf-446c-49e7-af95-323c6412397f", - app_name="Zstash", - default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all", - ) - native_client.login(no_local_server=True, refresh_tokens=True) - transfer_authorizer = native_client.get_authorizers().get("transfer.api.globus.org") - transfer_client = TransferClient(authorizer=transfer_authorizer) - - for ep_id in [local_endpoint, remote_endpoint]: - r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600) + return None + if gtc is None: + gtc = GlobusTransferCollection() + check_state_files() + gtc.remote_endpoint = url.netloc + gtc.local_endpoint = get_local_endpoint_id(gtc.local_endpoint) + upper_remote_ep = gtc.remote_endpoint.upper() + if upper_remote_ep in HPSS_ENDPOINT_MAP.keys(): + gtc.remote_endpoint = HPSS_ENDPOINT_MAP.get(upper_remote_ep) + both_endpoints: List[Optional[str]] = [gtc.local_endpoint, gtc.remote_endpoint] + gtc.transfer_client = get_transfer_client_with_auth(both_endpoints) + for ep_id in both_endpoints: + r = gtc.transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600) if r.get("code") == "AutoActivationFailed": logger.error( - "The {} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{} and (re)activate the endpoint.".format( - ep_id, ep_id - ) + f"The {ep_id} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{ep_id} and (re)activate the endpoint." 
) sys.exit(1) - - -def file_exists(name: str) -> bool: - global archive_directory_listing - - for entry in archive_directory_listing: - if entry.get("name") == name: - return True - return False - - -global_variable_tarfiles_pushed = 0 + return gtc # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 - remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool + gtc: Optional[GlobusTransferCollection], + remote_ep: str, + remote_path: str, + name: str, + transfer_type: str, + non_blocking: bool, ): - global transfer_client - global local_endpoint - global remote_endpoint - global transfer_data - global task_id - global archive_directory_listing - global global_variable_tarfiles_pushed - logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}") - if not transfer_client: - globus_activate("globus://" + remote_ep) - if not transfer_client: + if (not gtc) or (not gtc.transfer_client): + gtc = globus_activate("globus://" + remote_ep) + if (not gtc) or (not gtc.transfer_client): sys.exit(1) if transfer_type == "get": - if not archive_directory_listing: - archive_directory_listing = transfer_client.operation_ls( - remote_endpoint, remote_path + if not gtc.archive_directory_listing: + gtc.archive_directory_listing = gtc.transfer_client.operation_ls( + gtc.remote_endpoint, remote_path ) - if not file_exists(name): + if not file_exists(gtc.archive_directory_listing, name): logger.error( "Remote file globus://{}{}/{} does not exist".format( remote_ep, remote_path, name @@ -193,62 +83,44 @@ def globus_transfer( # noqa: C901 ) sys.exit(1) - if transfer_type == "get": - src_ep = remote_endpoint - src_path = os.path.join(remote_path, name) - dst_ep = local_endpoint - dst_path = os.path.join(os.getcwd(), name) - else: - src_ep = local_endpoint - src_path = os.path.join(os.getcwd(), name) - dst_ep = remote_endpoint - dst_path = os.path.join(remote_path, name) - - subdir = os.path.basename(os.path.normpath(remote_path)) - subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir) - filename = name.split(".")[0] - label = subdir_label + " " + filename + mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() + transfer_data: TransferData = set_up_TransferData( + transfer_type, + gtc.local_endpoint, + gtc.remote_endpoint, + remote_path, + name, + gtc.transfer_client, + mrt.transfer_data if mrt else None, + ) - if not transfer_data: - transfer_data = TransferData( - transfer_client, - src_ep, - dst_ep, - label=label, - verify_checksum=True, - preserve_timestamp=True, - fail_on_quota_errors=True, - ) - transfer_data.add_item(src_path, dst_path) - transfer_data["label"] = label + task: GlobusHTTPResponse try: - if task_id: - task = transfer_client.get_task(task_id) - prev_task_status = task["status"] + if mrt and mrt.task_id: + task = gtc.transfer_client.get_task(mrt.task_id) + mrt.task_status = task["status"] # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE} # NOTE: How we behave here depends upon whether we want to support mutliple active transfers. # Presently, we do not, except inadvertantly (if status == PENDING) - if prev_task_status == "ACTIVE": + if mrt.task_status == "ACTIVE": logger.info( - f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE." + f"{ts_utc()}: Previous task_id {mrt.task_id} Still Active. Returning ACTIVE." 
) return "ACTIVE" - elif prev_task_status == "SUCCEEDED": + elif mrt.task_status == "SUCCEEDED": logger.info( - f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED." + f"{ts_utc()}: Previous task_id {mrt.task_id} status = SUCCEEDED." ) src_ep = task["source_endpoint_id"] dst_ep = task["destination_endpoint_id"] label = task["label"] ts = ts_utc() logger.info( - "{}:Globus transfer {}, from {} to {}: {} succeeded".format( - ts, task_id, src_ep, dst_ep, label - ) + f"{ts}:Globus transfer {mrt.task_id}, from {src_ep} to {dst_ep}: {label} succeeded" ) else: logger.error( - f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}." + f"{ts_utc()}: Previous task_id {mrt.task_id} status = {mrt.task_status}." ) # DEBUG: review accumulated items in TransferData @@ -256,24 +128,29 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - global_variable_tarfiles_pushed += 1 + gtc.cumulative_tarfiles_pushed += 1 print( - f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}", + f" (routine) PUSHING (#{gtc.cumulative_tarfiles_pushed}) STORED source item: {item['source_path']}", flush=True, ) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") - task = submit_transfer_with_checks(transfer_data) + task = submit_transfer_with_checks(gtc.transfer_client, transfer_data) task_id = task.get("task_id") # NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer, # the "lable" given here refers only to the LAST tarfile in the TransferData list. logger.info( f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}" ) - # Nullify the submitted transfer data structure so that a new one will be created on next call. transfer_data = None + + new_transfer = GlobusTransfer() + new_transfer.transfer_data = transfer_data + new_transfer.task_id = task_id + new_transfer.task_status = "UNKNOWN" + gtc.transfers.append(new_transfer) except TransferAPIError as e: if e.code == "NoCredException": logger.error( @@ -288,156 +165,81 @@ def globus_transfer( # noqa: C901 logger.error("Exception: {}".format(e)) sys.exit(1) - # test for blocking on new task_id - task_status = "UNKNOWN" - if not non_blocking: - task_status = globus_block_wait( - task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5 - ) - else: - logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}") - - if transfer_type == "put": - return task_status - - if transfer_type == "get" and task_id: - globus_wait(task_id) + new_mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() - return task_status - - -def globus_block_wait( - task_id: str, wait_timeout: int, polling_interval: int, max_retries: int -): - global transfer_client - - # poll every "polling_interval" seconds to speed up small transfers. 
Report every 2 hours, stop waiting aftert 5*2 = 10 hours - logger.info( - f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}" - ) - task_status = "UNKNOWN" - retry_count = 0 - while retry_count < max_retries: - try: - # Wait for the task to complete - logger.info( - f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}" - ) - transfer_client.task_wait( - task_id, timeout=wait_timeout, polling_interval=10 + # test for blocking on new task_id + if new_mrt and new_mrt.task_id: + if not non_blocking: + new_mrt.task_status = globus_block_wait( + transfer_client=gtc.transfer_client, + task_id=new_mrt.task_id, + wait_timeout=7200, + max_retries=5, ) - logger.info(f"{ts_utc()}: done with wait") - except Exception as e: - logger.error(f"Unexpected Exception: {e}") else: - curr_task = transfer_client.get_task(task_id) - task_status = curr_task["status"] - if task_status == "SUCCEEDED": - break - finally: - retry_count += 1 logger.info( - f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds" + f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {new_mrt.task_id}" ) - if retry_count == max_retries: - logger.info( - f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds" - ) - task_status = "EXHAUSTED_TIMEOUT_RETRIES" + if transfer_type == "get": + globus_wait(gtc.transfer_client, new_mrt.task_id) - logger.info( - f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}" - ) - return task_status +def globus_finalize( + gtc: Optional[GlobusTransferCollection], + htc: HPSSTransferCollection, + non_blocking: bool = False, +): + last_task_id = None + if gtc is None: + logger.warning("No GlobusTransferCollection object provided for finalization") + return -def globus_wait(task_id: str): - global transfer_client + transfer: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() + if transfer: + if transfer.transfer_data: + # DEBUG: review accumulated items in TransferData + logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") + attribs = transfer.transfer_data.__dict__ + for item in attribs["data"]["DATA"]: + if item["DATA_TYPE"] == "transfer_item": + gtc.cumulative_tarfiles_pushed += 1 + print( + f" (finalize) PUSHING ({gtc.cumulative_tarfiles_pushed}) source item: {item['source_path']}", + flush=True, + ) - try: - """ - A Globus transfer job (task) can be in one of the three states: - ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a - status of the transfer job (task) from the Globus Transfer service, - with 20 second timeout limit. If the task is ACTIVE after time runs - out 'task_wait' returns False, and True otherwise. - """ - while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20): - pass - """ - The Globus transfer job (task) has been finished (SUCCEEDED or FAILED). - Check if the transfer SUCCEEDED or FAILED. - """ - task = transfer_client.get_task(task_id) - if task["status"] == "SUCCEEDED": - src_ep = task["source_endpoint_id"] - dst_ep = task["destination_endpoint_id"] - label = task["label"] + # SUBMIT new transfer here logger.info( - "Globus transfer {}, from {} to {}: {} succeeded".format( - task_id, src_ep, dst_ep, label - ) + f"{ts_utc()}: DIVING: Submit Transfer for {transfer.transfer_data['label']}" ) - else: - logger.error("Transfer FAILED") - except TransferAPIError as e: - if e.code == "NoCredException": - logger.error( - "{}. 
Please go to https://app.globus.org/endpoints and activate the endpoint.".format( - e.message - ) - ) - else: - logger.error(e) - sys.exit(1) - except Exception as e: - logger.error("Exception: {}".format(e)) - sys.exit(1) - - -def globus_finalize(non_blocking: bool = False): - global transfer_client - global transfer_data - global task_id - global global_variable_tarfiles_pushed - - last_task_id = None - - if transfer_data: - # DEBUG: review accumulated items in TransferData - logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") - attribs = transfer_data.__dict__ - for item in attribs["data"]["DATA"]: - if item["DATA_TYPE"] == "transfer_item": - global_variable_tarfiles_pushed += 1 - print( - f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}", - flush=True, + try: + last_task = submit_transfer_with_checks( + gtc.transfer_client, transfer.transfer_data ) - - # SUBMIT new transfer here - logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") - try: - last_task = submit_transfer_with_checks(transfer_data) - last_task_id = last_task.get("task_id") - except TransferAPIError as e: - if e.code == "NoCredException": - logger.error( - "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( - e.message + last_task_id = last_task.get("task_id") + except TransferAPIError as e: + if e.code == "NoCredException": + logger.error( + "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( + e.message + ) ) - ) - else: - logger.error(e) - sys.exit(1) - except Exception as e: - logger.error("Exception: {}".format(e)) - sys.exit(1) - - if not non_blocking: - if task_id: - globus_wait(task_id) - if last_task_id: - globus_wait(last_task_id) + else: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.error("Exception: {}".format(e)) + sys.exit(1) + if not non_blocking: + if transfer and transfer.task_id: + globus_wait(gtc.transfer_client, transfer.task_id) + if last_task_id: + globus_wait(gtc.transfer_client, last_task_id) + + if htc.curr_transfers and transfer and transfer.task_id: + globus_wait(gtc.transfer_client, transfer.task_id) + delete_transferred_files(htc) + if htc.prev_transfers: + delete_transferred_files(htc) diff --git a/zstash/globus_utils.py b/zstash/globus_utils.py new file mode 100644 index 00000000..c678675e --- /dev/null +++ b/zstash/globus_utils.py @@ -0,0 +1,373 @@ +from __future__ import absolute_import, print_function + +import configparser +import json +import os +import os.path +import re +import socket +import sys +from typing import Dict, List, Optional + +from globus_sdk import ( + NativeAppAuthClient, + RefreshTokenAuthorizer, + TransferAPIError, + TransferClient, + TransferData, +) +from globus_sdk.response import GlobusHTTPResponse + +from .settings import logger +from .utils import ts_utc + +# Global constants ############################################################ +HPSS_ENDPOINT_MAP: Dict[str, str] = { + "ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec", + "NERSC": "9cd89cfd-6d04-11e5-ba46-22000b92c6ec", +} + +# This is used if the `globus_endpoint_uuid` is not set in `~/.zstash.ini` +REGEX_ENDPOINT_MAP: Dict[str, str] = { + r"theta.*\.alcf\.anl\.gov": "08925f04-569f-11e7-bef8-22000b9a448b", + r"blueslogin.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", + r"chrlogin.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", + r"b\d+\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", + 
r"chr.*\.lcrc\.anl\.gov": "15288284-7006-4041-ba1a-6b52501e49f1", + r"compy.*\.pnl\.gov": "68fbd2fa-83d7-11e9-8e63-029d279f7e24", + r"perlmutter.*\.nersc\.gov": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79", +} + +ZSTASH_CLIENT_ID: str = "6c1629cf-446c-49e7-af95-323c6412397f" + +# State files +GLOBUS_CFG: str = os.path.expanduser("~/.globus-native-apps.cfg") +INI_PATH: str = os.path.expanduser("~/.zstash.ini") +TOKEN_FILE = os.path.expanduser("~/.zstash_globus_tokens.json") + +# Independent functions ####################################################### +# The functions here don't rely on the classes defined in globus.py. + + +# Primarily used by globus_activate ########################################### +def check_state_files(): + if os.path.exists(GLOBUS_CFG): + logger.warning( + f"Globus CFG {GLOBUS_CFG} exists. This may be left over from earlier versions of zstash, and may cause issues. Consider deleting." + ) + + if os.path.exists(INI_PATH): + logger.info( + f"{INI_PATH} exists. We can try to read the local endpoint ID from it." + ) + else: + logger.info( + f"{INI_PATH} does NOT exist. This means we won't be able to read the local endpoint ID from it." + ) + + if os.path.exists(TOKEN_FILE): + logger.info( + f"Token file {TOKEN_FILE} exists. We can try to load tokens from it." + ) + else: + logger.info( + f"Token file {TOKEN_FILE} does NOT exist. This means we won't be able to load tokens from it." + ) + + +def get_local_endpoint_id(local_endpoint_id: Optional[str]) -> str: + ini = configparser.ConfigParser() + if ini.read(INI_PATH): + if "local" in ini.sections(): + local_endpoint_id = ini["local"].get("globus_endpoint_uuid") + logger.info( + f"Setting local_endpoint_id based on {INI_PATH}: {local_endpoint_id}" + ) + else: + ini["local"] = {"globus_endpoint_uuid": ""} + try: + with open(INI_PATH, "w") as f: + ini.write(f) + logger.info(f"Writing to empty {INI_PATH}") + except Exception as e: + logger.error(e) + sys.exit(1) + if not local_endpoint_id: + fqdn = socket.getfqdn() + if re.fullmatch(r"n.*\.local", fqdn) and os.getenv("HOSTNAME", "NA").startswith( + "compy" + ): + fqdn = "compy.pnl.gov" + for pattern in REGEX_ENDPOINT_MAP.keys(): + if re.fullmatch(pattern, fqdn): + local_endpoint_id = REGEX_ENDPOINT_MAP.get(pattern) + logger.info( + f"Setting local_endpoint_id based on FQDN {fqdn}: {local_endpoint_id}" + ) + break + # FQDN is not set on Perlmutter at NERSC + if not local_endpoint_id: + nersc_hostname = os.environ.get("NERSC_HOST") + if nersc_hostname and ( + nersc_hostname == "perlmutter" or nersc_hostname == "unknown" + ): + local_endpoint_id = REGEX_ENDPOINT_MAP.get(r"perlmutter.*\.nersc\.gov") + logger.info( + f"Setting local_endpoint_id based on NERSC_HOST {nersc_hostname}: {local_endpoint_id}" + ) + if not local_endpoint_id: + logger.error( + f"{INI_PATH} does not have the local Globus endpoint set nor could one be found in REGEX_ENDPOINT_MAP." 
+ ) + sys.exit(1) + return local_endpoint_id + + +def get_transfer_client_with_auth( + both_endpoints: List[Optional[str]], +) -> TransferClient: + tokens = load_tokens() + + # Check if we have stored refresh tokens + if "transfer.api.globus.org" in tokens: + token_data = tokens["transfer.api.globus.org"] + if "refresh_token" in token_data: + logger.info("Found stored refresh token - using it") + # Create a simple auth client for the RefreshTokenAuthorizer + auth_client = NativeAppAuthClient(ZSTASH_CLIENT_ID) + transfer_authorizer = RefreshTokenAuthorizer( + refresh_token=token_data["refresh_token"], auth_client=auth_client + ) + transfer_client = TransferClient(authorizer=transfer_authorizer) + return transfer_client + + # No stored tokens, need to authenticate + logger.info("No stored tokens found - starting authentication") + + # Get the required scopes + all_scopes = get_all_endpoint_scopes(both_endpoints) + + # Use the NativeAppAuthClient pattern from the documentation + client = NativeAppAuthClient(ZSTASH_CLIENT_ID) + client.oauth2_start_flow( + requested_scopes=all_scopes, + refresh_tokens=True, # This is the key to persistent auth! + ) + + authorize_url = client.oauth2_get_authorize_url() + print(f"Please go to this URL and login:\n{authorize_url}") + + auth_code = input("Please enter the code you get after login here: ").strip() + token_response = client.oauth2_exchange_code_for_tokens(auth_code) + + # Save tokens for next time + save_tokens(token_response) + + # Get the transfer token and create authorizer + globus_transfer_data = token_response.by_resource_server["transfer.api.globus.org"] + transfer_authorizer = RefreshTokenAuthorizer( + refresh_token=globus_transfer_data["refresh_token"], auth_client=client + ) + + transfer_client = TransferClient(authorizer=transfer_authorizer) + return transfer_client + + +def load_tokens(): + if os.path.exists(TOKEN_FILE): + try: + with open(TOKEN_FILE, "r") as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + return {} + + +def get_all_endpoint_scopes(endpoints: List[Optional[str]]) -> str: + inner = " ".join( + [ + f"*https://auth.globus.org/scopes/{ep}/data_access" + for ep in endpoints + if ep is not None + ] + ) + return f"urn:globus:auth:scope:transfer.api.globus.org:all[{inner}]" + + +def save_tokens(token_response): + tokens_to_save = {} + for resource_server, token_data in token_response.by_resource_server.items(): + tokens_to_save[resource_server] = { + "access_token": token_data["access_token"], + "refresh_token": token_data.get("refresh_token"), + "expires_at": token_data.get("expires_at_seconds"), + } + + with open(TOKEN_FILE, "w") as f: + json.dump(tokens_to_save, f, indent=2) + logger.info("Tokens saved successfully") + + +# Primarily used by globus_transfer ########################################### +def file_exists(archive_directory_listing, name: str) -> bool: + for entry in archive_directory_listing: + if entry.get("name") == name: + return True + return False + + +def set_up_TransferData( + transfer_type: str, + local_endpoint: Optional[str], + remote_endpoint: Optional[str], + remote_path: str, + name: str, + transfer_client: TransferClient, + transfer_data: Optional[TransferData] = None, +) -> TransferData: + if not local_endpoint: + raise ValueError("Local endpoint ID is not set.") + if not remote_endpoint: + raise ValueError("Remote endpoint ID is not set.") + if transfer_type == "get": + src_ep = remote_endpoint + src_path = os.path.join(remote_path, name) + dst_ep = local_endpoint + 
dst_path = os.path.join(os.getcwd(), name)
+    else:
+        src_ep = local_endpoint
+        src_path = os.path.join(os.getcwd(), name)
+        dst_ep = remote_endpoint
+        dst_path = os.path.join(remote_path, name)
+
+    subdir = os.path.basename(os.path.normpath(remote_path))
+    subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
+    filename = name.split(".")[0]
+    label = subdir_label + " " + filename
+
+    if not transfer_data:
+        transfer_data = TransferData(
+            transfer_client,
+            src_ep,
+            dst_ep,
+            label=label,
+            verify_checksum=True,
+            preserve_timestamp=True,
+            fail_on_quota_errors=True,
+        )
+    transfer_data.add_item(src_path, dst_path)
+    transfer_data["label"] = label
+    return transfer_data
+
+
+def submit_transfer_with_checks(transfer_client, transfer_data) -> GlobusHTTPResponse:
+    task: GlobusHTTPResponse
+    try:
+        task = transfer_client.submit_transfer(transfer_data)
+    except TransferAPIError as err:
+        if err.info.consent_required:
+            logger.error("Consent required - this suggests scope issues.")
+            logger.error(
+                "With proper scope handling, this block should not be reached."
+            )
+            logger.error(
+                "Please report this bug at https://github.com/E3SM-Project/zstash/issues, with details of what you were trying to do."
+            )
+            raise RuntimeError(
+                "Insufficient Globus consents - please report this bug"
+            ) from err
+        else:
+            raise err
+    return task
+
+
+def globus_block_wait(
+    transfer_client: TransferClient,
+    task_id: str,
+    wait_timeout: int,
+    max_retries: int,
+):
+
+    # Poll every 10 seconds (the hardcoded polling_interval) to speed up small transfers. Report every 2 hours; stop waiting after 5*2 = 10 hours.
+    logger.info(
+        f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
+    )
+    task_status = "UNKNOWN"
+    retry_count = 0
+    while retry_count < max_retries:
+        try:
+            # Wait for the task to complete
+            logger.info(
+                f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}"
+            )
+            transfer_client.task_wait(
+                task_id, timeout=wait_timeout, polling_interval=10
+            )
+            logger.info(f"{ts_utc()}: done with wait")
+        except Exception as e:
+            logger.error(f"Unexpected Exception: {e}")
+        else:
+            curr_task = transfer_client.get_task(task_id)
+            task_status = curr_task["status"]
+            if task_status == "SUCCEEDED":
+                break
+        finally:
+            retry_count += 1
+            logger.info(
+                f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
+            )
+
+    if retry_count == max_retries:
+        logger.info(
+            f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
+        )
+        task_status = "EXHAUSTED_TIMEOUT_RETRIES"
+
+    logger.info(
+        f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
+    )
+
+    return task_status
+
+
+# Primarily used by globus_transfer, globus_finalize ##########################
+def globus_wait(transfer_client: TransferClient, task_id: str):
+    try:
+        """
+        A Globus transfer job (task) can be in one of three states:
+        ACTIVE, SUCCEEDED, FAILED. Poll the Globus Transfer service every
+        20 seconds, with a 300-second timeout per task_wait call: task_wait
+        returns False if the task is still ACTIVE when the timeout runs out,
+        and True otherwise.
+        """
+        while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
+            pass
+        """
+        The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
+        Check if the transfer SUCCEEDED or FAILED.
+ """ + task = transfer_client.get_task(task_id) + if task["status"] == "SUCCEEDED": + src_ep = task["source_endpoint_id"] + dst_ep = task["destination_endpoint_id"] + label = task["label"] + logger.info( + "Globus transfer {}, from {} to {}: {} succeeded".format( + task_id, src_ep, dst_ep, label + ) + ) + else: + logger.error("Transfer FAILED") + except TransferAPIError as e: + if e.code == "NoCredException": + logger.error( + "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( + e.message + ) + ) + else: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.error("Exception: {}".format(e)) + sys.exit(1) diff --git a/zstash/hpss.py b/zstash/hpss.py index 24603388..3003849d 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -2,19 +2,22 @@ import os.path import subprocess -from typing import List +from typing import List, Optional from six.moves.urllib.parse import urlparse from .globus import globus_transfer from .settings import get_db_filename, logger +from .transfer_tracking import ( + GlobusTransferCollection, + HPSSTransferCollection, + delete_transferred_files, +) from .utils import run_command, ts_utc -prev_transfers: List[str] = list() -curr_transfers: List[str] = list() - -def hpss_transfer( +# C901 'hpss_transfer' is too complex (19) +def hpss_transfer( # noqa: C901 hpss: str, file_path: str, transfer_type: str, @@ -22,15 +25,16 @@ def hpss_transfer( keep: bool = False, non_blocking: bool = False, is_index: bool = False, + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ): - global prev_transfers - global curr_transfers - + if not htc: + htc = HPSSTransferCollection() logger.info( - f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {prev_transfers}" + f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {htc.prev_transfers}" ) # logger.debug( - # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" + # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {htc.curr_transfers}" # ) if hpss == "none": @@ -85,9 +89,9 @@ def hpss_transfer( endpoint = url.netloc url_path = url.path - curr_transfers.append(file_path) + htc.curr_transfers.append(file_path) # logger.debug( - # f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" + # f"{ts_utc()}: curr_transfers has been appended to, is now {htc.curr_transfers}" # ) path, name = os.path.split(file_path) @@ -104,16 +108,21 @@ def hpss_transfer( # For `get`, this directory is where the file we get from HPSS will go. os.chdir(path) + globus_status: Optional[str] = "UNKNOWN" if scheme == "globus": - globus_status = "UNKNOWN" + if not gtc: + raise RuntimeError( + "Scheme is 'globus' but no GlobusTransferCollection provided" + ) # Transfer file using the Globus Transfer Service logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})") - globus_status = globus_transfer( - endpoint, url_path, name, transfer_type, non_blocking - ) - logger.info( - f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}" - ) + globus_transfer(gtc, endpoint, url_path, name, transfer_type, non_blocking) + mrt = gtc.get_most_recent_transfer() + if mrt: + globus_status = mrt.task_status + logger.info( + f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}), task_id={mrt.task_id}, globus_status={globus_status}" + ) # NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer # or perhaps transfer is hanging. 
We should decide whether to ignore it, or cancel it, but # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer @@ -133,16 +142,7 @@ def hpss_transfer( if (scheme != "globus") or (globus_status == "SUCCEEDED"): # Note: This is intended to fulfill the default removal of successfully-transfered # tar files when keep=False, irrespective of non-blocking status - logger.debug( - f"{ts_utc()}: deleting transfered files {prev_transfers}" - ) - for src_path in prev_transfers: - os.remove(src_path) - prev_transfers = curr_transfers - curr_transfers = list() - logger.info( - f"{ts_utc()}: prev_transfers has been set to {prev_transfers}" - ) + delete_transferred_files(htc) def hpss_put( @@ -152,17 +152,22 @@ def hpss_put( keep: bool = True, non_blocking: bool = False, is_index=False, + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ): """ Put a file to the HPSS archive. """ - hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking, is_index) + hpss_transfer( + hpss, file_path, "put", cache, keep, non_blocking, is_index, gtc=gtc, htc=htc + ) def hpss_get(hpss: str, file_path: str, cache: str): """ Get a file from the HPSS archive. """ + # gtc will get set as part of globus_transfer hpss_transfer(hpss, file_path, "get", cache, False) diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 7f873e47..9a48470b 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -14,6 +14,7 @@ from .hpss import hpss_put from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger +from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection from .utils import create_tars_table, tars_table_exists, ts_utc @@ -67,6 +68,8 @@ def add_files( error_on_duplicate_tar: bool = False, overwrite_duplicate_tars: bool = False, force_database_corruption: str = "", + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ) -> List[str]: # Now, perform the actual archiving @@ -161,7 +164,15 @@ def add_files( logger.info( f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname} [keep, non_blocking] = [{keep}, {non_blocking}]" ) - hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) + hpss_put( + hpss, + os.path.join(cache, tfname), + cache, + keep, + non_blocking, + gtc=gtc, + htc=htc, + ) logger.info( f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}" ) diff --git a/zstash/transfer_tracking.py b/zstash/transfer_tracking.py new file mode 100644 index 00000000..d022e1e9 --- /dev/null +++ b/zstash/transfer_tracking.py @@ -0,0 +1,54 @@ +import os +from typing import List, Optional + +from globus_sdk import TransferClient, TransferData +from globus_sdk.services.transfer.response.iterable import IterableTransferResponse + +from .settings import logger +from .utils import ts_utc + + +# Globus specific ############################################################# +class GlobusTransfer(object): + def __init__(self): + self.transfer_data: Optional[TransferData] = None + self.task_id: Optional[str] = None + # https://docs.globus.org/api/transfer/task/#task_fields + # ACTIVE, SUCCEEDED, FAILED, INACTIVE + self.task_status: Optional[str] = None + logger.debug(f"{ts_utc()}: GlobusTransfer initialized") + + +class GlobusTransferCollection(object): + def __init__(self): + # Attributes common to all the transfers + self.remote_endpoint: Optional[str] = None + self.local_endpoint: 
Optional[str] = None
+        self.transfer_client: Optional[TransferClient] = None
+        self.archive_directory_listing: Optional[IterableTransferResponse] = None
+
+        self.transfers: List[GlobusTransfer] = (
+            []
+        )  # TODO: Replace with collections.deque?
+        self.cumulative_tarfiles_pushed: int = 0
+        logger.debug(f"{ts_utc()}: GlobusTransferCollection initialized")
+
+    def get_most_recent_transfer(self) -> Optional[GlobusTransfer]:
+        return self.transfers[-1] if self.transfers else None
+
+
+# All Transfers ###############################################################
+class HPSSTransferCollection(object):
+    def __init__(self):
+        self.prev_transfers: List[str] = []  # Can remove
+        self.curr_transfers: List[str] = []  # Still using!
+        logger.debug(f"{ts_utc()}: HPSSTransferCollection initialized")
+
+
+def delete_transferred_files(htc: HPSSTransferCollection):
+    logger.debug(f"{ts_utc()}: deleting transferred files {htc.prev_transfers}")
+    for src_path in htc.prev_transfers:
+        os.remove(src_path)
+    htc.prev_transfers = htc.curr_transfers
+    htc.curr_transfers = []
+    logger.info(f"{ts_utc()}: prev_transfers has been set to {htc.prev_transfers}")
diff --git a/zstash/update.py b/zstash/update.py
index b0f2af40..87939e0c 100644
--- a/zstash/update.py
+++ b/zstash/update.py
@@ -21,6 +21,7 @@
     get_db_filename,
     logger,
 )
+from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection
 from .utils import get_files_to_archive, update_config


@@ -29,8 +30,11 @@ def update():
     args: argparse.Namespace
     cache: str
     args, cache = setup_update()
+    htc = HPSSTransferCollection()

-    result: Optional[List[str]] = update_database(args, cache)
+    result: Optional[List[str]]
+    gtc: Optional[GlobusTransferCollection]
+    result, gtc = update_database(args, cache, htc)

     if result is None:
         # There was either nothing to update or `--dry-run` was set.
@@ -43,9 +47,17 @@
         hpss = config.hpss
     else:
         raise TypeError("Invalid config.hpss={}".format(config.hpss))
-    hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
+    hpss_put(
+        hpss,
+        get_db_filename(cache),
+        cache,
+        keep=args.keep,
+        is_index=True,
+        gtc=gtc,
+        # htc=htc,
+    )

-    globus_finalize(non_blocking=args.non_blocking)
+    globus_finalize(gtc, htc, non_blocking=args.non_blocking)

     # List failures
     if len(failures) > 0:
@@ -147,10 +159,11 @@ def setup_update() -> Tuple[argparse.Namespace, str]:

 # C901 'update_database' is too complex (20)
 def update_database(  # noqa: C901
-    args: argparse.Namespace, cache: str
-) -> Optional[List[str]]:
+    args: argparse.Namespace, cache: str, htc: HPSSTransferCollection
+) -> Tuple[Optional[List[str]], Optional[GlobusTransferCollection]]:
     # Open database
     logger.debug("Opening index database")
+    gtc: Optional[GlobusTransferCollection] = None
     if not os.path.exists(get_db_filename(cache)):
         # The database file doesn't exist in the cache.
# We need to retrieve it from HPSS @@ -160,7 +173,7 @@ def update_database( # noqa: C901 hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - globus_activate(hpss) + gtc = globus_activate(hpss) hpss_get(hpss, get_db_filename(cache), cache) else: error_str: str = ( @@ -242,7 +255,7 @@ def update_database( # noqa: C901 # Close database con.commit() con.close() - return None + return None, gtc # --dry-run option if args.dry_run: @@ -252,7 +265,7 @@ def update_database( # noqa: C901 # Close database con.commit() con.close() - return None + return None, gtc # Find last used tar archive itar: int = -1 @@ -263,24 +276,7 @@ def update_database( # noqa: C901 itar = max(itar, int(tfile_string[0:6], 16)) failures: List[str] - if args.follow_symlinks: - try: - # Add files - failures = add_files( - cur, - con, - itar, - newfiles, - cache, - keep, - args.follow_symlinks, - non_blocking=args.non_blocking, - error_on_duplicate_tar=args.error_on_duplicate_tar, - overwrite_duplicate_tars=args.overwrite_duplicate_tars, - ) - except FileNotFoundError: - raise Exception("Archive update failed due to broken symlink.") - else: + try: # Add files failures = add_files( cur, @@ -293,10 +289,17 @@ def update_database( # noqa: C901 non_blocking=args.non_blocking, error_on_duplicate_tar=args.error_on_duplicate_tar, overwrite_duplicate_tars=args.overwrite_duplicate_tars, + gtc=gtc, + htc=htc, ) + except FileNotFoundError as e: + if args.follow_symlinks: + raise Exception("Archive update failed due to broken symlink.") + else: + raise e # Close database con.commit() con.close() - return failures + return failures, gtc
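
[Overall shape of this change set: the module-level globals in zstash/globus.py
and zstash/hpss.py (transfer_client, transfer_data, task_id,
archive_directory_listing, prev_transfers, ...) are replaced by two explicit
state objects threaded through the call chain. A minimal sketch of the intended
wiring, with a hypothetical archive URL and cache name; create() and update()
do this wiring for real:

    from typing import Optional

    from zstash.globus import globus_activate, globus_finalize
    from zstash.hpss import hpss_put
    from zstash.transfer_tracking import (
        GlobusTransferCollection,
        HPSSTransferCollection,
    )

    hpss_url = "globus://NERSC/~/zstash_archive"  # hypothetical destination
    # None for non-globus URLs; otherwise holds the client, endpoints, and per-task state.
    gtc: Optional[GlobusTransferCollection] = globus_activate(hpss_url)
    # Tracks local files that may be deleted once their transfer succeeds.
    htc = HPSSTransferCollection()

    # The collections ride along explicitly instead of being consulted as globals.
    hpss_put(hpss_url, "index.db", "zstash", keep=True, is_index=True, gtc=gtc, htc=htc)
    globus_finalize(gtc, htc, non_blocking=False)
]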