Skip to content

Commit 805af24

Browse files
committed
Script for debugging Globus integration
1 parent f37a83f commit 805af24

File tree

1 file changed

+318
-0
lines changed

1 file changed

+318
-0
lines changed

examples/zstash_create_globus.py

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
import configparser
2+
import os
3+
import re
4+
import shutil
5+
from typing import Optional
6+
from urllib.parse import ParseResult, urlparse
7+
8+
from fair_research_login.client import NativeClient
9+
from globus_sdk import TransferAPIError, TransferClient, TransferData
10+
from globus_sdk.response import GlobusHTTPResponse
11+
12+
# Minimal example of how Globus is used in zstash
13+
# 1. Log into endpoints at globus.org
14+
# 2. To start fresh, with no consents:
15+
# https://app.globus.org/settings/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all
16+
17+
# Directory name appended to the remote "~/" path when main() builds the
# globus:// destination URL; also echoed in the final "hsi ls" hint.
HSI_DIR: str = "zstash_debugging_20250414_v4"

# Globus-specific settings ####################################################
# File whose presence is checked before each run: toy_run() deletes it and
# real_run() requires it. (Presumably the token cache written by the
# native-app login -- confirm against fair_research_login.)
GLOBUS_CFG: str = os.path.expanduser("~/.globus-native-apps.cfg")
# zstash settings file; distilled_create() reads the "globus_endpoint_uuid"
# key of its [local] section, and writes an empty one if the file is unreadable.
INI_PATH: str = os.path.expanduser("~/.zstash.ini")
# Client ID handed to NativeClient for every login in this script.
ZSTASH_CLIENT_ID: str = "6c1629cf-446c-49e7-af95-323c6412397f"
# Human-readable collection name -> Globus endpoint UUID.
NAME_TO_ENDPOINT_MAP = {
    # The Unit test endpoint
    "Globus Tutorial Collection 1": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc",
    "NERSC HPSS": "9cd89cfd-6d04-11e5-ba46-22000b92c6ec",
    "NERSC Perlmutter": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79",
}
28+
29+
30+
class GlobusInfo:
    """Mutable per-session Globus state derived from a ``globus://`` URL.

    Construction only parses and stores the URL; the endpoint IDs, client,
    pending transfer payload and task ID all start as ``None`` and are
    assigned by the code that drives the transfer.
    """

    def __init__(self, hpss_path: str):
        """Parse *hpss_path* and initialise every Globus handle to ``None``.

        Raises:
            AssertionError: if *hpss_path* does not use the ``globus`` scheme.
        """
        parsed: ParseResult = urlparse(hpss_path)
        assert parsed.scheme == "globus"

        # The destination, both raw and parsed.
        self.hpss_path: str = hpss_path
        self.url: ParseResult = parsed

        # Endpoint UUIDs, unknown until activation fills them in.
        self.remote_endpoint: Optional[str] = None
        self.local_endpoint: Optional[str] = None

        # SDK objects and the ID of the most recently submitted task.
        self.transfer_client: Optional[TransferClient] = None
        self.transfer_data: Optional[TransferData] = None
        self.task_id = None
41+
42+
43+
# zstash general settings #####################################################
44+
class Config:
    """Minimal stand-in for the zstash configuration used by this script."""

    def __init__(self):
        """Start with no archive path or HPSS target and a 256 GiB ``maxsize``."""
        # 256 * 2**30 bytes.
        self.maxsize: int = 256 * 1024**3
        # Both are assigned by CommandInfo after construction.
        self.path: Optional[str] = None
        self.hpss: Optional[str] = None
49+
50+
51+
class CommandInfo:
    """Bundle of the run configuration and Globus state for one invocation."""

    def __init__(self, dir_to_archive: str, hpss_path: str):
        """Record the archive directory and Globus destination.

        Args:
            dir_to_archive: Directory to archive; stored as an absolute path.
            hpss_path: Destination URL, which must use the ``globus`` scheme.

        Raises:
            AssertionError: if *hpss_path* is not a ``globus://`` URL.
        """
        settings = Config()
        self.config: Config = settings
        # Simulate CommandInfo.set_dir_to_archive
        settings.path = os.path.abspath(dir_to_archive)
        # Simulate CommandInfo.set_hpss_parameters
        settings.hpss = hpss_path
        assert urlparse(hpss_path).scheme == "globus"
        self.globus_info: GlobusInfo = GlobusInfo(hpss_path)

        # Tell the user up front which of the two known starting states applies.
        if not os.path.exists(GLOBUS_CFG):
            notice = f"{GLOBUS_CFG} does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run."
        else:
            notice = f"{GLOBUS_CFG} exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')"
        print(notice)
69+
70+
71+
# Functions ###################################################################
72+
def main():
    """Run the two-pass demo: a toy run to collect consents, then the real run.

    Both passes change into their own scratch directory, so the starting
    directory is restored between them.
    """
    remote_endpoint = NAME_TO_ENDPOINT_MAP["NERSC HPSS"]
    hpss_path = f"globus://{remote_endpoint}/~/{HSI_DIR}"
    dir_to_archive: str = "dir_to_archive"
    starting_dir = os.getcwd()

    # Pass 1 -- sample console output:
    #   /global/homes/f/forsyth/.globus-native-apps.cfg does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run.
    #
    #   Might ask for 1st authentication prompt:
    #   Please paste the following URL in a browser:
    #   Authenticated for the 1st time!
    #
    #   Might ask for 2nd authentication prompt:
    #   Please paste the following URL in a browser:
    #   Authenticated for the 2nd time!
    #   Consents added, please re-run the previous command to start transfer
    #   Now that we have the authentications, let's re-run.
    toy_run(hpss_path, dir_to_archive)

    os.chdir(starting_dir)

    # Pass 2 -- sample console output:
    #   /global/homes/f/forsyth/.globus-native-apps.cfg exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')
    #
    #   Might ask for 1st authentication prompt:
    #   Authenticated for the 1st time!
    #
    #   task_wait, retry_count=0
    #   To see transferred files, run: hsi ls <HSI_DIR>
    # Running that hsi command shows file0.txt
    real_run(hpss_path, dir_to_archive)

    print(f"To see transferred files, run: hsi ls {HSI_DIR}")
99+
100+
101+
# Toy run to get everything set up correctly.
102+
def toy_run(hpss_path: str, dir_to_archive: str):
    """First pass: wipe saved settings, then attempt a transfer to gather consents.

    A ``RuntimeError`` from ``distilled_create`` is the expected outcome here
    (it signals that consents were just added) and is reported, not re-raised.
    """
    # Start fresh: remove both settings files if they are present.
    for saved_file in (INI_PATH, GLOBUS_CFG):
        if os.path.exists(saved_file):
            os.remove(saved_file)

    set_up_dirs("toy_run", dir_to_archive)

    try:
        distilled_create(hpss_path, dir_to_archive)
    except RuntimeError:
        print("Now that we have the authentications, let's re-run.")
119+
120+
121+
def real_run(hpss_path: str, dir_to_archive: str):
    """Second pass: perform the transfer using the settings left by the toy run.

    Raises:
        AssertionError: if either settings file is missing.
    """
    # Unlike the toy run, this pass depends on the files created earlier.
    for required_file in (INI_PATH, GLOBUS_CFG):
        assert os.path.exists(required_file)

    set_up_dirs("real_run", dir_to_archive)
    distilled_create(hpss_path, dir_to_archive)
133+
134+
135+
def set_up_dirs(run_dir: str, dir_to_archive: str):
    """Recreate *run_dir*, change into it, and seed a one-file archive directory.

    Any existing *run_dir* is deleted first. On return the current working
    directory is *run_dir*, which contains ``dir_to_archive/file0.txt``.
    """
    already_there = os.path.exists(run_dir)
    if already_there:
        shutil.rmtree(run_dir)
    os.mkdir(run_dir)

    # Everything below is created relative to the fresh run directory.
    os.chdir(run_dir)
    os.mkdir(dir_to_archive)
    sample_path = f"{dir_to_archive}/file0.txt"
    with open(sample_path, "w") as sample:
        sample.write("file0 stuff")
143+
144+
145+
# Distilled versions of zstash functions ######################################
146+
147+
148+
def distilled_create(hpss_path: str, dir_to_archive: str):
    """Condensed create flow: activate Globus, then transfer ``file0.txt``.

    Args:
        hpss_path: ``globus://`` destination URL.
        dir_to_archive: Existing directory that contains ``file0.txt``.

    Raises:
        AssertionError: if the archive directory is missing, the host is not
            Perlmutter when no local endpoint is configured, endpoint
            auto-activation fails, or transfer data is left over afterwards.
        RuntimeError: propagated from ``distilled_globus_transfer`` after new
            consents were added.
    """
    command_info = CommandInfo(dir_to_archive, hpss_path)
    archive_dir = command_info.config.path
    print(archive_dir)
    assert archive_dir is not None
    assert os.path.isdir(archive_dir)
    globus_info = command_info.globus_info

    # Begin simulating globus_activate ########################################
    globus_info.remote_endpoint = globus_info.url.netloc
    # Simulate globus_activate > set_local_endpoint
    settings = configparser.ConfigParser()
    if not settings.read(INI_PATH):
        # No readable settings file yet: write one with an empty endpoint entry.
        settings["local"] = {"globus_endpoint_uuid": ""}
        with open(INI_PATH, "w") as ini_file:
            settings.write(ini_file)
    elif "local" in settings.sections():
        globus_info.local_endpoint = settings["local"].get("globus_endpoint_uuid")
    if not globus_info.local_endpoint:
        # Fall back to the Perlmutter endpoint; this script supports no other host.
        assert os.environ.get("NERSC_HOST") == "perlmutter"
        globus_info.local_endpoint = NAME_TO_ENDPOINT_MAP["NERSC Perlmutter"]
    # Simulate globus_activate > set_clients
    native_client = NativeClient(
        client_id=ZSTASH_CLIENT_ID,
        app_name="Zstash",
        default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
    )
    # May print 'Please Paste your Auth Code Below:'
    # This is the 1st authentication prompt!
    print("Might ask for 1st authentication prompt:")
    native_client.login(no_local_server=True, refresh_tokens=True)
    print("Authenticated for the 1st time!")
    authorizers = native_client.get_authorizers()
    transfer_client = TransferClient(
        authorizer=authorizers.get("transfer.api.globus.org")
    )
    globus_info.transfer_client = transfer_client
    # Continue globus_activate
    for endpoint_id in (globus_info.local_endpoint, globus_info.remote_endpoint):
        response = transfer_client.endpoint_autoactivate(
            endpoint_id, if_expires_in=600
        )
        assert response.get("code") != "AutoActivationFailed"
    # End simulating globus_activate ##########################################

    os.chdir(archive_dir)
    file_path = os.path.join(archive_dir, "file0.txt")

    # Begin simulating hpss_put ###############################################
    remote_dir: str = str(urlparse(command_info.config.hpss).path)
    local_dir: str
    name: str
    local_dir, name = os.path.split(file_path)
    previous_cwd: str = os.getcwd()
    if local_dir:
        # This directory contains the file we want to transfer to HPSS.
        os.chdir(local_dir)
    _ = distilled_globus_transfer(globus_info, remote_dir, name)
    if local_dir:
        os.chdir(previous_cwd)
    # End simulating hpss_put #################################################

    # A completed transfer must have cleared its payload.
    assert globus_info.transfer_data is None
218+
219+
220+
def distilled_globus_transfer(
    globus_info: GlobusInfo, remote_path: str, name: str
) -> str:
    """Submit one file to the remote endpoint and wait for the task to finish.

    Args:
        globus_info: Session state. ``local_endpoint``, ``remote_endpoint``
            and ``transfer_client`` must be set and ``transfer_data`` must be
            ``None``. On a successful submission ``task_id`` is updated and
            ``transfer_data`` is reset to ``None``.
        remote_path: Destination directory on the remote endpoint.
        name: File name; the source is ``<cwd>/<name>`` and the destination
            is ``<remote_path>/<name>``.

    Returns:
        ``"ACTIVE"`` if a previously recorded task is still running (nothing
        new is submitted), ``"SUCCEEDED"`` once the submitted task succeeds,
        or ``"EXHAUSTED_TIMEOUT_RETRIES"`` if it has not succeeded after all
        wait attempts.

    Raises:
        AssertionError: if a precondition on ``globus_info`` does not hold.
        RuntimeError: after consents were requested interactively; the caller
            is expected to re-run.
        TransferAPIError: any other submission failure, re-raised unchanged.
    """
    # Check every precondition before the first use of the client, so a
    # missing client fails here instead of part-way through.
    assert globus_info.local_endpoint is not None
    assert globus_info.remote_endpoint is not None
    assert globus_info.transfer_client is not None
    assert globus_info.transfer_data is None
    transfer_client = globus_info.transfer_client

    src_ep: str = globus_info.local_endpoint
    src_path: str = os.path.join(os.getcwd(), name)
    dst_ep: str = globus_info.remote_endpoint
    dst_path: str = os.path.join(remote_path, name)

    # Task label: "<sanitised last component of remote_path> <name up to first dot>".
    subdir = os.path.basename(os.path.normpath(remote_path))
    subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
    filename = name.split(".")[0]
    label = subdir_label + " " + filename

    globus_info.transfer_data = TransferData(
        transfer_client,
        src_ep,
        dst_ep,
        label=label,
        verify_checksum=True,
        preserve_timestamp=True,
        fail_on_quota_errors=True,
    )
    globus_info.transfer_data.add_item(src_path, dst_path)
    globus_info.transfer_data["label"] = label

    task: GlobusHTTPResponse
    if globus_info.task_id:
        task = transfer_client.get_task(globus_info.task_id)
        if task["status"] == "ACTIVE":
            # NOTE(review): transfer_data stays populated on this path, which
            # would trip the entry assertion on a later call -- confirm intent.
            return "ACTIVE"

    # Begin simulating submit_transfer_with_checks ############################
    try:
        task = transfer_client.submit_transfer(globus_info.transfer_data)
    except TransferAPIError as err:
        if err.info.consent_required:
            scopes = "urn:globus:auth:scope:transfer.api.globus.org:all["
            for ep_id in [globus_info.remote_endpoint, globus_info.local_endpoint]:
                scopes += f" *https://auth.globus.org/scopes/{ep_id}/data_access"
            scopes += " ]"
            native_client = NativeClient(client_id=ZSTASH_CLIENT_ID, app_name="Zstash")
            # May print 'Please Paste your Auth Code Below:'
            # This is the 2nd authentication prompt!
            print("Might ask for 2nd authentication prompt:")
            native_client.login(requested_scopes=scopes)
            print("Authenticated for the 2nd time!")
            print(
                "Consents added, please re-run the previous command to start transfer"
            )
            # Chain the API error so the original cause stays visible.
            raise RuntimeError("Re-run now that authentications are set up!") from err
        if err.info.authorization_parameters:
            print("Error is in authorization parameters")
        # Bare raise keeps the original traceback intact.
        raise
    # End simulating submit_transfer_with_checks ##############################

    globus_info.task_id = task.get("task_id")
    # Nullify the submitted transfer data structure so that a new one will be created on next call.
    globus_info.transfer_data = None
    # wait_timeout = 7200 sec = 120 min = 2h
    # wait_timeout = 300 sec = 5 min
    wait_timeout = 300
    max_retries = 2

    # Begin simulating globus_block_wait ######################################
    task_status = "UNKNOWN"
    for retry_count in range(max_retries):
        try:
            # Wait for the task to complete
            print(f"task_wait, retry_count={retry_count}")
            transfer_client.task_wait(
                globus_info.task_id, timeout=wait_timeout, polling_interval=10
            )
        except Exception as e:
            # Best effort: report the failure and use up one attempt.
            print(f"Unexpected Exception: {e}")
        else:
            curr_task: GlobusHTTPResponse = transfer_client.get_task(
                globus_info.task_id
            )
            task_status = curr_task["status"]
            if task_status == "SUCCEEDED":
                break
    else:
        # Reached only when no attempt broke out with SUCCEEDED, so a success
        # on the final attempt is no longer reported as exhaustion.
        task_status = "EXHAUSTED_TIMEOUT_RETRIES"
    # End simulating globus_block_wait ######################################

    return task_status
314+
315+
316+
# Run #########################################################################
317+
# Only run the demo when executed as a script, not when imported.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)