Skip to content

Commit 33945e3

Browse files
committed
Script for debugging Globus integration
1 parent f37a83f commit 33945e3

File tree

1 file changed

+393
-0
lines changed

1 file changed

+393
-0
lines changed

examples/zstash_create_globus.py

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
import configparser
2+
import datetime
3+
import os
4+
import re
5+
import shutil
6+
from typing import List, Optional, Tuple
7+
from urllib.parse import ParseResult, urlparse
8+
9+
from fair_research_login.client import NativeClient
10+
from globus_sdk import TransferAPIError, TransferClient, TransferData
11+
from globus_sdk.response import GlobusHTTPResponse
12+
13+
# Minimal example of how Globus is used in zstash
14+
# 1. Log into endpoints at globus.org
15+
# 2. To start fresh, with no consents:
16+
# https://app.globus.org/settings/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all
17+
18+
# Destination directory name on HPSS. `main` embeds it in the globus:// URL
# and in the final `hsi ls` hint.
HSI_DIR = "zstash_debugging_20250414_v2"

# Globus-specific settings ####################################################
# Config file whose presence `CommandInfo` reports on; `toy_run` deletes it
# (if present) to start fresh and `real_run` asserts that it exists.
GLOBUS_CFG: str = os.path.expanduser("~/.globus-native-apps.cfg")
# INI file read by `distilled_globus_activate` for an optional
# `[local] globus_endpoint_uuid` value; created with an empty value if unreadable.
INI_PATH: str = os.path.expanduser("~/.zstash.ini")
# Client ID handed to `NativeClient` for both login prompts.
ZSTASH_CLIENT_ID: str = "6c1629cf-446c-49e7-af95-323c6412397f"
# Human-readable collection names mapped to Globus endpoint UUIDs.
NAME_TO_ENDPOINT_MAP = {
    "Globus Tutorial Collection 1": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc",  # The Unit test endpoint
    "NERSC HPSS": "9cd89cfd-6d04-11e5-ba46-22000b92c6ec",
    "NERSC Perlmutter": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79",
}
29+
30+
31+
class GlobusInfo:
    """Mutable Globus state passed between the distilled helper functions.

    Constructed from a ``globus://`` URL. The endpoint IDs, transfer client,
    pending transfer data and task ID all start as ``None`` and are assigned
    later by the functions that activate endpoints and submit transfers.

    Raises:
        AssertionError: if ``hpss_path`` does not use the ``globus`` scheme.
    """

    def __init__(self, hpss_path: str):
        parsed: ParseResult = urlparse(hpss_path)
        # Only globus:// destinations are handled by this script.
        assert parsed.scheme == "globus"
        self.hpss_path: str = hpss_path
        self.url: ParseResult = parsed
        # Endpoint UUIDs; unset until activation.
        self.remote_endpoint: Optional[str] = None
        self.local_endpoint: Optional[str] = None
        # Client and the transfer currently being accumulated.
        self.transfer_client: Optional[TransferClient] = None
        self.transfer_data: Optional[TransferData] = None
        # ID of the most recently submitted task, if any.
        self.task_id = None
        # Running count of transfer items seen in submitted transfers.
        self.tarfiles_pushed: int = 0
43+
44+
45+
# zstash general settings #####################################################
46+
# Not referenced by any function shown in this script.
# NOTE(review): 1024 * 1014 looks like a typo for 1024 * 1024 (1 MiB) — confirm
# against zstash's own settings before changing it.
BLOCK_SIZE: int = 1024 * 1014

# Type alias for a six-field row tuple (presumably a zstash "files" table row
# without its id column — confirm); not referenced by the code shown here.
TupleFilesRowNoId = Tuple[str, int, datetime.datetime, Optional[str], str, int]
49+
50+
51+
class Config:
    """Small configuration holder: archive path, HPSS URL and max size."""

    def __init__(self):
        # Both start unset; CommandInfo assigns them.
        self.path: Optional[str] = None
        self.hpss: Optional[str] = None
        # 256 GiB expressed in bytes.
        self.maxsize: int = 256 * 1024**3
56+
57+
58+
class CommandInfo:
    """Bundle the config, transfer bookkeeping and Globus state for one run.

    Args:
        dir_to_archive: directory to archive; stored as an absolute path.
        hpss_path: destination URL, which must use the ``globus`` scheme.

    Raises:
        AssertionError: if ``hpss_path`` is not a ``globus://`` URL.
    """

    def __init__(self, dir_to_archive: str, hpss_path: str):
        self.config: Config = Config()
        # Files handed to the previous / current transfer (see distilled_hpss_put).
        self.prev_transfers: List[str] = []
        self.curr_transfers: List[str] = []
        # Simulate CommandInfo.set_dir_to_archive
        self.config.path = os.path.abspath(dir_to_archive)
        # Simulate CommandInfo.set_hpss_parameters
        self.config.hpss = hpss_path
        assert urlparse(hpss_path).scheme == "globus"
        self.globus_info: GlobusInfo = GlobusInfo(hpss_path)
        # Tell the user up front which authentication path to expect.
        if not os.path.exists(GLOBUS_CFG):
            notice = f"{GLOBUS_CFG} does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run."
        else:
            notice = f"{GLOBUS_CFG} exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')"
        print(notice)
78+
79+
80+
# Functions ###################################################################
81+
def main():
    """Run the toy run (to gather authentications), then the real run.

    Both runs target the same ``globus://`` destination. The working
    directory is reset between them because each run changes into its own
    scratch directory.
    """
    starting_dir = os.getcwd()
    remote_endpoint_id = NAME_TO_ENDPOINT_MAP["NERSC HPSS"]
    hpss_path = f"globus://{remote_endpoint_id}/~/{HSI_DIR}"
    dir_to_archive: str = "dir_to_archive"

    toy_run(hpss_path, dir_to_archive)
    # Example output of the toy run:
    # /global/homes/f/forsyth/.globus-native-apps.cfg does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run.
    #
    # Might ask for 1st authentication prompt:
    # Please paste the following URL in a browser:
    # Authenticated for the 1st time!
    #
    # Might ask for 2nd authentication prompt:
    # Please paste the following URL in a browser:
    # Authenticated for the 2nd time!
    # Consents added, please re-run the previous command to start transfer
    # Now that we have the authentications, let's re-run.

    os.chdir(starting_dir)
    real_run(hpss_path, dir_to_archive)
    # Example output of the real run:
    # /global/homes/f/forsyth/.globus-native-apps.cfg exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')
    #
    # Might ask for 1st authentication prompt:
    # Authenticated for the 1st time!
    #
    # distilled_globus_block_wait. task_wait, retry_count=0
    # To see transferred files, run: hsi ls zstash_debugging_20250414_v2
    # Shows file0.txt
    print(f"To see transferred files, run: hsi ls {HSI_DIR}")
108+
109+
110+
# Toy run to get everything set up correctly.
111+
def toy_run(
    hpss_path: str,
    dir_to_archive: str,
):
    """Do a first pass from a clean slate so the authentication prompts fire.

    Deletes the zstash INI file and the Globus config file if they exist,
    recreates the ``toy_run`` scratch directory, changes into it, writes one
    sample file and runs the distilled create. A ``RuntimeError`` from the
    create step is reported and swallowed.

    Note: the working directory is left changed when this returns.
    """
    # Start fresh
    for stale_file in (INI_PATH, GLOBUS_CFG):
        if os.path.exists(stale_file):
            os.remove(stale_file)
    run_dir = "toy_run"
    if os.path.exists(run_dir):
        shutil.rmtree(run_dir)
    # Start doing work
    os.mkdir(run_dir)
    os.chdir(run_dir)
    os.mkdir(dir_to_archive)
    sample_path = f"{dir_to_archive}/file0.txt"
    with open(sample_path, "w") as sample_file:
        sample_file.write("file0 stuff")
    # Simulate running `zstash create`
    try:
        distilled_create(hpss_path, dir_to_archive)
    except RuntimeError:
        print("Now that we have the authentications, let's re-run.")
137+
138+
139+
def real_run(
    hpss_path: str,
    dir_to_archive: str,
):
    """Do the second pass, relying on files left behind by the toy run.

    Requires the zstash INI file and the Globus config file to exist,
    recreates the ``real_run`` scratch directory, changes into it, writes one
    sample file and runs the distilled create. Exceptions are not caught.

    Raises:
        AssertionError: if either required file is missing.
    """
    # The toy run must already have produced both files.
    for required_file in (INI_PATH, GLOBUS_CFG):
        assert os.path.exists(required_file)
    run_dir = "real_run"
    if os.path.exists(run_dir):
        shutil.rmtree(run_dir)
    # Start doing work
    os.mkdir(run_dir)
    os.chdir(run_dir)
    os.mkdir(dir_to_archive)
    sample_path = f"{dir_to_archive}/file0.txt"
    with open(sample_path, "w") as sample_file:
        sample_file.write("file0 stuff")
    # Simulate running `zstash create`
    distilled_create(hpss_path, dir_to_archive)
160+
161+
162+
# Distilled versions of zstash functions ######################################
163+
164+
165+
def distilled_create(hpss_path: str, dir_to_archive: str):
    """Activate Globus, push ``file0.txt`` from the archive dir, then finalize.

    Args:
        hpss_path: ``globus://`` destination URL.
        dir_to_archive: directory (relative to the current working directory
            or absolute) that contains ``file0.txt``.

    Raises:
        AssertionError: if the resolved archive path is not a directory.
    """
    command_info = CommandInfo(dir_to_archive, hpss_path)
    archive_root = command_info.config.path
    print(archive_root)
    assert archive_root is not None
    assert os.path.isdir(archive_root)
    globus_info = command_info.globus_info
    distilled_globus_activate(globus_info)
    # The put below runs from inside the archive directory.
    os.chdir(archive_root)
    distilled_hpss_put(command_info, os.path.join(archive_root, "file0.txt"))
    distilled_globus_finalize(globus_info)
176+
177+
178+
def distilled_globus_activate(globus_info: GlobusInfo):
    """Resolve both endpoints, log in, and auto-activate the endpoints.

    The remote endpoint comes from the URL's netloc. The local endpoint comes
    from the ``[local]`` section of the INI file when present and non-empty;
    otherwise it falls back to the NERSC Perlmutter endpoint, which requires
    ``NERSC_HOST=perlmutter``.

    Raises:
        AssertionError: if the fallback is needed off Perlmutter, or if an
            endpoint reports ``AutoActivationFailed``.
    """
    globus_info.remote_endpoint = globus_info.url.netloc

    # Simulate globus_activate > set_local_endpoint
    ini = configparser.ConfigParser()
    if not ini.read(INI_PATH):
        # Nothing could be read: write a file with an empty placeholder entry.
        ini["local"] = {"globus_endpoint_uuid": ""}
        with open(INI_PATH, "w") as ini_file:
            ini.write(ini_file)
    elif "local" in ini.sections():
        globus_info.local_endpoint = ini["local"].get("globus_endpoint_uuid")
    if not globus_info.local_endpoint:
        assert os.environ.get("NERSC_HOST") == "perlmutter"
        globus_info.local_endpoint = NAME_TO_ENDPOINT_MAP["NERSC Perlmutter"]

    # Simulate globus_activate > set_clients
    native_client = NativeClient(
        client_id=ZSTASH_CLIENT_ID,
        app_name="Zstash",
        default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
    )
    # May print 'Please Paste your Auth Code Below:'
    # This is the 1st authentication prompt!
    print("Might ask for 1st authentication prompt:")
    native_client.login(no_local_server=True, refresh_tokens=True)
    print("Authenticated for the 1st time!")
    authorizers = native_client.get_authorizers()
    globus_info.transfer_client = TransferClient(
        authorizer=authorizers.get("transfer.api.globus.org")
    )

    # Continue globus_activate
    for endpoint_id in (globus_info.local_endpoint, globus_info.remote_endpoint):
        response = globus_info.transfer_client.endpoint_autoactivate(
            endpoint_id, if_expires_in=600
        )
        assert response.get("code") != "AutoActivationFailed"
210+
211+
212+
def distilled_hpss_put(command_info: CommandInfo, file_path: str):
    """Transfer one file to the configured destination via Globus.

    The file is recorded in ``curr_transfers``, then the transfer runs from
    inside the file's directory. When the transfer reports ``"SUCCEEDED"``,
    the files of the *previous* transfer are deleted locally and the current
    list becomes the previous one.

    Args:
        command_info: run state; ``config.hpss`` supplies the remote path.
        file_path: path of the local file to transfer.

    Raises:
        Whatever ``distilled_globus_transfer`` raises (e.g. ``RuntimeError``
        when consents were just added). The original working directory is
        restored before the exception propagates.
    """
    url = urlparse(command_info.config.hpss)
    url_path: str = str(url.path)
    command_info.curr_transfers.append(file_path)
    path: str
    name: str
    path, name = os.path.split(file_path)
    cwd: str = os.getcwd()
    if path:
        # This directory contains the file we want to transfer to HPSS.
        os.chdir(path)
    try:
        globus_status = distilled_globus_transfer(
            command_info.globus_info, url_path, name
        )
    finally:
        # Restore the working directory even when the transfer raises, so a
        # failed put does not leave the process in a different directory.
        if path:
            os.chdir(cwd)
    if globus_status == "SUCCEEDED":
        for src_path in command_info.prev_transfers:
            os.remove(src_path)
        command_info.prev_transfers = command_info.curr_transfers
        command_info.curr_transfers = []
231+
232+
233+
def distilled_globus_transfer(globus_info: GlobusInfo, remote_path: str, name: str):
    """Queue ``name`` from the current directory and submit the transfer.

    The item is added to the pending ``TransferData`` (created on demand).
    If a previously submitted task is still ``ACTIVE``, nothing is submitted
    and ``"ACTIVE"`` is returned with the item left queued. Otherwise the
    pending data is submitted, cleared, and the new task is waited on.

    Args:
        globus_info: Globus state; both endpoints must already be set.
        remote_path: destination directory on the remote endpoint.
        name: file name, relative to the current working directory.

    Returns:
        ``"ACTIVE"`` or the status string from ``distilled_globus_block_wait``.
    """
    assert globus_info.local_endpoint is not None
    source_endpoint: str = globus_info.local_endpoint
    source_file: str = os.path.join(os.getcwd(), name)
    assert globus_info.remote_endpoint is not None
    dest_endpoint: str = globus_info.remote_endpoint
    dest_file: str = os.path.join(remote_path, name)

    # Label: sanitized last component of the remote directory + file stem.
    remote_dir_name = os.path.basename(os.path.normpath(remote_path))
    sanitized_dir = re.sub("[^A-Za-z0-9_ -]", "", remote_dir_name)
    stem = name.split(".")[0]
    label = f"{sanitized_dir} {stem}"

    if not globus_info.transfer_data:
        globus_info.transfer_data = TransferData(
            globus_info.transfer_client,
            source_endpoint,
            dest_endpoint,
            label=label,
            verify_checksum=True,
            preserve_timestamp=True,
            fail_on_quota_errors=True,
        )
    pending = globus_info.transfer_data
    pending.add_item(source_file, dest_file)
    pending["label"] = label

    task: GlobusHTTPResponse
    if globus_info.task_id:
        task = globus_info.transfer_client.get_task(globus_info.task_id)
        previous_status = task["status"]
        # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
        # NOTE: How we behave here depends upon whether we want to support multiple active transfers.
        # Presently, we do not, except inadvertently (if status == PENDING)
        if previous_status == "ACTIVE":
            return "ACTIVE"
        if previous_status == "SUCCEEDED":
            # These values are read from the finished task but are not used
            # again in this function.
            source_endpoint = task["source_endpoint_id"]
            dest_endpoint = task["destination_endpoint_id"]
            label = task["label"]

    # Count the transfer items about to be submitted.
    for entry in pending.__dict__["data"]["DATA"]:
        if entry["DATA_TYPE"] == "transfer_item":
            globus_info.tarfiles_pushed += 1

    # Simulate submit_transfer_with_checks
    task = distilled_submit_transfer_with_checks(globus_info)
    globus_info.task_id = task.get("task_id")
    # Nullify the submitted transfer data structure so that a new one will be created on next call.
    globus_info.transfer_data = None
    # wait_timeout = 7200 sec = 120 min = 2h
    # wait_timeout = 300 sec = 5 min
    return distilled_globus_block_wait(globus_info, wait_timeout=300, max_retries=5)
284+
285+
286+
def distilled_submit_transfer_with_checks(
    globus_info: GlobusInfo,
) -> GlobusHTTPResponse:
    """Submit the pending transfer, requesting data_access consents if needed.

    On a ``TransferAPIError`` that reports ``consent_required``, a scope
    string is built that adds a ``data_access`` scope for every endpoint whose
    ``gcs_version`` major number is at least 5, a second login is performed
    with those scopes, and ``RuntimeError`` is raised to tell the caller to
    re-run. Any other ``TransferAPIError`` is re-raised unchanged.

    Args:
        globus_info: Globus state holding the client, the pending transfer
            data and both endpoint IDs.

    Returns:
        The response of ``submit_transfer``.

    Raises:
        AssertionError: if no transfer client has been set.
        RuntimeError: consents were requested; the command must be re-run.
        TransferAPIError: the submission failed for another reason.
    """
    assert globus_info.transfer_client is not None
    client = globus_info.transfer_client
    try:
        task: GlobusHTTPResponse = client.submit_transfer(globus_info.transfer_data)
    except TransferAPIError as err:
        if not err.info.consent_required:
            if err.info.authorization_parameters:
                print("Error is in authorization parameters")
            # Bare raise keeps the original traceback intact.
            raise
        # Simulate check_consents
        scopes = "urn:globus:auth:scope:transfer.api.globus.org:all["
        for ep_id in [globus_info.remote_endpoint, globus_info.local_endpoint]:
            # Simulate check_endpoint_version_5
            output = client.get_endpoint(ep_id)
            version = output.get("gcs_version", "0.0")
            # Test the value fetched with a default rather than indexing the
            # response again, so a missing key means "not v5", not KeyError.
            valid_endpoint = version is not None and int(version.split(".")[0]) >= 5
            # Continue check_consents
            if valid_endpoint:
                scopes += f" *https://auth.globus.org/scopes/{ep_id}/data_access"
        scopes += " ]"
        native_client = NativeClient(client_id=ZSTASH_CLIENT_ID, app_name="Zstash")
        # May print 'Please Paste your Auth Code Below:'
        # This is the 2nd authentication prompt!
        print("Might ask for 2nd authentication prompt:")
        native_client.login(requested_scopes=scopes)
        print("Authenticated for the 2nd time!")
        print("Consents added, please re-run the previous command to start transfer")
        raise RuntimeError("Re-run now that authentications are set up!") from err
    return task
328+
329+
330+
def distilled_globus_block_wait(
    globus_info: GlobusInfo, wait_timeout: int, max_retries: int
):
    """Wait for the current task, polling its status up to ``max_retries`` times.

    Each attempt calls ``task_wait`` and then reads the task status. An
    exception from ``task_wait`` is printed and uses up one attempt.

    Args:
        globus_info: Globus state holding the client and the task ID.
        wait_timeout: seconds passed to each ``task_wait`` call.
        max_retries: maximum number of attempts.

    Returns:
        ``"SUCCEEDED"`` as soon as an attempt observes that status, including
        on the final attempt; otherwise ``"EXHAUSTED_TIMEOUT_RETRIES"``.

    Raises:
        AssertionError: if no transfer client has been set.
    """
    # Checked once, outside the retry loop, so a missing client fails fast
    # instead of being printed as an "unexpected exception" on every attempt.
    assert globus_info.transfer_client is not None
    client = globus_info.transfer_client
    for retry_count in range(max_retries):
        print(f"distilled_globus_block_wait. task_wait, retry_count={retry_count}")
        try:
            # Wait for the task to complete
            client.task_wait(
                globus_info.task_id, timeout=wait_timeout, polling_interval=10
            )
        except Exception as e:
            # Best-effort: report the failure and spend this attempt.
            print(f"Unexpected Exception: {e}")
            continue
        task_status = client.get_task(globus_info.task_id)["status"]
        if task_status == "SUCCEEDED":
            # Return straight away so a success on the last allowed attempt
            # is not mistaken for retry exhaustion.
            return task_status
    return "EXHAUSTED_TIMEOUT_RETRIES"
358+
359+
360+
def distilled_globus_finalize(globus_info: GlobusInfo):
    """Submit any still-pending transfer data and wait for outstanding tasks.

    Does nothing when no transfer data is pending. Otherwise counts the
    pending transfer items, submits them, then waits first on the previously
    recorded task (if any) and then on the task just submitted.
    """
    pending = globus_info.transfer_data
    if not pending:
        return
    for entry in pending.__dict__["data"]["DATA"]:
        if entry["DATA_TYPE"] == "transfer_item":
            globus_info.tarfiles_pushed += 1
    last_task: GlobusHTTPResponse = distilled_submit_transfer_with_checks(globus_info)
    last_task_id = last_task.get("task_id")
    if globus_info.task_id:
        distilled_globus_wait(globus_info)
    if last_task_id:
        distilled_globus_wait(globus_info, last_task_id)
374+
375+
376+
def distilled_globus_wait(globus_info: GlobusInfo, alternative_task_id=None):
    """Block until ``task_wait`` reports completion, then fetch the task once.

    Args:
        globus_info: Globus state holding the client and default task ID.
        alternative_task_id: task to wait on instead of ``globus_info.task_id``
            when truthy.

    Raises:
        AssertionError: if no transfer client has been set.
    """
    task_id = alternative_task_id if alternative_task_id else globus_info.task_id
    assert globus_info.transfer_client is not None
    client = globus_info.transfer_client
    # Keep waiting in 300-second windows; there is no retry limit here.
    while True:
        if client.task_wait(task_id, timeout=300, polling_interval=20):
            break
        print("distilled_globus_wait. task_wait.")
    # Final fetch of the task; the response is not used.
    client.get_task(task_id)
389+
390+
391+
# Run #########################################################################
392+
# Run the demo only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)