Skip to content

Commit f8fed5e

Browse files
committed
Remove global variables from globus.py
1 parent ad42c10 commit f8fed5e

File tree

5 files changed

+167
-130
lines changed

5 files changed

+167
-130
lines changed

zstash/create.py

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -6,11 +6,11 @@
66
import os.path
77
import sqlite3
88
import sys
9-
from typing import Any, List, Tuple
9+
from typing import Any, List, Optional, Tuple
1010

1111
from six.moves.urllib.parse import urlparse
1212

13-
from .globus import globus_activate, globus_finalize
13+
from .globus import GlobusTransferCollection, globus_activate, globus_finalize
1414
from .hpss import hpss_put
1515
from .hpss_utils import add_files
1616
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
@@ -52,12 +52,13 @@ def create():
5252
logger.error(input_path_error_str)
5353
raise NotADirectoryError(input_path_error_str)
5454

55+
gtc: Optional[GlobusTransferCollection] = None
5556
if hpss != "none":
5657
url = urlparse(hpss)
5758
if url.scheme == "globus":
5859
# identify globus endpoints
59-
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
60-
globus_activate(hpss)
60+
logger.debug(f"{ts_utc()}:Calling globus_activate()")
61+
gtc = globus_activate(hpss)
6162
else:
6263
# config.hpss is not "none", so we need to
6364
# create target HPSS directory
@@ -92,10 +93,12 @@ def create():
9293

9394
# Transfer to HPSS. Always keep a local copy.
9495
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
95-
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
96+
hpss_put(
97+
hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True, gtc=gtc
98+
)
9699

97100
logger.debug(f"{ts_utc()}: calling globus_finalize()")
98-
globus_finalize(non_blocking=args.non_blocking)
101+
globus_finalize(gtc, non_blocking=args.non_blocking)
99102

100103
if len(failures) > 0:
101104
# List the failures

zstash/globus.py

Lines changed: 123 additions & 103 deletions
Original file line number | Diff line number | Diff line change
@@ -19,151 +19,160 @@
1919
from .settings import logger
2020
from .utils import ts_utc
2121

22-
remote_endpoint = None
23-
local_endpoint = None
24-
transfer_client: TransferClient = None
25-
transfer_data: TransferData = None
26-
task_id = None
27-
archive_directory_listing: IterableTransferResponse = None
28-
29-
30-
def globus_activate(hpss: str):
31-
"""
32-
Read the local globus endpoint UUID from ~/.zstash.ini.
33-
If the ini file does not exist, create an ini file with empty values,
34-
and try to find the local endpoint UUID based on the FQDN
35-
"""
36-
global transfer_client
37-
global local_endpoint
38-
global remote_endpoint
3922

23+
class GlobusTransfer(object):
24+
def __init__(self):
25+
self.transfer_data: Optional[TransferData] = None
26+
self.task_id: Optional[str] = None
27+
# https://docs.globus.org/api/transfer/task/#task_fields
28+
# ACTIVE, SUCCEEDED, FAILED, INACTIVE
29+
self.task_status: Optional[str] = None
30+
31+
32+
class GlobusTransferCollection(object):
33+
def __init__(self):
34+
# Attributes common to all the transfers
35+
self.remote_endpoint: Optional[str] = None
36+
self.local_endpoint: Optional[str] = None
37+
self.transfer_client: Optional[TransferClient] = None
38+
self.archive_directory_listing: Optional[IterableTransferResponse] = None
39+
40+
self.transfers: List[GlobusTransfer] = (
41+
[]
42+
) # TODO: Replace with collections.deque?
43+
self.cumulative_tarfiles_pushed: int = 0
44+
45+
def get_most_recent_transfer(self) -> Optional[GlobusTransfer]:
46+
return self.transfers[-1] if self.transfers else None
47+
48+
49+
def globus_activate(
50+
hpss: str, gtc: Optional[GlobusTransferCollection] = None
51+
) -> Optional[GlobusTransferCollection]:
4052
url = urlparse(hpss)
4153
if url.scheme != "globus":
42-
return
54+
return None
55+
if gtc is None:
56+
gtc = GlobusTransferCollection()
4357
check_state_files()
44-
remote_endpoint = url.netloc
45-
local_endpoint = get_local_endpoint_id(local_endpoint)
46-
if remote_endpoint.upper() in HPSS_ENDPOINT_MAP.keys():
47-
remote_endpoint = HPSS_ENDPOINT_MAP.get(remote_endpoint.upper())
48-
both_endpoints: List[Optional[str]] = [local_endpoint, remote_endpoint]
49-
transfer_client = get_transfer_client_with_auth(both_endpoints)
58+
gtc.remote_endpoint = url.netloc
59+
gtc.local_endpoint = get_local_endpoint_id(gtc.local_endpoint)
60+
upper_remote_ep = gtc.remote_endpoint.upper()
61+
if upper_remote_ep in HPSS_ENDPOINT_MAP.keys():
62+
gtc.remote_endpoint = HPSS_ENDPOINT_MAP.get(upper_remote_ep)
63+
both_endpoints: List[Optional[str]] = [gtc.local_endpoint, gtc.remote_endpoint]
64+
gtc.transfer_client = get_transfer_client_with_auth(both_endpoints)
5065
for ep_id in both_endpoints:
51-
r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
66+
r = gtc.transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
5267
if r.get("code") == "AutoActivationFailed":
5368
logger.error(
5469
f"The {ep_id} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{ep_id} and (re)activate the endpoint."
5570
)
5671
sys.exit(1)
72+
return gtc
5773

5874

59-
def file_exists(name: str) -> bool:
60-
global archive_directory_listing
61-
75+
def file_exists(archive_directory_listing, name: str) -> bool:
6276
for entry in archive_directory_listing:
6377
if entry.get("name") == name:
6478
return True
6579
return False
6680

6781

68-
global_variable_tarfiles_pushed = 0
69-
70-
7182
# C901 'globus_transfer' is too complex (20)
7283
def globus_transfer( # noqa: C901
73-
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
84+
gtc: Optional[GlobusTransferCollection],
85+
remote_ep: str,
86+
remote_path: str,
87+
name: str,
88+
transfer_type: str,
89+
non_blocking: bool,
7490
):
75-
global transfer_client
76-
global local_endpoint
77-
global remote_endpoint
78-
global transfer_data
79-
global task_id
80-
global archive_directory_listing
81-
global global_variable_tarfiles_pushed
82-
8391
logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
8492
logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}")
85-
if not transfer_client:
86-
globus_activate("globus://" + remote_ep)
87-
if not transfer_client:
93+
if (not gtc) or (not gtc.transfer_client):
94+
gtc = globus_activate("globus://" + remote_ep)
95+
if (not gtc) or (not gtc.transfer_client):
8896
sys.exit(1)
8997

9098
if transfer_type == "get":
91-
if not archive_directory_listing:
92-
archive_directory_listing = transfer_client.operation_ls(
93-
remote_endpoint, remote_path
99+
if not gtc.archive_directory_listing:
100+
gtc.archive_directory_listing = gtc.transfer_client.operation_ls(
101+
gtc.remote_endpoint, remote_path
94102
)
95-
if not file_exists(name):
103+
if not file_exists(gtc.archive_directory_listing, name):
96104
logger.error(
97105
"Remote file globus://{}{}/{} does not exist".format(
98106
remote_ep, remote_path, name
99107
)
100108
)
101109
sys.exit(1)
102110

103-
transfer_data = set_up_TransferData(
111+
mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer()
112+
transfer_data: TransferData = set_up_TransferData(
104113
transfer_type,
105-
local_endpoint, # Global
106-
remote_endpoint, # Global
114+
gtc.local_endpoint,
115+
gtc.remote_endpoint,
107116
remote_path,
108117
name,
109-
transfer_client, # Global
110-
transfer_data, # Global
118+
gtc.transfer_client,
119+
mrt.transfer_data if mrt else None,
111120
)
112121

113122
task: GlobusHTTPResponse
114123
try:
115-
if task_id:
116-
task = transfer_client.get_task(task_id)
117-
prev_task_status = task["status"]
124+
if mrt and mrt.task_id:
125+
task = gtc.transfer_client.get_task(mrt.task_id)
126+
mrt.task_status = task["status"]
118127
# one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
119128
# NOTE: How we behave here depends upon whether we want to support multiple active transfers.
120129
# Presently, we do not, except inadvertently (if status == PENDING)
121-
if prev_task_status == "ACTIVE":
130+
if mrt.task_status == "ACTIVE":
122131
logger.info(
123-
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE."
132+
f"{ts_utc()}: Previous task_id {mrt.task_id} Still Active. Returning ACTIVE."
124133
)
125134
return "ACTIVE"
126-
elif prev_task_status == "SUCCEEDED":
135+
elif mrt.task_status == "SUCCEEDED":
127136
logger.info(
128-
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED."
137+
f"{ts_utc()}: Previous task_id {mrt.task_id} status = SUCCEEDED."
129138
)
130139
src_ep = task["source_endpoint_id"]
131140
dst_ep = task["destination_endpoint_id"]
132141
label = task["label"]
133142
ts = ts_utc()
134143
logger.info(
135-
"{}:Globus transfer {}, from {} to {}: {} succeeded".format(
136-
ts, task_id, src_ep, dst_ep, label
137-
)
144+
f"{ts}:Globus transfer {mrt.task_id}, from {src_ep} to {dst_ep}: {label} succeeded"
138145
)
139146
else:
140147
logger.error(
141-
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}."
148+
f"{ts_utc()}: Previous task_id {mrt.task_id} status = {mrt.task_status}."
142149
)
143150

144151
# DEBUG: review accumulated items in TransferData
145152
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
146153
attribs = transfer_data.__dict__
147154
for item in attribs["data"]["DATA"]:
148155
if item["DATA_TYPE"] == "transfer_item":
149-
global_variable_tarfiles_pushed += 1
156+
gtc.cumulative_tarfiles_pushed += 1
150157
print(
151-
f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}",
158+
f" (routine) PUSHING (#{gtc.cumulative_tarfiles_pushed}) STORED source item: {item['source_path']}",
152159
flush=True,
153160
)
154161

155162
# SUBMIT new transfer here
156163
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
157-
task = submit_transfer_with_checks(transfer_client, transfer_data)
164+
task = submit_transfer_with_checks(gtc.transfer_client, transfer_data)
158165
task_id = task.get("task_id")
159166
# NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
160167
# the "label" given here refers only to the LAST tarfile in the TransferData list.
161168
logger.info(
162169
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
163170
)
164-
165-
# Nullify the submitted transfer data structure so that a new one will be created on next call.
166-
transfer_data = None
171+
new_transfer = GlobusTransfer()
172+
new_transfer.transfer_data = transfer_data
173+
new_transfer.task_id = task_id
174+
new_transfer.task_status = "UNKNOWN"
175+
gtc.transfers.append(new_transfer)
167176
except TransferAPIError as e:
168177
if e.code == "NoCredException":
169178
logger.error(
@@ -178,28 +187,34 @@ def globus_transfer( # noqa: C901
178187
logger.error("Exception: {}".format(e))
179188
sys.exit(1)
180189

181-
# test for blocking on new task_id
182-
task_status = "UNKNOWN"
183-
if not non_blocking:
184-
task_status = globus_block_wait(
185-
task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
186-
)
187-
else:
188-
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")
189-
190-
if transfer_type == "put":
191-
return task_status
190+
new_mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer()
192191

193-
if transfer_type == "get" and task_id:
194-
globus_wait(task_id)
192+
# test for blocking on new task_id
193+
if new_mrt and new_mrt.task_id:
194+
if not non_blocking:
195+
new_mrt.task_status = globus_block_wait(
196+
transfer_client=gtc.transfer_client,
197+
task_id=new_mrt.task_id,
198+
wait_timeout=7200,
199+
polling_interval=10,
200+
max_retries=5,
201+
)
202+
else:
203+
logger.info(
204+
f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {new_mrt.task_id}"
205+
)
195206

196-
return task_status
207+
if transfer_type == "get":
208+
globus_wait(gtc.transfer_client, new_mrt.task_id)
197209

198210

199211
def globus_block_wait(
200-
task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
212+
transfer_client: TransferClient,
213+
task_id: str,
214+
wait_timeout: int,
215+
polling_interval: int,
216+
max_retries: int,
201217
):
202-
global transfer_client
203218

204219
# poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting after 5*2 = 10 hours
205220
logger.info(
@@ -243,9 +258,7 @@ def globus_block_wait(
243258
return task_status
244259

245260

246-
def globus_wait(task_id: str):
247-
global transfer_client
248-
261+
def globus_wait(transfer_client: TransferClient, task_id: str):
249262
try:
250263
"""
251264
A Globus transfer job (task) can be in one of the three states:
@@ -287,30 +300,37 @@ def globus_wait(task_id: str):
287300
sys.exit(1)
288301

289302

290-
def globus_finalize(non_blocking: bool = False):
291-
global transfer_client
292-
global transfer_data
293-
global task_id
294-
global global_variable_tarfiles_pushed
295-
303+
def globus_finalize(
304+
gtc: Optional[GlobusTransferCollection], non_blocking: bool = False
305+
):
296306
last_task_id = None
297307

298-
if transfer_data:
308+
if gtc is None:
309+
logger.warning("No GlobusTransferCollection object provided for finalization")
310+
return
311+
312+
transfer = gtc.get_most_recent_transfer()
313+
314+
if transfer and transfer.transfer_data:
299315
# DEBUG: review accumulated items in TransferData
300316
logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:")
301-
attribs = transfer_data.__dict__
317+
attribs = transfer.transfer_data.__dict__
302318
for item in attribs["data"]["DATA"]:
303319
if item["DATA_TYPE"] == "transfer_item":
304-
global_variable_tarfiles_pushed += 1
320+
gtc.cumulative_tarfiles_pushed += 1
305321
print(
306-
f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}",
322+
f" (finalize) PUSHING ({gtc.cumulative_tarfiles_pushed}) source item: {item['source_path']}",
307323
flush=True,
308324
)
309325

310326
# SUBMIT new transfer here
311-
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
327+
logger.info(
328+
f"{ts_utc()}: DIVING: Submit Transfer for {transfer.transfer_data['label']}"
329+
)
312330
try:
313-
last_task = submit_transfer_with_checks(transfer_client, transfer_data)
331+
last_task = submit_transfer_with_checks(
332+
gtc.transfer_client, transfer.transfer_data
333+
)
314334
last_task_id = last_task.get("task_id")
315335
except TransferAPIError as e:
316336
if e.code == "NoCredException":
@@ -327,7 +347,7 @@ def globus_finalize(non_blocking: bool = False):
327347
sys.exit(1)
328348

329349
if not non_blocking:
330-
if task_id:
331-
globus_wait(task_id)
350+
if transfer and transfer.task_id:
351+
globus_wait(gtc.transfer_client, transfer.task_id)
332352
if last_task_id:
333-
globus_wait(last_task_id)
353+
globus_wait(gtc.transfer_client, last_task_id)

0 commit comments

Comments
 (0)