Skip to content

Commit c96e591

Browse files
authored
Merge pull request #363 from E3SM-Project/non-block-testing-fix
Non block testing fix
2 parents 0fb1996 + 78476eb commit c96e591

File tree

9 files changed

+280
-21
lines changed

9 files changed

+280
-21
lines changed

tests/scripts_unit_tests/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
The unit tests, in script form.
2+
3+
The unit tests are hard to follow via Python,
4+
so it may be easier to run and debug the logic using these scripts.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#!/bin/bash
2+
3+
hpss_globus_endpoint="6c54cade-bde5-45c1-bdea-f4bd71dba2cc"
4+
hpss_path="globus://${hpss_globus_endpoint}/~/zstash_test/"
5+
cache=zstash # Set via `self.cache = "zstash"`
6+
7+
# base.setupDirs ##############################################################
8+
test_dir=zstash_test
9+
10+
# Create files and directories
11+
echo "Creating files."
12+
mkdir -p ${test_dir}
13+
mkdir -p ${test_dir}/empty_dir
14+
mkdir -p ${test_dir}/dir
15+
16+
echo "file0 stuff" > ${test_dir}/file0.txt
17+
echo "" > ${test_dir}/file_empty.txt
18+
echo "file1 stuff" > ${test_dir}/dir/file1.txt
19+
20+
# Symbolic (soft) link (points to a file name which points to an inode)
21+
# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt
22+
# The python `os.symlink` call omits the first `test_dir`
23+
# because it simply looks in the same directory for the file to link to.
24+
ln -s file0.txt ${test_dir}/file_0_soft.txt # target is stored verbatim and resolved relative to the link's own directory
25+
# Bad symbolic (soft) link (points to a file name that does not exist)
26+
ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt
27+
# Hard link (points to an inode directly)
28+
ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt # no -s: creates a real hard link to the same inode
29+
30+
# base.create #################################################################
31+
echo "Adding files to HPSS"
32+
zstash create --hpss=${hpss_path} ${test_dir}
33+
# Archives 000000.tar
34+
echo "Cache:"
35+
ls -l ${test_dir}/${cache} # just index.db
36+
echo "HPSS:"
37+
hsi ls -l ${hpss_path} # 000000.tar, index.db
38+
39+
# back in test_globus.helperLsGlobus ##########################################
40+
cd ${test_dir}
41+
zstash ls --hpss=${hpss_path}
42+
cd ..
43+
echo "Cache:"
44+
ls -l ${test_dir}/${cache}
45+
echo "HPSS:"
46+
hsi ls -l ${hpss_path}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#!/bin/bash
2+
3+
hpss_path=zstash_test # Set via `HPSS_ARCHIVE = "zstash_test"`
4+
cache=zstash # Set via `self.cache = "zstash"`
5+
6+
# base.setupDirs ##############################################################
7+
test_dir=zstash_test
8+
9+
# Create files and directories
10+
echo "Creating files."
11+
mkdir -p ${test_dir}
12+
mkdir -p ${test_dir}/empty_dir
13+
mkdir -p ${test_dir}/dir
14+
15+
echo "file0 stuff" > ${test_dir}/file0.txt
16+
echo "" > ${test_dir}/file_empty.txt
17+
echo "file1 stuff" > ${test_dir}/dir/file1.txt
18+
19+
# Symbolic (soft) link (points to a file name which points to an inode)
20+
# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt
21+
# The python `os.symlink` call omits the first `test_dir`
22+
# because it simply looks in the same directory for the file to link to.
23+
ln -s file0.txt ${test_dir}/file_0_soft.txt # target is stored verbatim and resolved relative to the link's own directory
24+
# Bad symbolic (soft) link (points to a file name that does not exist)
25+
ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt
26+
# Hard link (points to an inode directly)
27+
ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt # no -s: creates a real hard link to the same inode
28+
29+
# base.create #################################################################
30+
echo "Adding files to HPSS"
31+
zstash create --hpss=${hpss_path} ${test_dir}
32+
# Archives 000000.tar
33+
echo "Cache:"
34+
ls -l ${test_dir}/${cache} # just index.db
35+
echo "HPSS:"
36+
hsi ls -l ${hpss_path} # 000000.tar, index.db
37+
38+
# base.add_files ##############################################################
39+
echo "Testing update with an actual change"
40+
mkdir -p ${test_dir}/dir2
41+
echo "file2 stuff" > ${test_dir}/dir2/file2.txt
42+
echo "file1 stuff with changes" > ${test_dir}/dir/file1.txt
43+
cd ${test_dir}
44+
zstash update -v --hpss=${hpss_path}
45+
# Archives 000001.tar
46+
cd ..
47+
echo "Cache:"
48+
ls -l ${test_dir}/${cache} # just index.db
49+
echo "HPSS:"
50+
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, index.db
51+
52+
echo "Adding more files to the HPSS archive."
53+
echo "file3 stuff" > ${test_dir}/file3.txt
54+
cd ${test_dir}
55+
zstash update --hpss=${hpss_path}
56+
# Archives 000002.tar
57+
cd ..
58+
echo "Cache:"
59+
ls -l ${test_dir}/${cache} # just index.db
60+
echo "HPSS:"
61+
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, index.db
62+
63+
echo "file4 stuff" > ${test_dir}/file4.txt
64+
cd ${test_dir}
65+
zstash update --hpss=${hpss_path}
66+
# Archives 000003.tar
67+
cd ..
68+
echo "Cache:"
69+
ls -l ${test_dir}/${cache} # just index.db
70+
echo "HPSS:"
71+
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, index.db
72+
73+
echo "file5 stuff" > ${test_dir}/file5.txt
74+
cd ${test_dir}
75+
zstash update --hpss=${hpss_path}
76+
# Archives 000004.tar
77+
cd ..
78+
echo "Cache:"
79+
ls -l ${test_dir}/${cache} # just index.db
80+
echo "HPSS:"
81+
hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, 000004.tar, index.db
82+
83+
# back in test_update.helperUpdateNonEmpty ####################################
84+
echo "Cache check actually performed in the unit test:"
85+
ls -l ${test_dir}/${cache} # just index.db
86+
87+
# base.tearDown ###############################################################
88+
echo "Removing test files, both locally and at the HPSS repo"
89+
rm -rf ${test_dir}
90+
hsi rm -R ${hpss_path}

tests/test_update.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,30 @@ def helperUpdateCache(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
141141
)
142142
self.stop(error_message)
143143

144+
def helperUpdateNonEmpty(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
145+
"""
146+
Test `zstash update`.
147+
"""
148+
self.hpss_path = hpss_path
149+
use_hpss = self.setupDirs(test_name)
150+
self.create(use_hpss, zstash_path)
151+
self.add_files(use_hpss, zstash_path)
152+
files = os.listdir("{}/{}".format(self.test_dir, self.cache))
153+
if use_hpss:
154+
expected_files = ["index.db"]
155+
else:
156+
expected_files = [
157+
"index.db",
158+
"000003.tar",
159+
"000004.tar",
160+
"000000.tar",
161+
"000001.tar",
162+
"000002.tar",
163+
]
164+
if not compare(files, expected_files):
165+
error_message = f"The zstash cache {self.test_dir}/{self.cache} does not contain expected files.\nIt has: {files}"
166+
self.stop(error_message)
167+
144168
def testUpdate(self):
145169
self.helperUpdate("testUpdate", "none")
146170

@@ -169,6 +193,13 @@ def testUpdateCacheHPSS(self):
169193
self.conditional_hpss_skip()
170194
self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE)
171195

196+
def testUpdateNonEmpty(self):
197+
self.helperUpdateNonEmpty("testUpdateNonEmpty", "none")
198+
199+
def testUpdateNonEmptyHPSS(self):
200+
self.conditional_hpss_skip()
201+
self.helperUpdateNonEmpty("testUpdateNonEmptyHPSS", HPSS_ARCHIVE)
202+
172203

173204
if __name__ == "__main__":
174205
unittest.main()

zstash/create.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def create():
9292

9393
# Transfer to HPSS. Always keep a local copy.
9494
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
95-
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
95+
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
9696

9797
logger.debug(f"{ts_utc()}: calling globus_finalize()")
9898
globus_finalize(non_blocking=args.non_blocking)
@@ -169,9 +169,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
169169
# Now that we're inside a subcommand, ignore the first two argvs
170170
# (zstash create)
171171
args: argparse.Namespace = parser.parse_args(sys.argv[2:])
172-
if args.hpss and args.hpss.lower() == "none":
172+
if (not args.hpss) or (args.hpss.lower() == "none"):
173173
args.hpss = "none"
174-
if args.non_blocking:
175174
args.keep = True
176175
if args.verbose:
177176
logger.setLevel(logging.DEBUG)

zstash/globus.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ def file_exists(name: str) -> bool:
158158
return False
159159

160160

161+
global_variable_tarfiles_pushed = 0
162+
163+
161164
# C901 'globus_transfer' is too complex (20)
162165
def globus_transfer( # noqa: C901
163166
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
@@ -168,8 +171,10 @@ def globus_transfer( # noqa: C901
168171
global transfer_data
169172
global task_id
170173
global archive_directory_listing
174+
global global_variable_tarfiles_pushed
171175

172176
logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
177+
logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}")
173178
if not transfer_client:
174179
globus_activate("globus://" + remote_ep)
175180
if not transfer_client:
@@ -215,7 +220,7 @@ def globus_transfer( # noqa: C901
215220
fail_on_quota_errors=True,
216221
)
217222
transfer_data.add_item(src_path, dst_path)
218-
transfer_data["label"] = subdir_label + " " + filename
223+
transfer_data["label"] = label
219224
try:
220225
if task_id:
221226
task = transfer_client.get_task(task_id)
@@ -225,12 +230,12 @@ def globus_transfer( # noqa: C901
225230
# Presently, we do not, except inadvertantly (if status == PENDING)
226231
if prev_task_status == "ACTIVE":
227232
logger.info(
228-
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning."
233+
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE."
229234
)
230235
return "ACTIVE"
231236
elif prev_task_status == "SUCCEEDED":
232237
logger.info(
233-
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing."
238+
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED."
234239
)
235240
src_ep = task["source_endpoint_id"]
236241
dst_ep = task["destination_endpoint_id"]
@@ -243,15 +248,19 @@ def globus_transfer( # noqa: C901
243248
)
244249
else:
245250
logger.error(
246-
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
251+
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}."
247252
)
248253

249254
# DEBUG: review accumulated items in TransferData
250255
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
251256
attribs = transfer_data.__dict__
252257
for item in attribs["data"]["DATA"]:
253258
if item["DATA_TYPE"] == "transfer_item":
254-
print(f" source item: {item['source_path']}")
259+
global_variable_tarfiles_pushed += 1
260+
print(
261+
f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}",
262+
flush=True,
263+
)
255264

256265
# SUBMIT new transfer here
257266
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
@@ -263,6 +272,7 @@ def globus_transfer( # noqa: C901
263272
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
264273
)
265274

275+
# Nullify the submitted transfer data structure so that a new one will be created on next call.
266276
transfer_data = None
267277
except TransferAPIError as e:
268278
if e.code == "NoCredException":
@@ -310,9 +320,13 @@ def globus_block_wait(
310320
while retry_count < max_retries:
311321
try:
312322
# Wait for the task to complete
323+
logger.info(
324+
f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}"
325+
)
313326
transfer_client.task_wait(
314327
task_id, timeout=wait_timeout, polling_interval=10
315328
)
329+
logger.info(f"{ts_utc()}: done with wait")
316330
except Exception as e:
317331
logger.error(f"Unexpected Exception: {e}")
318332
else:
@@ -350,7 +364,7 @@ def globus_wait(task_id: str):
350364
with 20 second timeout limit. If the task is ACTIVE after time runs
351365
out 'task_wait' returns False, and True otherwise.
352366
"""
353-
while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
367+
while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
354368
pass
355369
"""
356370
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
@@ -387,10 +401,24 @@ def globus_finalize(non_blocking: bool = False):
387401
global transfer_client
388402
global transfer_data
389403
global task_id
404+
global global_variable_tarfiles_pushed
390405

391406
last_task_id = None
392407

393408
if transfer_data:
409+
# DEBUG: review accumulated items in TransferData
410+
logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
411+
attribs = transfer_data.__dict__
412+
for item in attribs["data"]["DATA"]:
413+
if item["DATA_TYPE"] == "transfer_item":
414+
global_variable_tarfiles_pushed += 1
415+
print(
416+
f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}",
417+
flush=True,
418+
)
419+
420+
# SUBMIT new transfer here
421+
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
394422
try:
395423
last_task = submit_transfer_with_checks(transfer_data)
396424
last_task_id = last_task.get("task_id")

0 commit comments

Comments
 (0)