diff --git a/tests/scripts/test_non_blocking.sh b/tests/scripts/test_non_blocking.sh new file mode 100644 index 00000000..c1dea277 --- /dev/null +++ b/tests/scripts/test_non_blocking.sh @@ -0,0 +1,61 @@ +# First, we have to set up Globus according to https://github.com/E3SM-Project/zstash/discussions/329 +# Log in to Globus +# Authenticate LCRC Improv DTN +# Authenticate NERSC Perlmutter +source /lcrc/soft/climate/e3sm-unified/load_latest_e3sm_unified_chrysalis.sh +cd /home/ac.forsyth2/ez +mkdir zstash_dirs +cd zstash_dirs/ +mkdir zstash_demo; echo 'file0 stuff' > zstash_demo/file0.txt +zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo +# globus_sdk.services.transfer.errors.TransferAPIError: ('POST', 'https://transfer.api.globus.org/v0.10/endpoint/15288284-7006-4041-ba1a-6b52501e49f1/autoactivate?if_expires_in=600', None, 400, 'ClientError.AuthenticationFailed', 'No credentials supplied', 'msYY54WXq') +rm ~/.globus-native-apps.cfg +zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo +# Auth Code prompt appears twice + + +cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions +du -sh v2.NARRM.historical_0151/ +# That's 22 GB. Let's try to compress it with zstash. +cd v2.NARRM.historical_0151/tests + +# From https://docs.e3sm.org/zstash/_build/html/main/usage.html: +# `--maxsize MAXSIZE`` specifies the maximum size (in GB) for tar files. The default is 256 GB. Zstash will create tar files that are smaller than MAXSIZE except when individual input files exceed MAXSIZE (as individual files are never split up between different tar files). +# `--non-blocking` Zstash will submit a Globus transfer and immediately create a subsequent tarball. That is, Zstash will not wait until the transfer completes to start creating a subsequent tarball. On machines where it takes more time to create a tarball than transfer it, each Globus transfer will have one file. On machines where it takes less time to create a tarball than transfer it, the first transfer will have one file, but the number of tarballs in subsequent transfers will grow finding dynamically the most optimal number of tarballs per transfer. NOTE: zstash is currently always non-blocking. + +# Make maxsize 1 GB. This will create a new tar after every 1 GB of data. +zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v1 --maxsize 1 . + +# DEBUG: Closing tar archive 000000.tar +# INFO: Creating new tar archive 000001.tar + +# In a different window: +ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash +# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db + +# So, we can clearly see the tars are being created immediately. +# On the Globus website, test_290_v1 000000 transfer is complete. +# And test_290_v1 000001 transfer is in progress. + +# This is the `--non-blocking` behavior, even though we did not specify it. + +# Now, with changes in this PR: +conda activate zstash_dev_issue_290 +cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests +rm -rf zstash +zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v2 --maxsize 1 --non-blocking . +# DEBUG: Closing tar archive 000000.tar +# INFO: Creating new tar archive 000001.tar +# In a different window: +ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash +# 000000.tar 000001.tar index.db +# # On the Globus website, test_290_v1 000000 transfer is complete. +# And test_290_v1 000001 transfer is in progress. +ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash +# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db +ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash +000000.tar 000002.tar 000004.tar 000006.tar 000008.tar 00000a.tar +000001.tar 000003.tar 000005.tar 000007.tar 000009.tar index.db +# Command completed on the command line +# But on Globus website: +# Completed -- test_290_v2 000000, test_290_v2 000001, test_290_v2 index diff --git a/zstash/create.py b/zstash/create.py index e8819278..6ec7cb18 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -87,7 +87,9 @@ def create(): failures: List[str] = create_database(cache, args) # Transfer to HPSS. Always keep a local copy. - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put( + hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking + ) globus_finalize(non_blocking=args.non_blocking) @@ -254,6 +256,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: args.keep, args.follow_symlinks, skip_tars_md5=args.no_tars_md5, + non_blocking=args.non_blocking, ) except FileNotFoundError: raise Exception("Archive creation failed due to broken symlink.") @@ -268,6 +271,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: args.keep, args.follow_symlinks, skip_tars_md5=args.no_tars_md5, + non_blocking=args.non_blocking, ) # Close database diff --git a/zstash/globus.py b/zstash/globus.py index 964baf9b..91dba631 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -158,7 +158,7 @@ def file_exists(name: str) -> bool: def globus_transfer( - remote_ep: str, remote_path: str, name: str, transfer_type: str + remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool ): # noqa: C901 global transfer_client global local_endpoint @@ -247,7 +247,10 @@ def globus_transfer( sys.exit(1) if transfer_type == "get" and task_id: - globus_wait(task_id) + # non_blocking => do not wait for the last transfer to finish before creating a new tar + # not non_blocking => blocking => wait for the last transfer to finish before creating a new tar + if not non_blocking: + globus_wait(task_id) def globus_wait(task_id: str): @@ -319,6 +322,8 @@ def globus_finalize(non_blocking: bool = False): logger.error("Exception: {}".format(e)) sys.exit(1) + # non_blocking => do not wait for the last transfer to finish before creating a new tar + # not non_blocking => blocking => wait for the last transfer to finish before creating a new tar if not non_blocking: if task_id: globus_wait(task_id) diff --git a/zstash/hpss.py b/zstash/hpss.py index a055fe99..b60805f7 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -17,6 +17,7 @@ def hpss_transfer( transfer_type: str, cache: str, keep: bool = False, + non_blocking: bool = False, ): if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) @@ -87,7 +88,7 @@ def hpss_transfer( if scheme == "globus": # Transfer file using the Globus Transfer Service - globus_transfer(endpoint, url_path, name, transfer_type) + globus_transfer(endpoint, url_path, name, transfer_type, non_blocking) else: # Transfer file using `hsi` command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name) @@ -104,11 +105,13 @@ def hpss_transfer( os.remove(file_path) -def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True): +def hpss_put( + hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False +): """ Put a file to the HPSS archive. """ - hpss_transfer(hpss, file_path, "put", cache, keep) + hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking) def hpss_get(hpss: str, file_path: str, cache: str): diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 456e6a52..bbded0ce 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -63,6 +63,7 @@ def add_files( keep: bool, follow_symlinks: bool, skip_tars_md5: bool = False, + non_blocking: bool = False, ) -> List[str]: # Now, perform the actual archiving @@ -156,7 +157,7 @@ def add_files( hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, os.path.join(cache, tfname), cache, keep) + hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) # Update database with files that have been archived # Add a row to the "files" table, diff --git a/zstash/update.py b/zstash/update.py index 56095897..ab1d0158 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -43,7 +43,9 @@ def update(): hpss = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put( + hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking + ) globus_finalize(non_blocking=args.non_blocking) @@ -242,14 +244,28 @@ def update_database( # noqa: C901 try: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) except FileNotFoundError: raise Exception("Archive update failed due to broken symlink.") else: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) # Close database