1- import configparser
21import os
32import re
43import shutil
5- from typing import List , Optional
4+ from typing import List , Tuple
65from urllib .parse import ParseResult , urlparse
76
87from fair_research_login .client import NativeClient
98from globus_sdk import TransferAPIError , TransferClient , TransferData
109from globus_sdk .response import GlobusHTTPResponse
1110
12- # Minimal example of how Globus is used in zstash
13- # 1. Log into endpoints at globus.org
14- # File Manager > Add the endpoints in the "Collection" fields
15- # 2. To start fresh, with no consents:
16- # https://auth.globus.org/v2/web/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all"
17-
18- HSI_DIR : str = "zstash_test_370_20250723"
19- ENDPOINT_NAME : str = (
20- "LCRC Improv DTN" # Change this to the name of the endpoint you want to use
21- )
22- REQUEST_SCOPES_EARLY : bool = True # False will emulate zstash behavior
23-
24- # Globus-specific settings ####################################################
25- GLOBUS_CFG : str = os .path .expanduser ("~/.globus-native-apps.cfg" )
26- INI_PATH : str = os .path .expanduser ("~/.zstash.ini" )
11+ """
12+ Minimal example of how Globus is used in zstash
13+
14+ To start fresh with Globus:
15+ 1. Log into endpoints at globus.org: File Manager > Add the endpoints in the "Collection" fields
16+ 2. To start fresh, with no consents: https://auth.globus.org/v2/web/consents > Manage Your Consents > Globus Endpoint Performance Monitoring > rescind all
17+
18+ To run on Chrysalis:
19+
20+ # Set up environment
21+ lcrc_conda # Function to set up conda
22+ rm -rf build/
23+ conda clean --all --y
24+ conda env create -f conda/dev.yml -n zstash-simple-globus-script-20250804
25+ conda activate zstash-simple-globus-script-20250804
26+ pre-commit run --all-files
27+ python -m pip install .
28+ cd examples
29+
30+ # Run
31+
32+ # Case 1: authenticate 2x on toy run, 0x on real run
33+ python simple_globus.py # REQUEST_SCOPES_EARLY=False
34+ # TOY RUN:
35+ # Prompts 1st time for auth code, no login requested
36+ # Prompts 2nd time for auth code:
37+ # > Argonne prompt > enter Argonne credentials
38+ # > NERSC prompt > no login requested
39+ # "Consents added, please re-run the previous command to start transfer"
40+ # "Now that we have the authentications, let's re-run."
41+ # REAL RUN:
42+ # "Might ask for 1st authentication prompt:"
43+ # No prompt at all!
44+ # "Authenticated for the 1st time!"
45+
46+ # Reset Globus state, as described above
47+
48+ # Case 2: authenticate 1x on toy run, 1x on real run
49+ python simple_globus.py # REQUEST_SCOPES_EARLY=True
50+ # TOY RUN:
51+ # Prompts 1st time for auth code:
52+ # > Argonne prompt > no login requested
53+ # > NERSC prompt > no login requested
54+ # "Bypassed 2nd authentication."
55+ # "We didn't need to authenticate a second time on the toy run!"
56+ # REAL RUN:
57+ # Prompts 1st time for auth code, no login requested
58+ # "Bypassed 2nd authentication."
59+ """
60+
61+ # Settings ####################################################################
62+ REQUEST_SCOPES_EARLY : bool = True
63+ REMOTE_DIR_PREFIX : str = "zstash_simple_globus_try2"
64+
65+ LOCAL_ENDPOINT : str = "LCRC Improv DTN"
66+ REMOTE_ENDPOINT : str = "NERSC Perlmutter"
67+
68+ # Constants ###################################################################
2769ZSTASH_CLIENT_ID : str = "6c1629cf-446c-49e7-af95-323c6412397f"
2870NAME_TO_ENDPOINT_MAP = {
29- # "Globus Tutorial Collection 1": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc", # The Unit test endpoint
3071 "NERSC HPSS" : "9cd89cfd-6d04-11e5-ba46-22000b92c6ec" ,
3172 "NERSC Perlmutter" : "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79" ,
3273 "LCRC Improv DTN" : "15288284-7006-4041-ba1a-6b52501e49f1" ,
def main() -> None:
    """Run the demo transfer twice: a "toy" run, then a "real" run.

    The toy run exists to collect authentications/consents; the real run
    then checks that no second authentication prompt is needed.

    Raises:
        AssertionError: if the real run did not skip the second
            authentication.
    """
    base_dir = os.getcwd()
    print(f"Starting in {base_dir}")
    skipped_second_auth: bool = False
    try:
        skipped_second_auth = simple_transfer("toy_run")
    except RuntimeError:
        # NOTE(review): presumably raised by simple_transfer once consents
        # have been added and a re-run is required -- the raise site is not
        # visible from this function; confirm.
        print("Now that we have the authentications, let's re-run.")
    print(f"For toy_run, skipped_second_auth={skipped_second_auth}")
    if skipped_second_auth:
        print("We didn't need to authenticate a second time on the toy run!")
    # simple_transfer changes the working directory, so return to where we
    # started before the second run creates its own run directory.
    os.chdir(base_dir)
    print(f"Now in {os.getcwd()}")
    skipped_second_auth = simple_transfer("real_run")
    print(f"For real_run, skipped_second_auth={skipped_second_auth}")
    # Raise explicitly instead of using a bare `assert`: asserts are stripped
    # under `python -O`, which would silently disable the script's only check.
    if not skipped_second_auth:
        raise AssertionError("real_run did not skip the second authentication")
8794
8895
8996def simple_transfer (run_dir : str ) -> bool :
90- hpss_path = f"globus://{ NAME_TO_ENDPOINT_MAP ['NERSC HPSS' ]} /~/{ HSI_DIR } "
91- if os .path .exists (run_dir ):
92- shutil .rmtree (run_dir )
93- os .mkdir (run_dir )
94- os .chdir (run_dir )
95- print (f"Now in { os .getcwd ()} " )
96- dir_to_archive : str = "dir_to_archive"
97- txt_file : str = "file0.txt"
98- os .mkdir (dir_to_archive )
99- with open (f"{ dir_to_archive } /{ txt_file } " , "w" ) as f :
100- f .write ("file contents" )
101- url : ParseResult = urlparse (hpss_path )
97+ remote_path = f"globus://{ NAME_TO_ENDPOINT_MAP [REMOTE_ENDPOINT ]} /~/{ REMOTE_DIR_PREFIX } _{ run_dir } "
98+ config_path : str
99+ txt_file : str
100+ config_path , txt_file = get_dir_and_file_to_archive (run_dir )
101+ url : ParseResult = urlparse (remote_path )
102102 assert url .scheme == "globus"
103- if os .path .exists (GLOBUS_CFG ):
104- print (
105- f"{ GLOBUS_CFG } exists. If this file does not have the proper settings, it may cause a TransferAPIError (e.g., 'Token is not active', 'No credentials supplied')"
106- )
107- else :
108- print (
109- f"{ GLOBUS_CFG } does not exist. zstash will need to prompt for authentications twice, and then you will need to re-run."
110- )
111- config_path : str = os .path .abspath (dir_to_archive )
112- assert os .path .isdir (config_path )
113103 remote_endpoint : str = url .netloc
114- # Simulate globus_activate > set_local_endpoint
115- ini = configparser .ConfigParser ()
116- local_endpoint : Optional [str ] = None
117- if ini .read (INI_PATH ):
118- if "local" in ini .sections ():
119- local_endpoint = ini ["local" ].get ("globus_endpoint_uuid" )
120- else :
121- ini ["local" ] = {"globus_endpoint_uuid" : "" }
122- with open (INI_PATH , "w" ) as f :
123- ini .write (f )
124- if not local_endpoint :
125- # nersc_hostname = os.environ.get("NERSC_HOST")
126- # assert nersc_hostname == "perlmutter"
127- local_endpoint = NAME_TO_ENDPOINT_MAP [ENDPOINT_NAME ]
104+ print (f"url.scheme={ url .scheme } , url.netloc={ url .netloc } " )
105+ local_endpoint : str = NAME_TO_ENDPOINT_MAP [LOCAL_ENDPOINT ]
106+ both_endpoints : List [str ] = [local_endpoint , remote_endpoint ]
128107 native_client = NativeClient (
129108 client_id = ZSTASH_CLIENT_ID ,
130109 app_name = "Zstash" ,
@@ -134,7 +113,7 @@ def simple_transfer(run_dir: str) -> bool:
134113 # This is the 1st authentication prompt!
135114 print ("Might ask for 1st authentication prompt:" )
136115 if REQUEST_SCOPES_EARLY :
137- all_scopes : str = get_all_endpoint_scopes (list ( NAME_TO_ENDPOINT_MAP . values ()) )
116+ all_scopes : str = get_all_endpoint_scopes (both_endpoints )
138117 native_client .login (
139118 requested_scopes = all_scopes , no_local_server = True , refresh_tokens = True
140119 )
@@ -143,33 +122,14 @@ def simple_transfer(run_dir: str) -> bool:
143122 print ("Authenticated for the 1st time!" )
144123 transfer_authorizer = native_client .get_authorizers ().get ("transfer.api.globus.org" )
145124 transfer_client : TransferClient = TransferClient (authorizer = transfer_authorizer )
146- for ep_id in [
147- local_endpoint ,
148- remote_endpoint ,
149- ]:
125+ for ep_id in both_endpoints :
150126 r = transfer_client .endpoint_autoactivate (ep_id , if_expires_in = 600 )
151127 assert r .get ("code" ) != "AutoActivationFailed"
152128 os .chdir (config_path )
153129 print (f"Now in { os .getcwd ()} " )
154- url_path : str = str (url .path )
155- assert local_endpoint is not None
156- src_path : str = os .path .join (os .getcwd (), txt_file )
157- dst_path : str = os .path .join (url_path , txt_file )
158- subdir = os .path .basename (os .path .normpath (url_path ))
159- subdir_label = re .sub ("[^A-Za-z0-9_ -]" , "" , subdir )
160- filename = txt_file .split ("." )[0 ]
161- label = subdir_label + " " + filename
162- transfer_data : TransferData = TransferData (
163- transfer_client ,
164- local_endpoint , # src_ep
165- remote_endpoint , # dst_ep
166- label = label ,
167- verify_checksum = True ,
168- preserve_timestamp = True ,
169- fail_on_quota_errors = True ,
130+ transfer_data : TransferData = construct_TransferData (
131+ url , txt_file , transfer_client , local_endpoint , remote_endpoint
170132 )
171- transfer_data .add_item (src_path , dst_path )
172- transfer_data ["label" ] = label
173133 task : GlobusHTTPResponse
174134 skipped_second_auth : bool = False
175135 try :
@@ -179,7 +139,7 @@ def simple_transfer(run_dir: str) -> bool:
179139 except TransferAPIError as err :
180140 if err .info .consent_required :
181141 scopes = "urn:globus:auth:scope:transfer.api.globus.org:all["
182- for ep_id in [ remote_endpoint , local_endpoint ] :
142+ for ep_id in both_endpoints :
183143 scopes += f" *https://auth.globus.org/scopes/{ ep_id } /data_access"
184144 scopes += " ]"
185145 native_client = NativeClient (client_id = ZSTASH_CLIENT_ID , app_name = "Zstash" )
@@ -201,18 +161,61 @@ def simple_transfer(run_dir: str) -> bool:
201161 print (f"Wait for task to complete, wait_timeout={ wait_timeout } " )
202162 transfer_client .task_wait (task_id , timeout = wait_timeout , polling_interval = 10 )
203163 curr_task : GlobusHTTPResponse = transfer_client .get_task (task_id )
204- task_status = curr_task ["status" ]
205- assert task_status == "SUCCEEDED"
164+ assert curr_task ["status" ] == "SUCCEEDED"
206165 return skipped_second_auth
207166
208167
def get_dir_and_file_to_archive(run_dir: str) -> Tuple[str, str]:
    """Create a fresh run directory containing one small file to transfer.

    Any existing ``run_dir`` (directory, regular file, or symlink) is removed
    first. As a side effect the process working directory is changed to
    ``run_dir``.

    Args:
        run_dir: Path of the run directory to (re)create, relative to the
            current working directory or absolute.

    Returns:
        A tuple ``(config_path, txt_file)``: the absolute path of the
        ``dir_to_archive`` directory created inside ``run_dir``, and the name
        of the text file written inside it.
    """
    # Clear leftovers from a previous run. shutil.rmtree refuses regular
    # files and symlinks, and os.path.exists misses dangling symlinks, so
    # handle those cases explicitly.
    if os.path.isdir(run_dir) and not os.path.islink(run_dir):
        shutil.rmtree(run_dir)
    elif os.path.lexists(run_dir):
        os.remove(run_dir)
    os.mkdir(run_dir)
    os.chdir(run_dir)
    print(f"Now in {os.getcwd()}")
    dir_to_archive: str = "dir_to_archive"
    txt_file: str = "file0.txt"
    # os.mkdir raises if the directory cannot be created, so no further
    # existence check is needed afterwards.
    os.mkdir(dir_to_archive)
    with open(os.path.join(dir_to_archive, txt_file), "w", encoding="utf-8") as f:
        f.write("file contents")
    config_path: str = os.path.abspath(dir_to_archive)
    return config_path, txt_file
182+
183+
def get_all_endpoint_scopes(endpoints: List[str]) -> str:
    """Build the transfer scope string with a data_access dependency per endpoint.

    Args:
        endpoints: Endpoint (collection) IDs to request ``data_access`` for.
            Duplicates are ignored; the order of first occurrence is kept.

    Returns:
        The ``transfer.api.globus.org:all`` scope followed by a bracketed,
        space-separated list of ``*https://auth.globus.org/scopes/<id>/data_access``
        entries, or the bare transfer scope when ``endpoints`` is empty.
    """
    base = "urn:globus:auth:scope:transfer.api.globus.org:all"
    # dict.fromkeys de-duplicates while preserving order, so the same endpoint
    # passed twice (e.g. local == remote) is requested only once.
    inner = " ".join(
        f"*https://auth.globus.org/scopes/{ep}/data_access"
        for ep in dict.fromkeys(endpoints)
    )
    # Avoid emitting an empty "[]" dependency list when there are no endpoints.
    if not inner:
        return base
    return f"{base}[{inner}]"
214189
215190
def construct_TransferData(
    url: ParseResult,
    txt_file: str,
    transfer_client: TransferClient,
    local_endpoint: str,
    remote_endpoint: str,
) -> TransferData:
    """Build a TransferData that sends one file from the local to the remote endpoint.

    The source file is looked up in the current working directory, so the
    caller must already have changed into the directory holding ``txt_file``.

    Args:
        url: Parsed destination URL; only its ``path`` component is used, as
            the destination directory on the remote endpoint.
        txt_file: Name of the file to transfer.
        transfer_client: Client passed through to ``TransferData``.
        local_endpoint: Source endpoint ID.
        remote_endpoint: Destination endpoint ID.

    Returns:
        A ``TransferData`` labelled ``"<destination subdir> <file stem>"``
        with the single file added as a transfer item.
    """
    import posixpath

    url_path: str = str(url.path)
    src_path: str = os.path.join(os.getcwd(), txt_file)
    # The destination lives on the remote endpoint and comes from a URL path,
    # so join with "/" regardless of the local platform's separator.
    dst_path: str = posixpath.join(url_path, txt_file)
    subdir = os.path.basename(os.path.normpath(url_path))
    filename: str = txt_file.split(".")[0]
    # Strip characters outside the allowed set from the whole label, not only
    # from the sub-directory part.
    label: str = re.sub(r"[^A-Za-z0-9_ -]", "", subdir + " " + filename)
    # `label` is supplied once via the constructor; no later re-assignment of
    # transfer_data["label"] is needed.
    transfer_data: TransferData = TransferData(
        transfer_client,
        local_endpoint,  # src_ep
        remote_endpoint,  # dst_ep
        label=label,
        verify_checksum=True,
        preserve_timestamp=True,
        fail_on_quota_errors=True,
    )
    transfer_data.add_item(src_path, dst_path)
    return transfer_data
217+
218+
216219# Run #########################################################################
217220if __name__ == "__main__" :
218221 main ()
0 commit comments