Skip to content

Commit a5b1697

Browse files
committed
zstash create refactored
1 parent 8fa5ed5 commit a5b1697

File tree

4 files changed

+85
-114
lines changed

4 files changed

+85
-114
lines changed

zstash/create.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,17 @@ def create():
8080

8181
# Transfer to HPSS. Always keep a local copy of the database.
8282
logger.debug(f"{ts_utc()}: calling hpss_put() for {command_info.get_db_name()}")
83-
# TODO: (A) Continue refactoring from here
8483
hpss_put(command_info, command_info.get_db_name(), is_index=True)
8584

86-
logger.debug(f"{ts_utc()}: calling globus_finalize()")
87-
globus_finalize(non_blocking=args.non_blocking)
85+
if command_info.globus_info:
86+
logger.debug(f"{ts_utc()}: calling globus_finalize()")
87+
globus_finalize(command_info.globus_info, non_blocking=args.non_blocking)
8888

8989
if len(failures) > 0:
9090
# List the failures
9191
logger.warning("Some files could not be archived")
9292
for file_path in failures:
93-
logger.error("Failed to archive {}".format(file_path))
93+
logger.error(f"Failed to archive {file_path}")
9494

9595

9696
def setup_create(ci: CommandInfo) -> argparse.Namespace:

zstash/globus.py

Lines changed: 64 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,6 @@
3131
r"perlmutter.*\.nersc\.gov": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79",
3232
}
3333

34-
remote_endpoint = None
35-
local_endpoint = None
36-
transfer_client: TransferClient = None
37-
transfer_data: TransferData = None
38-
task_id = None
39-
archive_directory_listing: IterableTransferResponse = None
40-
4134

4235
def check_endpoint_version_5(ep_id):
4336
output = transfer_client.get_endpoint(ep_id)
@@ -49,13 +42,13 @@ def check_endpoint_version_5(ep_id):
4942
return False
5043

5144

52-
def submit_transfer_with_checks(transfer_data):
45+
def submit_transfer_with_checks(globus_info: GlobusInfo):
5346
try:
54-
task = transfer_client.submit_transfer(transfer_data)
47+
task = globus_info.transfer_client.submit_transfer(globus_info.transfer_data)
5548
except TransferAPIError as err:
5649
if err.info.consent_required:
5750
scopes = "urn:globus:auth:scope:transfer.api.globus.org:all["
58-
for ep_id in [remote_endpoint, local_endpoint]:
51+
for ep_id in [globus_info.remote_endpoint, globus_info.local_endpoint]:
5952
if check_endpoint_version_5(ep_id):
6053
scopes += f" *https://auth.globus.org/scopes/{ep_id}/data_access"
6154
scopes += " ]"
@@ -147,72 +140,61 @@ def file_exists(name: str) -> bool:
147140
return False
148141

149142

150-
global_variable_tarfiles_pushed = 0
151-
152-
153143
# C901 'globus_transfer' is too complex (20)
154144
def globus_transfer( # noqa: C901
155-
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
145+
globus_info: GlobusInfo, remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
156146
):
157-
global transfer_client
158-
global local_endpoint
159-
global remote_endpoint
160-
global transfer_data
161-
global task_id
162-
global archive_directory_listing
163-
global global_variable_tarfiles_pushed
164147

165148
logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
166149
logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}")
167-
if not transfer_client:
150+
if not globus_info.transfer_client:
168151
globus_activate("globus://" + remote_ep)
169-
if not transfer_client:
152+
if not globus_info.transfer_client:
153+
logger.info(f"{ts_utc()}: Could not instantiate transfer client.")
170154
sys.exit(1)
171155

172156
if transfer_type == "get":
173-
if not archive_directory_listing:
174-
archive_directory_listing = transfer_client.operation_ls(
175-
remote_endpoint, remote_path
157+
if not globus_info.archive_directory_listing:
158+
globus_info.archive_directory_listing = globus_info.transfer_client.operation_ls(
159+
globus_info.remote_endpoint, remote_path
176160
)
177161
if not file_exists(name):
178162
logger.error(
179-
"Remote file globus://{}{}/{} does not exist".format(
180-
remote_ep, remote_path, name
181-
)
163+
f"Remote file globus://{remote_ep}{remote_path}/{name} does not exist"
182164
)
183165
sys.exit(1)
184166

185167
if transfer_type == "get":
186-
src_ep = remote_endpoint
168+
src_ep = globus_info.remote_endpoint
187169
src_path = os.path.join(remote_path, name)
188-
dst_ep = local_endpoint
170+
dst_ep = globus_info.local_endpoint
189171
dst_path = os.path.join(os.getcwd(), name)
190172
else:
191-
src_ep = local_endpoint
173+
src_ep = globus_info.local_endpoint
192174
src_path = os.path.join(os.getcwd(), name)
193-
dst_ep = remote_endpoint
175+
dst_ep = globus_info.remote_endpoint
194176
dst_path = os.path.join(remote_path, name)
195177

196178
subdir = os.path.basename(os.path.normpath(remote_path))
197179
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
198180
filename = name.split(".")[0]
199181
label = subdir_label + " " + filename
200182

201-
if not transfer_data:
202-
transfer_data = TransferData(
203-
transfer_client,
183+
if not globus_info.transfer_data:
184+
globus_info.transfer_data = TransferData(
185+
globus_info.transfer_client,
204186
src_ep,
205187
dst_ep,
206188
label=label,
207189
verify_checksum=True,
208190
preserve_timestamp=True,
209191
fail_on_quota_errors=True,
210192
)
211-
transfer_data.add_item(src_path, dst_path)
212-
transfer_data["label"] = label
193+
globus_info.transfer_data.add_item(src_path, dst_path)
194+
globus_info.transfer_data["label"] = label
213195
try:
214-
if task_id:
215-
task = transfer_client.get_task(task_id)
196+
if globus_info.task_id:
197+
task = globus_info.transfer_client.get_task(globus_info.task_id)
216198
prev_task_status = task["status"]
217199
# one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
218200
# NOTE: How we behave here depends upon whether we want to support mutliple active transfers.
@@ -231,78 +213,73 @@ def globus_transfer( # noqa: C901
231213
label = task["label"]
232214
ts = ts_utc()
233215
logger.info(
234-
"{}:Globus transfer {}, from {} to {}: {} succeeded".format(
235-
ts, task_id, src_ep, dst_ep, label
236-
)
216+
f"{ts}:Globus transfer {globus_info.task_id}, from {src_ep} to {dst_ep}: {label} succeeded"
237217
)
238218
else:
239219
logger.error(
240-
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}."
220+
f"{ts_utc()}: Previous task_id {globus_info.task_id} status = {prev_task_status}."
241221
)
242222

243223
# DEBUG: review accumulated items in TransferData
244224
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
245-
attribs = transfer_data.__dict__
225+
attribs = globus_info.transfer_data.__dict__
246226
for item in attribs["data"]["DATA"]:
247227
if item["DATA_TYPE"] == "transfer_item":
248-
global_variable_tarfiles_pushed += 1
228+
globus_info.tarfiles_pushed += 1
249229
print(
250-
f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}",
230+
f" (routine) PUSHING (#{globus_info.tarfiles_pushed}) STORED source item: {item['source_path']}",
251231
flush=True,
252232
)
253233

254234
# SUBMIT new transfer here
255235
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
256-
task = submit_transfer_with_checks(transfer_data)
257-
task_id = task.get("task_id")
236+
task = submit_transfer_with_checks(globus_info)
237+
globus_info.task_id = task.get("task_id")
258238
# NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
259239
# the "lable" given here refers only to the LAST tarfile in the TransferData list.
260240
logger.info(
261-
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
241+
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {globus_info.task_id} for label {transfer_data['label']}"
262242
)
263243

264244
# Nullify the submitted transfer data structure so that a new one will be created on next call.
265-
transfer_data = None
245+
globus_info.transfer_data = None
266246
except TransferAPIError as e:
267247
if e.code == "NoCredException":
268248
logger.error(
269-
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
270-
e.message
271-
)
249+
f"{e.message}. Please go to https://app.globus.org/endpoints and activate the endpoint."
272250
)
273251
else:
274252
logger.error(e)
275253
sys.exit(1)
276254
except Exception as e:
277-
logger.error("Exception: {}".format(e))
255+
logger.error(f"Exception: {e}")
278256
sys.exit(1)
279257

280258
# test for blocking on new task_id
281259
task_status = "UNKNOWN"
282260
if not non_blocking:
283261
task_status = globus_block_wait(
284-
task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
262+
globus_info, wait_timeout=7200, polling_interval=10, max_retries=5
285263
)
286264
else:
287-
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")
265+
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {globus_info.task_id}")
288266

289267
if transfer_type == "put":
290268
return task_status
291269

292270
if transfer_type == "get" and task_id:
293-
globus_wait(task_id)
271+
globus_wait(globus_info)
294272

295273
return task_status
296274

297275

298276
def globus_block_wait(
299-
task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
277+
globus_info: GlobusInfo, wait_timeout: int, polling_interval: int, max_retries: int
300278
):
301-
global transfer_client
302279

303280
# poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting aftert 5*2 = 10 hours
304281
logger.info(
305-
f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
282+
f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {globus_info.task_id}"
306283
)
307284
task_status = "UNKNOWN"
308285
retry_count = 0
@@ -312,14 +289,14 @@ def globus_block_wait(
312289
logger.info(
313290
f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}"
314291
)
315-
transfer_client.task_wait(
316-
task_id, timeout=wait_timeout, polling_interval=10
292+
globus_info.transfer_client.task_wait(
293+
globus_info.task_id, timeout=wait_timeout, polling_interval=10
317294
)
318295
logger.info(f"{ts_utc()}: done with wait")
319296
except Exception as e:
320297
logger.error(f"Unexpected Exception: {e}")
321298
else:
322-
curr_task = transfer_client.get_task(task_id)
299+
curr_task = globus_info.transfer_client.get_task(task_id)
323300
task_status = curr_task["status"]
324301
if task_status == "SUCCEEDED":
325302
break
@@ -336,15 +313,17 @@ def globus_block_wait(
336313
task_status = "EXHAUSTED_TIMEOUT_RETRIES"
337314

338315
logger.info(
339-
f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
316+
f"{ts_utc()}: BLOCKING ENDS: task_id {globus_info.task_id} returned from task_wait with status {task_status}"
340317
)
341318

342319
return task_status
343320

344321

345-
def globus_wait(task_id: str):
346-
global transfer_client
347-
322+
def globus_wait(globus_info: GlobusInfo, alternative_task_id=None):
323+
if alternative_task_id:
324+
task_id = alternative_task_id
325+
else:
326+
task_id = globus_info.task_id
348327
try:
349328
"""
350329
A Globus transfer job (task) can be in one of the three states:
@@ -353,80 +332,69 @@ def globus_wait(task_id: str):
353332
with 20 second timeout limit. If the task is ACTIVE after time runs
354333
out 'task_wait' returns False, and True otherwise.
355334
"""
356-
while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
335+
while not globus_info.transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
357336
pass
358337
"""
359338
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
360339
Check if the transfer SUCCEEDED or FAILED.
361340
"""
362-
task = transfer_client.get_task(task_id)
341+
task = globus_info.transfer_client.get_task(task_id)
363342
if task["status"] == "SUCCEEDED":
364343
src_ep = task["source_endpoint_id"]
365344
dst_ep = task["destination_endpoint_id"]
366345
label = task["label"]
367346
logger.info(
368-
"Globus transfer {}, from {} to {}: {} succeeded".format(
369-
task_id, src_ep, dst_ep, label
370-
)
347+
f"Globus transfer {task_id}, from {src_ep} to {dst_ep}: {label} succeeded"
371348
)
372349
else:
373350
logger.error("Transfer FAILED")
374351
except TransferAPIError as e:
375352
if e.code == "NoCredException":
376353
logger.error(
377-
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
378-
e.message
379-
)
354+
f"{e.message}. Please go to https://app.globus.org/endpoints and activate the endpoint."
380355
)
381356
else:
382357
logger.error(e)
383358
sys.exit(1)
384359
except Exception as e:
385-
logger.error("Exception: {}".format(e))
360+
logger.error(f"Exception: {e}")
386361
sys.exit(1)
387362

388363

389-
def globus_finalize(non_blocking: bool = False):
390-
global transfer_client
391-
global transfer_data
392-
global task_id
393-
global global_variable_tarfiles_pushed
394-
364+
def globus_finalize(globus_info: GlobusInfo, non_blocking: bool = False):
395365
last_task_id = None
396366

397-
if transfer_data:
367+
if globus_info.transfer_data:
398368
# DEBUG: review accumulated items in TransferData
399369
logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
400-
attribs = transfer_data.__dict__
370+
attribs = globus_info.transfer_data.__dict__
401371
for item in attribs["data"]["DATA"]:
402372
if item["DATA_TYPE"] == "transfer_item":
403-
global_variable_tarfiles_pushed += 1
373+
globus_info.tarfiles_pushed += 1
404374
print(
405-
f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}",
375+
f" (finalize) PUSHING ({globus_info.tarfiles_pushed}) source item: {item['source_path']}",
406376
flush=True,
407377
)
408378

409379
# SUBMIT new transfer here
410-
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
380+
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {globus_info.transfer_data['label']}")
411381
try:
412-
last_task = submit_transfer_with_checks(transfer_data)
382+
last_task = submit_transfer_with_checks(globus_info.transfer_data)
413383
last_task_id = last_task.get("task_id")
414384
except TransferAPIError as e:
415385
if e.code == "NoCredException":
416386
logger.error(
417-
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
418-
e.message
419-
)
387+
f"{e.message}. Please go to https://app.globus.org/endpoints and activate the endpoint."
420388
)
421389
else:
422390
logger.error(e)
423391
sys.exit(1)
424392
except Exception as e:
425-
logger.error("Exception: {}".format(e))
393+
logger.error(f"Exception: {e}")
426394
sys.exit(1)
427395

428396
if not non_blocking:
429-
if task_id:
430-
globus_wait(task_id)
397+
if globus_info.task_id:
398+
globus_wait(globus_info)
431399
if last_task_id:
432-
globus_wait(last_task_id)
400+
globus_wait(globus_info, last_task_id)

0 commit comments

Comments
 (0)