Skip to content

Commit 73bbea7

Browse files
committed
Add simple Globus script
1 parent 13f8dc8 commit 73bbea7

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed

examples/simple_globus.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
import configparser
2+
import os
3+
import re
4+
import shutil
5+
from typing import List, Optional
6+
from urllib.parse import ParseResult, urlparse
7+
8+
from fair_research_login.client import NativeClient
9+
from globus_sdk import TransferAPIError, TransferClient, TransferData
10+
from globus_sdk.response import GlobusHTTPResponse
11+
12+
# Minimal example of how Globus is used in zstash
13+
# 1. Log into endpoints at globus.org
14+
# File Manager > Add the endpoints in the "Collection" fields
15+
# 2. To start fresh, with no consents:
16+
# https://auth.globus.org/v2/web/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all"
17+
18+
HSI_DIR: str = "zstash_test_370_20250723"
19+
ENDPOINT_NAME: str = (
20+
"LCRC Improv DTN" # Change this to the name of the endpoint you want to use
21+
)
22+
REQUEST_SCOPES_EARLY: bool = True # False will emulate zstash behavior
23+
24+
# Globus-specific settings ####################################################
25+
GLOBUS_CFG: str = os.path.expanduser("~/.globus-native-apps.cfg")
26+
INI_PATH: str = os.path.expanduser("~/.zstash.ini")
27+
ZSTASH_CLIENT_ID: str = "6c1629cf-446c-49e7-af95-323c6412397f"
28+
NAME_TO_ENDPOINT_MAP = {
29+
# "Globus Tutorial Collection 1": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc", # The Unit test endpoint
30+
"NERSC HPSS": "9cd89cfd-6d04-11e5-ba46-22000b92c6ec",
31+
"NERSC Perlmutter": "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79",
32+
"LCRC Improv DTN": "15288284-7006-4041-ba1a-6b52501e49f1",
33+
}
34+
35+
36+
# Functions ###################################################################
37+
def main():
38+
base_dir = os.getcwd()
39+
print(f"Starting in {base_dir}")
40+
if os.path.exists(INI_PATH):
41+
os.remove(INI_PATH)
42+
if os.path.exists(GLOBUS_CFG):
43+
os.remove(GLOBUS_CFG)
44+
skipped_second_auth: bool = False
45+
try:
46+
skipped_second_auth = simple_transfer("toy_run")
47+
except RuntimeError:
48+
print("Now that we have the authentications, let's re-run.")
49+
print(f"For toy_run, skipped_second_auth={skipped_second_auth}")
50+
if skipped_second_auth:
51+
# We want to enter this block!
52+
print(
53+
"We didn't need to authenticate a second time! That means we don't have to re-run the previous command to start the transfer!"
54+
)
55+
else:
56+
# Without `get_all_endpoint_scopes`, we ended up in this block!
57+
#
58+
# /global/homes/f/forsyth/.globus-native-apps.cfg does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run.
59+
#
60+
# Might ask for 1st authentication prompt:
61+
# Please paste the following URL in a browser:
62+
# Authenticated for the 1st time!
63+
#
64+
# Might ask for 2nd authentication prompt:
65+
# Please paste the following URL in a browser:
66+
# Authenticated for the 2nd time!
67+
# Consents added, please re-run the previous command to start transfer
68+
# Now that we have the authentications, let's re-run.
69+
os.chdir(base_dir)
70+
print(f"Now in {os.getcwd()}")
71+
assert os.path.exists(INI_PATH)
72+
assert os.path.exists(GLOBUS_CFG)
73+
skipped_second_auth = simple_transfer("real_run")
74+
print(f"For real_run, skipped_second_auth={skipped_second_auth}")
75+
# /global/homes/f/forsyth/.globus-native-apps.cfg exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')
76+
#
77+
# Might ask for 1st authentication prompt:
78+
# Authenticated for the 1st time!
79+
#
80+
# Bypassed 2nd authentication.
81+
#
82+
# Wait for task to complete, wait_timeout=300
83+
print(f"To see transferred files, run: hsi ls {HSI_DIR}")
84+
# To see transferred files, run: hsi ls zstash_debugging_20250415_v2
85+
# Shows file0.txt
86+
assert skipped_second_auth
87+
88+
89+
def simple_transfer(run_dir: str) -> bool:
90+
hpss_path = f"globus://{NAME_TO_ENDPOINT_MAP['NERSC HPSS']}/~/{HSI_DIR}"
91+
if os.path.exists(run_dir):
92+
shutil.rmtree(run_dir)
93+
os.mkdir(run_dir)
94+
os.chdir(run_dir)
95+
print(f"Now in {os.getcwd()}")
96+
dir_to_archive: str = "dir_to_archive"
97+
txt_file: str = "file0.txt"
98+
os.mkdir(dir_to_archive)
99+
with open(f"{dir_to_archive}/{txt_file}", "w") as f:
100+
f.write("file contents")
101+
url: ParseResult = urlparse(hpss_path)
102+
assert url.scheme == "globus"
103+
if os.path.exists(GLOBUS_CFG):
104+
print(
105+
f"{GLOBUS_CFG} exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')"
106+
)
107+
else:
108+
print(
109+
f"{GLOBUS_CFG} does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run."
110+
)
111+
config_path: str = os.path.abspath(dir_to_archive)
112+
assert os.path.isdir(config_path)
113+
remote_endpoint: str = url.netloc
114+
# Simulate globus_activate > set_local_endpoint
115+
ini = configparser.ConfigParser()
116+
local_endpoint: Optional[str] = None
117+
if ini.read(INI_PATH):
118+
if "local" in ini.sections():
119+
local_endpoint = ini["local"].get("globus_endpoint_uuid")
120+
else:
121+
ini["local"] = {"globus_endpoint_uuid": ""}
122+
with open(INI_PATH, "w") as f:
123+
ini.write(f)
124+
if not local_endpoint:
125+
# nersc_hostname = os.environ.get("NERSC_HOST")
126+
# assert nersc_hostname == "perlmutter"
127+
local_endpoint = NAME_TO_ENDPOINT_MAP[ENDPOINT_NAME]
128+
native_client = NativeClient(
129+
client_id=ZSTASH_CLIENT_ID,
130+
app_name="Zstash",
131+
default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
132+
)
133+
# May print 'Please Paste your Auth Code Below:'
134+
# This is the 1st authentication prompt!
135+
print("Might ask for 1st authentication prompt:")
136+
if REQUEST_SCOPES_EARLY:
137+
all_scopes: str = get_all_endpoint_scopes(list(NAME_TO_ENDPOINT_MAP.values()))
138+
native_client.login(
139+
requested_scopes=all_scopes, no_local_server=True, refresh_tokens=True
140+
)
141+
else:
142+
native_client.login(no_local_server=True, refresh_tokens=True)
143+
print("Authenticated for the 1st time!")
144+
transfer_authorizer = native_client.get_authorizers().get("transfer.api.globus.org")
145+
transfer_client: TransferClient = TransferClient(authorizer=transfer_authorizer)
146+
for ep_id in [
147+
local_endpoint,
148+
remote_endpoint,
149+
]:
150+
r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
151+
assert r.get("code") != "AutoActivationFailed"
152+
os.chdir(config_path)
153+
print(f"Now in {os.getcwd()}")
154+
url_path: str = str(url.path)
155+
assert local_endpoint is not None
156+
src_path: str = os.path.join(os.getcwd(), txt_file)
157+
dst_path: str = os.path.join(url_path, txt_file)
158+
subdir = os.path.basename(os.path.normpath(url_path))
159+
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
160+
filename = txt_file.split(".")[0]
161+
label = subdir_label + " " + filename
162+
transfer_data: TransferData = TransferData(
163+
transfer_client,
164+
local_endpoint, # src_ep
165+
remote_endpoint, # dst_ep
166+
label=label,
167+
verify_checksum=True,
168+
preserve_timestamp=True,
169+
fail_on_quota_errors=True,
170+
)
171+
transfer_data.add_item(src_path, dst_path)
172+
transfer_data["label"] = label
173+
task: GlobusHTTPResponse
174+
skipped_second_auth: bool = False
175+
try:
176+
task = transfer_client.submit_transfer(transfer_data)
177+
print("Bypassed 2nd authentication.")
178+
skipped_second_auth = True
179+
except TransferAPIError as err:
180+
if err.info.consent_required:
181+
scopes = "urn:globus:auth:scope:transfer.api.globus.org:all["
182+
for ep_id in [remote_endpoint, local_endpoint]:
183+
scopes += f" *https://auth.globus.org/scopes/{ep_id}/data_access"
184+
scopes += " ]"
185+
native_client = NativeClient(client_id=ZSTASH_CLIENT_ID, app_name="Zstash")
186+
# May print 'Please Paste your Auth Code Below:'
187+
# This is the 2nd authentication prompt!
188+
print("Might ask for 2nd authentication prompt:")
189+
native_client.login(requested_scopes=scopes)
190+
print("Authenticated for the 2nd time!")
191+
print(
192+
"Consents added, please re-run the previous command to start transfer"
193+
)
194+
raise RuntimeError("Re-run now that authentications are set up!")
195+
else:
196+
if err.info.authorization_parameters:
197+
print("Error is in authorization parameters")
198+
raise err
199+
task_id = task.get("task_id")
200+
wait_timeout = 300 # 300 sec = 5 min
201+
print(f"Wait for task to complete, wait_timeout={wait_timeout}")
202+
transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10)
203+
curr_task: GlobusHTTPResponse = transfer_client.get_task(task_id)
204+
task_status = curr_task["status"]
205+
assert task_status == "SUCCEEDED"
206+
return skipped_second_auth
207+
208+
209+
def get_all_endpoint_scopes(endpoints: List[str]) -> str:
210+
inner = " ".join(
211+
[f"*https://auth.globus.org/scopes/{ep}/data_access" for ep in endpoints]
212+
)
213+
return f"urn:globus:auth:scope:transfer.api.globus.org:all[{inner}]"
214+
215+
216+
# Run #########################################################################
217+
if __name__ == "__main__":
218+
main()

0 commit comments

Comments
 (0)