Merged
Changes from 11 commits
4 changes: 4 additions & 0 deletions tests/scripts_unit_tests/README.md
@@ -0,0 +1,4 @@
The unit tests, in script form.

The unit tests can be hard to follow in their Python form,
so it may be easier to run and debug the same logic using these scripts.
46 changes: 46 additions & 0 deletions tests/scripts_unit_tests/test_ls_globus.bash
@@ -0,0 +1,46 @@
#!/bin/bash

hpss_globus_endpoint="6c54cade-bde5-45c1-bdea-f4bd71dba2cc"
hpss_path="globus://${hpss_globus_endpoint}/~/zstash_test/"
cache=zstash # Set via `self.cache = "zstash"`

# base.setupDirs ##############################################################
test_dir=zstash_test

# Create files and directories
echo "Creating files."
mkdir -p ${test_dir}
mkdir -p ${test_dir}/empty_dir
mkdir -p ${test_dir}/dir

echo "file0 stuff" > ${test_dir}/file0.txt
echo "" > ${test_dir}/file_empty.txt
echo "file1 stuff" > ${test_dir}/dir/file1.txt

# Symbolic (soft) link (points to a file name which points to an inode)
# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt
# The Python `os.symlink` call omits the leading `test_dir` from the target
# because a relative symlink target is resolved relative to the directory
# that contains the link; the same applies to `ln -s` here.
ln -s file0.txt ${test_dir}/file_0_soft.txt
# Broken ("bad") symbolic (soft) link (points to a file name that does not exist)
ln -s file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt
# Hard link (points to an inode directly), so plain `ln` rather than `ln -s`
ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt

# base.create #################################################################
echo "Adding files to HPSS"
zstash create --hpss=${hpss_path} ${test_dir}
# Archives 000000.tar
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, index.db

# back in test_globus.helperLsGlobus ##########################################
cd ${test_dir}
zstash ls --hpss=${hpss_path}
cd ..
echo "Cache:"
ls -l ${test_dir}/${cache}
echo "HPSS:"
hsi ls -l ${hpss_path}
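For reference, a rough Python sketch of the equivalent link setup that the unit tests perform in their setup helper (the directory and file names mirror the script above; `os.symlink`/`os.link` are the standard-library calls, but the exact helper location is an assumption):

```python
import os

test_dir = "zstash_test"  # same layout as the script above; file0.txt must already exist

# Soft link: the target string is resolved relative to the directory that
# contains the link, so the leading test_dir is omitted from the target.
os.symlink("file0.txt", os.path.join(test_dir, "file_0_soft.txt"))

# Broken soft link: the target name intentionally does not exist.
os.symlink("file0_that_doesnt_exist.txt", os.path.join(test_dir, "file0_soft_bad.txt"))

# Hard link: both paths are resolved at call time, so full paths are used.
os.link(os.path.join(test_dir, "file0.txt"), os.path.join(test_dir, "file0_hard.txt"))
```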
90 changes: 90 additions & 0 deletions tests/scripts_unit_tests/test_update_non_empty_hpss.bash
@@ -0,0 +1,90 @@
#!/bin/bash

hpss_path=zstash_test # Set via `HPSS_ARCHIVE = "zstash_test"`
cache=zstash # Set via `self.cache = "zstash"`

# base.setupDirs ##############################################################
test_dir=zstash_test

# Create files and directories
echo "Creating files."
mkdir -p ${test_dir}
mkdir -p ${test_dir}/empty_dir
mkdir -p ${test_dir}/dir

echo "file0 stuff" > ${test_dir}/file0.txt
echo "" > ${test_dir}/file_empty.txt
echo "file1 stuff" > ${test_dir}/dir/file1.txt

# Symbolic (soft) link (points to a file name which points to an inode)
# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt
# The Python `os.symlink` call omits the leading `test_dir` from the target
# because a relative symlink target is resolved relative to the directory
# that contains the link; the same applies to `ln -s` here.
ln -s file0.txt ${test_dir}/file_0_soft.txt
# Broken ("bad") symbolic (soft) link (points to a file name that does not exist)
ln -s file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt
# Hard link (points to an inode directly), so plain `ln` rather than `ln -s`
ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt

# base.create #################################################################
echo "Adding files to HPSS"
zstash create --hpss=${hpss_path} ${test_dir}
# Archives 000000.tar
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, index.db

# base.add_files ##############################################################
echo "Testing update with an actual change"
mkdir -p ${test_dir}/dir2
echo "file2 stuff" > ${test_dir}/dir2/file2.txt
echo "file1 stuff with changes" > ${test_dir}/dir/file1.txt
cd ${test_dir}
zstash update -v --hpss=${hpss_path}
# Archives 000001.tar
cd ..
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, index.db

echo "Adding more files to the HPSS archive."
echo "file3 stuff" > ${test_dir}/file3.txt
cd ${test_dir}
zstash update --hpss=${hpss_path}
# Archives 000002.tar
cd ..
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, index.db

echo "file4 stuff" > ${test_dir}/file4.txt
cd ${test_dir}
zstash update --hpss=${hpss_path}
# Archives 000003.tar
cd ..
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, index.db

echo "file5 stuff" > ${test_dir}/file5.txt
cd ${test_dir}
zstash update --hpss=${hpss_path}
# Archives 000004.tar
cd ..
echo "Cache:"
ls -l ${test_dir}/${cache} # just index.db
echo "HPSS:"
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, 000004.tar, index.db

# back in test_update.helperUpdateNonEmpty ####################################
echo "Cache check actually performed in the unit test:"
ls -l ${test_dir}/${cache} # just index.db

# base.tearDown ###############################################################
echo "Removing test files, both locally and at the HPSS repo"
rm -rf ${test_dir}
hsi rm -R ${hpss_path}
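As a sanity check on the comments above, the archives follow zstash's zero-padded, six-digit tar numbering; an illustrative snippet (not part of the test) that reproduces the names expected after the initial create plus the four updates:

```python
# Illustrative only: tar names expected in the HPSS archive after
# `zstash create` and the four `zstash update` runs above.
expected_tars = [f"{i:06d}.tar" for i in range(5)]
print(expected_tars)
# ['000000.tar', '000001.tar', '000002.tar', '000003.tar', '000004.tar']
```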
31 changes: 31 additions & 0 deletions tests/test_update.py
@@ -141,6 +141,30 @@ def helperUpdateCache(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
)
self.stop(error_message)

def helperUpdateNonEmpty(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Test `zstash update`.
"""
self.hpss_path = hpss_path
use_hpss = self.setupDirs(test_name)
self.create(use_hpss, zstash_path)
self.add_files(use_hpss, zstash_path)
files = os.listdir("{}/{}".format(self.test_dir, self.cache))
if use_hpss:
expected_files = ["index.db"]
else:
expected_files = [
"index.db",
"000003.tar",
"000004.tar",
"000000.tar",
"000001.tar",
"000002.tar",
]
if not compare(files, expected_files):
error_message = f"The zstash cache {self.test_dir}/{self.cache} does not contain expected files.\nIt has: {files}"
self.stop(error_message)

def testUpdate(self):
self.helperUpdate("testUpdate", "none")

@@ -169,6 +193,13 @@ def testUpdateCacheHPSS(self):
self.conditional_hpss_skip()
self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE)

def testUpdateNonEmpty(self):
self.helperUpdateNonEmpty("testUpdateNonEmpty", "none")

def testUpdateNonEmptyHPSS(self):
self.conditional_hpss_skip()
self.helperUpdateNonEmpty("testUpdateNonEmptyHPSS", HPSS_ARCHIVE)


if __name__ == "__main__":
unittest.main()
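The `compare` call in `helperUpdateNonEmpty` comes from the shared test utilities; below is a minimal sketch of an order-insensitive comparison along those lines (an assumption about the helper's behavior, since `os.listdir` returns entries in arbitrary order, not the actual implementation):

```python
from typing import List

def compare(actual: List[str], expected: List[str]) -> bool:
    # Sort both sides so the check does not depend on os.listdir() ordering.
    return sorted(actual) == sorted(expected)

# Example matching the HPSS branch of helperUpdateNonEmpty:
assert compare(["index.db"], ["index.db"])
assert not compare(["index.db", "000000.tar"], ["index.db"])
```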
4 changes: 2 additions & 2 deletions tests3/README_TEST_BLOCKING
@@ -39,7 +39,7 @@ working directory:
[CWD]/dst_data/

- destination for Globus transfer of archives.

[CWD]/tmp_cache/

- [Optional] alternative location for tar-file generation.
@@ -86,7 +86,7 @@ to running Globus transfers.

It is suggested that you run the test script with

test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1
test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1

so that your command prompt returns and you can monitor progress with

2 changes: 0 additions & 2 deletions tests3/snapshot.sh
@@ -10,5 +10,3 @@ ls -l src_data/zstash
echo ""
echo "tmp_cache:"
ls -l tmp_cache


1 change: 0 additions & 1 deletion tests3/test_zstash_blocking.sh
@@ -70,4 +70,3 @@ fi
echo "Testing Completed"

exit 0

5 changes: 2 additions & 3 deletions zstash/create.py
@@ -92,7 +92,7 @@ def create():

# Transfer to HPSS. Always keep a local copy.
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)

logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(non_blocking=args.non_blocking)
@@ -169,9 +169,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
# Now that we're inside a subcommand, ignore the first two argvs
# (zstash create)
args: argparse.Namespace = parser.parse_args(sys.argv[2:])
if args.hpss and args.hpss.lower() == "none":
if (not args.hpss) or (args.hpss.lower() == "none"):
args.hpss = "none"
if args.non_blocking:
args.keep = True
if args.verbose:
logger.setLevel(logging.DEBUG)
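The revised condition in `setup_create` now treats a missing `--hpss` value the same as an explicit "none" (in any capitalization). A standalone sketch of that normalization, with a hypothetical minimal parser standing in for zstash's real argument setup:

```python
import argparse

# Hypothetical minimal parser; zstash's real setup_create defines more options.
parser = argparse.ArgumentParser(prog="zstash create")
parser.add_argument("--hpss")
parser.add_argument("--non_blocking", action="store_true")
parser.add_argument("--keep", action="store_true")

args = parser.parse_args(["--non_blocking"])  # note: no --hpss given

# New behavior: a missing --hpss or any spelling of "none" normalizes to "none".
if (not args.hpss) or (args.hpss.lower() == "none"):
    args.hpss = "none"
if args.non_blocking:
    args.keep = True

print(args.hpss, args.keep)  # -> none True
```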
41 changes: 35 additions & 6 deletions zstash/globus.py
@@ -158,6 +158,10 @@ def file_exists(name: str) -> bool:
return False


# TODO: What does gv stand for? Globus something? Global variable?
Collaborator: ^
Collaborator Author: Global Variable. If I must use them, I like to label them as such.
Collaborator: Ok, I think I'm going to expand gv to global_variable then, so it's clear.
Collaborator Author: Good idea. That might discourage people from using them - should be a standard!

gv_tarfiles_pushed = 0


# C901 'globus_transfer' is too complex (20)
def globus_transfer( # noqa: C901
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
@@ -168,8 +172,10 @@ def globus_transfer( # noqa: C901
global transfer_data
global task_id
global archive_directory_listing
global gv_tarfiles_pushed

logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}")
if not transfer_client:
globus_activate("globus://" + remote_ep)
if not transfer_client:
@@ -215,7 +221,7 @@ def globus_transfer( # noqa: C901
fail_on_quota_errors=True,
)
transfer_data.add_item(src_path, dst_path)
transfer_data["label"] = subdir_label + " " + filename
transfer_data["label"] = label
Collaborator: Note to self: label is defined to be exactly the same thing above already.
Collaborator Author: Right.

try:
if task_id:
task = transfer_client.get_task(task_id)
@@ -225,12 +231,12 @@ def globus_transfer( # noqa: C901
# Presently, we do not, except inadvertently (if status == PENDING)
if prev_task_status == "ACTIVE":
logger.info(
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning."
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE."
)
return "ACTIVE"
elif prev_task_status == "SUCCEEDED":
logger.info(
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing."
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED."
)
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
@@ -243,15 +249,19 @@ def globus_transfer( # noqa: C901
)
else:
logger.error(
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}."
)

# DEBUG: review accumulated items in TransferData
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
attribs = transfer_data.__dict__
for item in attribs["data"]["DATA"]:
if item["DATA_TYPE"] == "transfer_item":
print(f" source item: {item['source_path']}")
gv_tarfiles_pushed += 1
print(
f" (routine) PUSHING (#{gv_tarfiles_pushed}) STORED source item: {item['source_path']}",
flush=True,
)

# SUBMIT new transfer here
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
@@ -263,6 +273,7 @@ def globus_transfer( # noqa: C901
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
)

# Nullify the submitted transfer data structure so that a new one will be created on next call.
transfer_data = None
except TransferAPIError as e:
if e.code == "NoCredException":
@@ -310,9 +321,13 @@ def globus_block_wait(
while retry_count < max_retries:
try:
# Wait for the task to complete
logger.info(
f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}"
)
transfer_client.task_wait(
task_id, timeout=wait_timeout, polling_interval=10
)
logger.info(f"{ts_utc()}: done with wait")
except Exception as e:
logger.error(f"Unexpected Exception: {e}")
else:
@@ -350,7 +365,7 @@ def globus_wait(task_id: str):
with 300 second timeout limit. If the task is ACTIVE after time runs
out 'task_wait' returns False, and True otherwise.
"""
while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
pass
"""
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
Expand Down Expand Up @@ -387,10 +402,24 @@ def globus_finalize(non_blocking: bool = False):
global transfer_client
global transfer_data
global task_id
global gv_tarfiles_pushed

last_task_id = None

if transfer_data:
# DEBUG: review accumulated items in TransferData
Collaborator: This is just a note explaining this code block, right? Not a TODO that still needs to be addressed?
Collaborator Author: Correct!

logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
attribs = transfer_data.__dict__
for item in attribs["data"]["DATA"]:
if item["DATA_TYPE"] == "transfer_item":
gv_tarfiles_pushed += 1
print(
f" (finalize) PUSHING ({gv_tarfiles_pushed}) source item: {item['source_path']}",
flush=True,
)

# SUBMIT new transfer here
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
try:
last_task = submit_transfer_with_checks(transfer_data)
last_task_id = last_task.get("task_id")
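The extra logging in `globus_block_wait` sits inside a bounded retry around `task_wait`. A standalone sketch of that polling pattern, assuming a globus_sdk-style `TransferClient` whose `task_wait(task_id, timeout=..., polling_interval=...)` returns True once the task reaches a terminal state and False on timeout; the function name and defaults here are illustrative, not zstash's exact signature:

```python
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


def ts_utc() -> str:
    # Same idea as zstash's ts_utc(): a UTC timestamp for log lines.
    return datetime.now(timezone.utc).isoformat()


def block_wait(transfer_client, task_id: str, wait_timeout: int = 300, max_retries: int = 2) -> bool:
    """Poll a Globus task until it finishes or the retry budget is exhausted."""
    retry_count = 0
    while retry_count < max_retries:
        logger.info(f"{ts_utc()}: on task_wait try {retry_count + 1} out of {max_retries}")
        try:
            # Blocks up to wait_timeout seconds, polling every 10 seconds;
            # returns True only if the task completed within that window.
            if transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10):
                logger.info(f"{ts_utc()}: done with wait")
                return True
        except Exception as e:
            logger.error(f"Unexpected Exception: {e}")
        retry_count += 1
    return False
```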