66import os .path
77import sqlite3
88import sys
9- from typing import Any , List , Tuple
9+ from typing import Any , List , Optional , Tuple
1010
1111from six .moves .urllib .parse import urlparse
1212
13- from .globus import globus_activate , globus_finalize
13+ from .globus import GlobusTransferCollection , globus_activate , globus_finalize
1414from .hpss import hpss_put
1515from .hpss_utils import add_files
1616from .settings import DEFAULT_CACHE , config , get_db_filename , logger
@@ -52,12 +52,13 @@ def create():
5252 logger .error (input_path_error_str )
5353 raise NotADirectoryError (input_path_error_str )
5454
55+ gtc : Optional [GlobusTransferCollection ] = None
5556 if hpss != "none" :
5657 url = urlparse (hpss )
5758 if url .scheme == "globus" :
5859 # identify globus endpoints
59- logger .debug (f"{ ts_utc ()} :Calling globus_activate(hpss )" )
60- globus_activate (hpss )
60+ logger .debug (f"{ ts_utc ()} :Calling globus_activate()" )
61+ gtc = globus_activate (hpss )
6162 else :
6263 # config.hpss is not "none", so we need to
6364 # create target HPSS directory
@@ -88,14 +89,16 @@ def create():
8889
8990 # Create and set up the database
9091 logger .debug (f"{ ts_utc ()} : Calling create_database()" )
91- failures : List [str ] = create_database (cache , args )
92+ failures : List [str ] = create_database (cache , args , gtc = gtc )
9293
9394 # Transfer to HPSS. Always keep a local copy.
9495 logger .debug (f"{ ts_utc ()} : calling hpss_put() for { get_db_filename (cache )} " )
95- hpss_put (hpss , get_db_filename (cache ), cache , keep = args .keep , is_index = True )
96+ hpss_put (
97+ hpss , get_db_filename (cache ), cache , keep = args .keep , is_index = True , gtc = gtc
98+ )
9699
97100 logger .debug (f"{ ts_utc ()} : calling globus_finalize()" )
98- globus_finalize (non_blocking = args .non_blocking )
101+ globus_finalize (gtc , non_blocking = args .non_blocking )
99102
100103 if len (failures ) > 0 :
101104 # List the failures
@@ -204,7 +207,9 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
204207 return cache , args
205208
206209
207- def create_database (cache : str , args : argparse .Namespace ) -> List [str ]:
210+ def create_database (
211+ cache : str , args : argparse .Namespace , gtc : Optional [GlobusTransferCollection ]
212+ ) -> List [str ]:
208213 # Create new database
209214 logger .debug (f"{ ts_utc ()} :Creating index database" )
210215 if os .path .exists (get_db_filename (cache )):
@@ -263,26 +268,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
263268 files : List [str ] = get_files_to_archive (cache , args .include , args .exclude )
264269
265270 failures : List [str ]
266- if args .follow_symlinks :
267- try :
268- # Add files to archive
269- failures = add_files (
270- cur ,
271- con ,
272- - 1 ,
273- files ,
274- cache ,
275- args .keep ,
276- args .follow_symlinks ,
277- skip_tars_md5 = args .no_tars_md5 ,
278- non_blocking = args .non_blocking ,
279- error_on_duplicate_tar = args .error_on_duplicate_tar ,
280- overwrite_duplicate_tars = args .overwrite_duplicate_tars ,
281- force_database_corruption = args .for_developers_force_database_corruption ,
282- )
283- except FileNotFoundError :
284- raise Exception ("Archive creation failed due to broken symlink." )
285- else :
271+ try :
286272 # Add files to archive
287273 failures = add_files (
288274 cur ,
@@ -297,7 +283,13 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
297283 error_on_duplicate_tar = args .error_on_duplicate_tar ,
298284 overwrite_duplicate_tars = args .overwrite_duplicate_tars ,
299285 force_database_corruption = args .for_developers_force_database_corruption ,
286+ gtc = gtc ,
300287 )
288+ except FileNotFoundError as e :
289+ if args .follow_symlinks :
290+ raise Exception ("Archive creation failed due to broken symlink." )
291+ else :
292+ raise e
301293
302294 # Close database
303295 con .commit ()
0 commit comments