Skip to content
2 changes: 1 addition & 1 deletion zstash/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ def check():
"""
# This basically just goes through the process of extracting the files,
# but doesn't actually save the output.
extract.extract(keep_files=False)
extract.extract(do_extract_files=False)
133 changes: 63 additions & 70 deletions zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
import os.path
import sqlite3
import sys
from typing import Any, List, Tuple

from six.moves.urllib.parse import urlparse
from typing import Any, List

from .globus import globus_activate, globus_finalize
from .hpss import hpss_put
from .hpss_utils import add_files
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
from .settings import logger
from .utils import (
CommandInfo,
HPSSType,
create_tars_table,
get_files_to_archive,
run_command,
Expand All @@ -24,60 +24,54 @@


def create():
cache: str
cache, args = setup_create()

# Check config fields
if config.path is not None:
path: str = config.path
else:
raise TypeError("Invalid config.path={}".format(config.path))
if config.hpss is not None:
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
command_info = CommandInfo("create")
args = setup_create(command_info)

# Start doing actual work
logger.debug(f"{ts_utc()}: Running zstash create")
logger.debug("Local path : {}".format(path))
logger.debug("HPSS path : {}".format(hpss))
logger.debug("Max size : {}".format(config.maxsize))
logger.debug("Keep local tar files : {}".format(args.keep))
logger.debug(f"Local path: {command_info.config.path}")
logger.debug(f"HPSS path: {command_info.config.hpss}")
logger.debug(f"Max size: {command_info.config.maxsize}")
logger.debug(f"Keep local tar files: {command_info.keep}")

# Make sure input path exists and is a directory
logger.debug("Making sure input path exists and is a directory")
if not os.path.isdir(path):
if not command_info.config.path:
raise ValueError("config.path is undefined")
if not os.path.isdir(command_info.config.path):
# Input path is not a directory
input_path_error_str: str = "Input path should be a directory: {}".format(path)
input_path_error_str: str = (
f"Input path should be a directory: {command_info.config.path}"
)
logger.error(input_path_error_str)
raise NotADirectoryError(input_path_error_str)

if hpss != "none":
url = urlparse(hpss)
if url.scheme == "globus":
# identify globus endpoints
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug(f"{ts_utc()}: Creating target HPSS directory {hpss}")
mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
run_command(mkdir_command, mkdir_error_str)
if command_info.globus_info:
# identify globus endpoints
logger.debug(f"{ts_utc()}: Calling globus_activate")
globus_activate(command_info.globus_info)
elif command_info.hpss_type == HPSSType.SAME_MACHINE_HPSS:
logger.debug(
f"{ts_utc()}: Creating target HPSS directory {command_info.config.hpss}"
)
mkdir_command: str = f"hsi -q mkdir -p {command_info.config.hpss}"
mkdir_error_str: str = (
f"Could not create HPSS directory: {command_info.config.hpss}"
)
run_command(mkdir_command, mkdir_error_str)

# Make sure it exists and is empty
logger.debug("Making sure target HPSS directory exists and is empty")
# Make sure it exists and is empty
logger.debug("Making sure target HPSS directory exists and is empty")

ls_command: str = 'hsi -q "cd {}; ls -l"'.format(hpss)
ls_error_str: str = "Target HPSS directory is not empty"
run_command(ls_command, ls_error_str)
ls_command: str = f'hsi -q "cd {command_info.config.hpss}; ls -l"'
ls_error_str: str = "Target HPSS directory is not empty"
run_command(ls_command, ls_error_str)

# Create cache directory
logger.debug(f"{ts_utc()}: Creating local cache directory")
os.chdir(path)
os.chdir(command_info.config.path)
try:
os.makedirs(cache)
os.makedirs(command_info.cache_dir)
except OSError as exc:
if exc.errno != errno.EEXIST:
cache_error_str: str = "Cannot create local cache directory"
Expand All @@ -88,23 +82,24 @@ def create():

# Create and set up the database
logger.debug(f"{ts_utc()}: Calling create_database()")
failures: List[str] = create_database(cache, args)
failures: List[str] = create_database(command_info, args)

# Transfer to HPSS. Always keep a local copy.
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
# Transfer to HPSS. Always keep a local copy of the database.
logger.debug(f"{ts_utc()}: calling hpss_put() for {command_info.get_db_name()}")
hpss_put(command_info, command_info.get_db_name())

logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(non_blocking=args.non_blocking)
if command_info.globus_info:
logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(command_info.globus_info, non_blocking=args.non_blocking)

if len(failures) > 0:
# List the failures
logger.warning("Some files could not be archived")
for file_path in failures:
logger.error("Failed to archive {}".format(file_path))
logger.error(f"Failed to archive {file_path}")


def setup_create() -> Tuple[str, argparse.Namespace]:
def setup_create(command_info: CommandInfo) -> argparse.Namespace:
# Parser
parser: argparse.ArgumentParser = argparse.ArgumentParser(
usage="zstash create [<args>] path", description="Create a new zstash archive"
Expand Down Expand Up @@ -175,27 +170,25 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
if args.verbose:
logger.setLevel(logging.DEBUG)

# Copy configuration
config.path = os.path.abspath(args.path)
config.hpss = args.hpss
config.maxsize = int(1024 * 1024 * 1024 * args.maxsize)
cache: str
if args.cache:
cache = args.cache
else:
cache = DEFAULT_CACHE
command_info.cache_dir = args.cache
command_info.keep = args.keep
command_info.set_dir_to_archive(args.path)
command_info.set_and_scale_maxsize(args.maxsize)
command_info.set_hpss_parameters(args.hpss)

return cache, args
return args


def create_database(cache: str, args: argparse.Namespace) -> List[str]:
def create_database(command_info: CommandInfo, args: argparse.Namespace) -> List[str]:
# Create new database
logger.debug(f"{ts_utc()}:Creating index database")
if os.path.exists(get_db_filename(cache)):
db_name: str = command_info.get_db_name()
if os.path.exists(db_name):
# Remove old database
os.remove(get_db_filename(cache))
os.remove(db_name)
con: sqlite3.Connection = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
db_name, detect_types=sqlite3.PARSE_DECLTYPES
)
cur: sqlite3.Cursor = con.cursor()

Expand Down Expand Up @@ -233,8 +226,8 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:

# Store configuration in database
# Loop through all attributes of config.
for attr in dir(config):
value: Any = getattr(config, attr)
for attr in dir(command_info.config):
value: Any = getattr(command_info.config, attr)
if not callable(value) and not attr.startswith("__"):
# config.{attr} is not a function.
# The attribute name does not start with "__"
Expand All @@ -244,19 +237,20 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
cur.execute("insert into config values (?,?)", (attr, value))
con.commit()

files: List[str] = get_files_to_archive(cache, args.include, args.exclude)
files: List[str] = get_files_to_archive(
command_info.cache_dir, args.include, args.exclude
)

failures: List[str]
if args.follow_symlinks:
try:
# Add files to archive
failures = add_files(
command_info,
cur,
con,
-1,
files,
cache,
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
Expand All @@ -266,12 +260,11 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
else:
# Add files to archive
failures = add_files(
command_info,
cur,
con,
-1,
files,
cache,
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
Expand Down
Loading
Loading