diff --git a/tests/base.py b/tests/base.py index 5e93b32b..0327ed21 100644 --- a/tests/base.py +++ b/tests/base.py @@ -177,8 +177,11 @@ def setupDirs(self, test_name): """ Set up directories for testing. """ - # FIXME: Item "None" of "Optional[Any]" has no attribute "lower"mypy(error) - if self.hpss_path.lower() == "none": # type: ignore + if self.hpss_path: + hpss_path = self.hpss_path + else: + raise Exception("Invalid self.hpss_path={}".format(self.hpss_path)) + if hpss_path.lower() == "none": use_hpss = False else: use_hpss = True diff --git a/tests/test_check.py b/tests/test_check.py index e5dd5642..e2086c08 100644 --- a/tests/test_check.py +++ b/tests/test_check.py @@ -87,15 +87,14 @@ def helperCheckVerboseMismatch(self, test_name, hpss_path, zstash_path=ZSTASH_PA "{}/index.db".format(self.cache), "{}/index_old.db".format(self.cache) ) print("Messing up the MD5 of all of the files with an even id.") - cmd = [ + sqlite_cmd = [ "sqlite3", "{}/index.db".format(self.cache), "UPDATE files SET md5 = 0 WHERE id % 2 = 0;", ] - run_cmd(cmd) - # FIXME: Incompatible types in assignment (expression has type "str", variable has type "List[str]") mypy(error) - cmd = "{}zstash check -v --hpss={}".format(zstash_path, self.hpss_path) # type: ignore - output, err = run_cmd(cmd) + run_cmd(sqlite_cmd) + zstash_cmd = "{}zstash check -v --hpss={}".format(zstash_path, self.hpss_path) + output, err = run_cmd(zstash_cmd) # These files have an even `id` in the sqlite3 table. expected_present = [ "md5 mismatch for: dir/file1.txt", @@ -110,7 +109,7 @@ def helperCheckVerboseMismatch(self, test_name, hpss_path, zstash_path=ZSTASH_PA "ERROR: 000003.tar", "ERROR: 000005.tar", ] - self.check_strings(cmd, output + err, expected_present, expected_absent) + self.check_strings(zstash_cmd, output + err, expected_present, expected_absent) # Put the original index.db back. os.remove("{}/index.db".format(self.cache)) shutil.copy( diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index 37658072..9bbfb8e4 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -80,17 +80,16 @@ def helperCheckParallelVerboseMismatch( "{}/index.db".format(self.cache), "{}/index_old.db".format(self.cache) ) print("Messing up the MD5 of all of the files with an even id.") - cmd = [ + sqlite_cmd = [ "sqlite3", "{}/index.db".format(self.cache), "UPDATE files SET md5 = 0 WHERE id % 2 = 0;", ] - run_cmd(cmd) - # FIXME: Incompatible types in assignment (expression has type "str", variable has type "List[str]") mypy(error) - cmd = "{}zstash check -v --hpss={} --workers=3".format( # type: ignore + run_cmd(sqlite_cmd) + zstash_cmd = "{}zstash check -v --hpss={} --workers=3".format( zstash_path, self.hpss_path ) - output, err = run_cmd(cmd) + output, err = run_cmd(zstash_cmd) # These files have an even `id` in the sqlite3 table. expected_present = [ "md5 mismatch for: dir/file1.txt", @@ -105,7 +104,7 @@ def helperCheckParallelVerboseMismatch( "ERROR: 000003.tar", "ERROR: 000005.tar", ] - self.check_strings(cmd, output + err, expected_present, expected_absent) + self.check_strings(zstash_cmd, output + err, expected_present, expected_absent) # Put the original index.db back. 
os.remove("{}/index.db".format(self.cache)) shutil.copy( @@ -115,12 +114,12 @@ def helperCheckParallelVerboseMismatch( print("Verifying the data from database with the actual files") # Checksums from HPSS - cmd = [ + sqlite_cmd = [ "sqlite3", "{}/{}/index.db".format(self.test_dir, self.cache), "SELECT md5, name FROM files;", ] - output_hpss, err_hpss = run_cmd(cmd) + output_hpss, err_hpss = run_cmd(sqlite_cmd) hpss_dict = {} for checksum in output_hpss.split("\n"): @@ -131,13 +130,11 @@ def helperCheckParallelVerboseMismatch( hpss_dict[f_name] = f_hash # Checksums from local files - # FIXME: Incompatible types in assignment (expression has type "str", variable has type "List[str]") mypy(error) - cmd = "find {} ".format(self.backup_dir) # type: ignore - cmd += ( - # FIXME: W605 invalid escape sequence '\.' - """-regex .*\.txt.* -exec md5sum {} + """ # Literal {}, not for formatting # noqa: W605 + find_cmd = "find {} ".format(self.backup_dir) + find_cmd += ( + r"""-regex .*\.txt.* -exec md5sum {} + """ # Literal {}, not for formatting ) - output_local, err_local = run_cmd(cmd) + output_local, err_local = run_cmd(find_cmd) local_dict = {} for checksum in output_local.split("\n"): diff --git a/tests/test_extract.py b/tests/test_extract.py index 881dc892..29baa304 100644 --- a/tests/test_extract.py +++ b/tests/test_extract.py @@ -150,8 +150,7 @@ def helperExtractKeep(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): ], ): error_message = "The zstash directory does not contain expected files.\nIt has: {}".format( - # FIXME: F821 undefined name 'd' - os.listdir(d) # type: ignore # noqa: F821 + os.listdir(self.cache) ) self.stop(error_message) os.chdir(TOP_LEVEL) diff --git a/zstash/chgrp.py b/zstash/chgrp.py index 99114e88..84393b77 100644 --- a/zstash/chgrp.py +++ b/zstash/chgrp.py @@ -11,7 +11,7 @@ def chgrp(): # Parser - parser = argparse.ArgumentParser( + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="zstash chgrp [] group hpss_archive", description="Change the group of an HPSS repository.", ) @@ -24,11 +24,12 @@ def chgrp(): "-v", "--verbose", action="store_true", help="increase output verbosity" ) - args = parser.parse_args(sys.argv[2:]) + args: argparse.Namespace = parser.parse_args(sys.argv[2:]) if args.hpss and args.hpss.lower() == "none": args.hpss = "none" + # Start doing actual work if args.verbose: logger.setLevel(logging.DEBUG) - recurse = True if args.R else False + recurse: bool = True if args.R else False hpss_chgrp(args.hpss, args.group, recurse) diff --git a/zstash/create.py b/zstash/create.py index ee310e8a..fcec3415 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -6,32 +6,103 @@ import os.path import sqlite3 import sys -from typing import List, Tuple +from typing import Any, List, Tuple from .hpss import hpss_put from .hpss_utils import add_files from .settings import DEFAULT_CACHE, config, get_db_filename, logger -from .utils import exclude_files, run_command - -con = None -cur = None +from .utils import get_files_to_archive, run_command def create(): + cache: str + exclude: str + cache, exclude = setup_create() + + # Check config fields + if config.path is not None: + path: str = config.path + else: + raise Exception("Invalid config.path={}".format(config.path)) + if config.hpss is not None: + hpss: str = config.hpss + else: + raise Exception("Invalid config.hpss={}".format(config.hpss)) + + # Start doing actual work + logger.debug("Running zstash create") + logger.debug("Local path : {}".format(path)) + logger.debug("HPSS path : 
{}".format(hpss)) + logger.debug("Max size : {}".format(config.maxsize)) + logger.debug("Keep local tar files : {}".format(config.keep)) + + # Make sure input path exists and is a directory + logger.debug("Making sure input path exists and is a directory") + if not os.path.isdir(path): + # Input path is not a directory + input_path_error_str: str = "Input path should be a directory: {}".format(path) + logger.error(input_path_error_str) + raise Exception(input_path_error_str) + + if hpss != "none": + # config.hpss is not "none", so we need to + # create target HPSS directory + logger.debug("Creating target HPSS directory") + mkdir_command: str = "hsi -q mkdir -p {}".format(hpss) + mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss) + run_command(mkdir_command, mkdir_error_str) + + # Make sure it is exists and is empty + logger.debug("Making sure target HPSS directory exists and is empty") + ls_command: str = 'hsi -q "cd {}; ls -l"'.format(hpss) + ls_error_str: str = "Target HPSS directory is not empty" + run_command(ls_command, ls_error_str) + + # Create cache directory + logger.debug("Creating local cache directory") + os.chdir(path) + try: + os.makedirs(cache) + except OSError as exc: + if exc.errno != errno.EEXIST: + cache_error_str: str = "Cannot create local cache directory" + logger.error(cache_error_str) + raise Exception(cache_error_str) + + # TODO: Verify that cache is empty + + # Create and set up the database + failures: List[str] = create_database(cache, exclude) + + # Transfer to HPSS. Always keep a local copy. + hpss_put(hpss, get_db_filename(cache), cache, keep=True) + + if len(failures) > 0: + # List the failures + logger.warning("Some files could not be archived") + for file_path in failures: + logger.error("Failed to archive {}".format(file_path)) + + +def setup_create() -> Tuple[str, str]: # Parser - parser = argparse.ArgumentParser( + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="zstash create [] path", description="Create a new zstash archive" ) parser.add_argument("path", type=str, help="root directory to archive") - required = parser.add_argument_group("required named arguments") + required: argparse._ArgumentGroup = parser.add_argument_group( + "required named arguments" + ) required.add_argument( "--hpss", type=str, help='path to storage on HPSS. Set to "none" for local archiving. 
Must be set to "none" if the machine does not have HPSS access.', required=True, ) - optional = parser.add_argument_group("optional named arguments") + optional: argparse._ArgumentGroup = parser.add_argument_group( + "optional named arguments" + ) optional.add_argument( "--exclude", type=str, help="comma separated list of file patterns to exclude" ) @@ -56,7 +127,7 @@ def create(): ) # Now that we're inside a subcommand, ignore the first two argvs # (zstash create) - args = parser.parse_args(sys.argv[2:]) + args: argparse.Namespace = parser.parse_args(sys.argv[2:]) if args.hpss and args.hpss.lower() == "none": args.hpss = "none" if args.verbose: @@ -65,67 +136,27 @@ def create(): # Copy configuration config.path = os.path.abspath(args.path) config.hpss = args.hpss - # FIXME: Incompatible types in assignment (expression has type "int", variable has type "None") mypy(error) - # Solution: https://stackoverflow.com/a/42279784 - config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) # type: ignore + config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) config.keep = args.keep + cache: str if args.cache: cache = args.cache else: cache = DEFAULT_CACHE - # Start doing actual work - logger.debug("Running zstash create") - logger.debug("Local path : %s" % (config.path)) - logger.debug("HPSS path : %s" % (config.hpss)) - logger.debug("Max size : %i" % (config.maxsize)) # type: ignore - logger.debug("Keep local tar files : %s" % (config.keep)) - - # Make sure input path exists and is a directory - logger.debug("Making sure input path exists and is a directory") - # FIXME: Argument 1 to "isdir" has incompatible type "None"; expected "Union[str, bytes, _PathLike[str], _PathLike[bytes]]"mypy(error) - if not os.path.isdir(config.path): # type: ignore - error_str = "Input path should be a directory: {}".format(config.path) - logger.error(error_str) - raise Exception(error_str) - - if config.hpss != "none": - # Create target HPSS directory if needed - logger.debug("Creating target HPSS directory") - command = "hsi -q mkdir -p {}".format(config.hpss) - error_str = "Could not create HPSS directory: {}".format(config.hpss) - run_command(command, error_str) - - # Make sure it is empty - logger.debug("Making sure target HPSS directory exists and is empty") + return cache, args.exclude - command = 'hsi -q "cd {}; ls -l"'.format(config.hpss) - error_str = "Target HPSS directory is not empty" - run_command(command, error_str) - - # Create cache directory - logger.debug("Creating local cache directory") - # FIXME: Argument 1 to "chdir" has incompatible type "None"; expected "Union[int, Union[str, bytes, _PathLike[str], _PathLike[bytes]]]" mypy(error) - os.chdir(config.path) # type: ignore - try: - os.makedirs(cache) - except OSError as exc: - if exc.errno != errno.EEXIST: - error_str = "Cannot create local cache directory" - logger.error(error_str) - raise Exception(error_str) - pass - - # Verify that cache is empty - # ...to do (?) 
+def create_database(cache: str, exclude: str) -> List[str]: # Create new database logger.debug("Creating index database") if os.path.exists(get_db_filename(cache)): + # Remove old database os.remove(get_db_filename(cache)) - global con, cur - con = sqlite3.connect(get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES) - cur = con.cursor() + con: sqlite3.Connection = sqlite3.connect( + get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES + ) + cur: sqlite3.Cursor = con.cursor() # Create 'config' table cur.execute( @@ -155,48 +186,25 @@ def create(): con.commit() # Store configuration in database + # Loop through all attributes of config. for attr in dir(config): - value = getattr(config, attr) + value: Any = getattr(config, attr) if not callable(value) and not attr.startswith("__"): + # config.{attr} is not a function. + # The attribute name does not start with "__" + # This creates a new row in the 'config' table. + # Insert attr for column 1 ('arg') + # Insert value for column 2 ('text') cur.execute(u"insert into config values (?,?)", (attr, value)) con.commit() - # List of files - logger.info("Gathering list of files to archive") - files: List[Tuple[str, str]] = [] - for root, dirnames, filenames in os.walk("."): - # Empty directory - if not dirnames and not filenames: - files.append((root, "")) - # Loop over files - for filename in filenames: - files.append((root, filename)) - - # Sort files by directories and filenames - files = sorted(files, key=lambda x: (x[0], x[1])) - - # Relative file path, eliminating top level zstash directory - # FIXME: Name 'files' already defined mypy(error) - files: List[str] = [ # type: ignore - os.path.normpath(os.path.join(x[0], x[1])) - for x in files - if x[0] != os.path.join(".", cache) - ] - - # Eliminate files based on exclude pattern - if args.exclude is not None: - files = exclude_files(args.exclude, files) + files: List[str] = get_files_to_archive(cache, exclude) # Add files to archive - failures = add_files(cur, con, -1, files, cache) + failures: List[str] = add_files(cur, con, -1, files, cache) - # Close database and transfer to HPSS. Always keep local copy + # Close database con.commit() con.close() - hpss_put(config.hpss, get_db_filename(cache), cache, keep=True) - # List failures - if len(failures) > 0: - logger.warning("Some files could not be archived") - for file_path in failures: - logger.error("Archiving %s" % (file_path)) + return failures diff --git a/zstash/extract.py b/zstash/extract.py index 374922cf..303ffb91 100644 --- a/zstash/extract.py +++ b/zstash/extract.py @@ -12,7 +12,10 @@ import tarfile import traceback from datetime import datetime -from typing import List +from typing import DefaultDict, List, Optional, Tuple + +import _hashlib +import _io from . import parallel from .hpss import hpss_get @@ -20,108 +23,50 @@ BLOCK_SIZE, DEFAULT_CACHE, TIME_TOL, + FilesRow, + TupleFilesRow, config, get_db_filename, logger, ) - -con = None -cur = None +from .utils import update_config -def multiprocess_extract(num_workers, matches, keep_files, keep_tars, cache): +def extract(keep_files: bool = True): """ - Extract the files from the matches in parallel. - - A single unit of work is a tar and all of - the files in it to extract. + Given an HPSS path in the zstash database or passed via the command line, + extract the archived data based on the file pattern (if given). """ - # A dict of tar -> size of files in it. - # This is because we're trying to balance the load between - # the processes. 
- # FIXME: Need type annotation for 'tar_to_size' mypy(error) - tar_to_size = collections.defaultdict(float) # type: ignore - for db_row in matches: - tar, size = db_row[5], db_row[2] - tar_to_size[tar] += size - # Sort by the size. - # FIXME: Incompatible types in assignment (expression has type "OrderedDict[Any, Any]", variable has type "defaultdict[Any, Any]") mypy(error) - tar_to_size = collections.OrderedDict( # type: ignore - sorted(tar_to_size.items(), key=lambda x: x[1]) - ) - - # We don't want to instantiate more processes than we need to. - num_workers = min(num_workers, len(tar_to_size)) - - # For worker i, workers_to_tars[i] is a set of tars - # that worker i will work on. - # FIXME: Need type annotation for 'workers_to_tars' mypy(error) - workers_to_tars = [set() for _ in range(num_workers)] # type: ignore - # A min heap, of (work, worker_idx) tuples, work is the size of data - # that worker_idx needs to work on. - # We can efficiently get the worker with the least amount of work. - work_to_workers = [(0, i) for i in range(num_workers)] - heapq.heapify(workers_to_tars) + args: argparse.Namespace + cache: str + args, cache = setup_extract() - # Using a greedy approach, populate workers_to_tars. - for _, tar in enumerate(tar_to_size): - # The worker with the least work should get the current largest amount of work. - workers_work, worker_idx = heapq.heappop(work_to_workers) - workers_to_tars[worker_idx].add(tar) - # Add this worker back to the heap, with the new amount of work. - heapq.heappush(work_to_workers, (workers_work + tar_to_size[tar], worker_idx)) + failures: List[FilesRow] = extract_database(args, cache, keep_files) - # For worker i, workers_to_matches[i] is a list of - # matches from the database for it to process. - # FIXME: Need type annotation for 'workers_to_matches' - workers_to_matches = [[] for _ in range(num_workers)] # type: ignore - for db_row in matches: - tar = db_row[5] - for worker_idx in range(len(workers_to_tars)): - if tar in workers_to_tars[worker_idx]: - # This worker gets this db_row. - workers_to_matches[worker_idx].append(db_row) - - tar_ordering = sorted([tar for tar in tar_to_size]) - monitor = parallel.PrintMonitor(tar_ordering) + if failures: + logger.error("Encountered an error for files:") - # The return value for extractFiles will be added here. - # FIXME: Need type annotation for 'failure_queue' mypy(error) - failure_queue = multiprocessing.Queue() # type: ignore - processes = [] - for matches in workers_to_matches: - tars_for_this_worker = list(set(match[5] for match in matches)) - worker = parallel.ExtractWorker(monitor, tars_for_this_worker, failure_queue) - process = multiprocessing.Process( - target=extractFiles, args=(matches, keep_files, keep_tars, cache, worker) - ) - process.start() - processes.append(process) + for fail in failures: + logger.error("{} in {}".format(fail.name, fail.tar)) - # While the processes are running, we need to empty the queue. - # Otherwise, it causes hanging. - # No need to join() each of the processes when doing this, - # cause we'll be in this loop until completion. - failures = [] - while any(p.is_alive() for p in processes): - while not failure_queue.empty(): - failures.append(failure_queue.get()) + broken_tars: List[str] = sorted(set([f.tar for f in failures])) - # Sort the failures, since they can come in at any order. 
- failures.sort(key=lambda x: (x[1], x[5], x[6])) - return failures + logger.error("The following tar archives had errors:") + for tar in broken_tars: + logger.error(tar) + else: + verb: str = "extracting" if keep_files else "checking" + logger.info("No failures detected when {} the files.".format(verb)) -def extract(keep_files=True): - """ - Given an HPSS path in the zstash database or passed via the command line, - extract the archived data based on the file pattern (if given). - """ - parser = argparse.ArgumentParser( +def setup_extract() -> Tuple[argparse.Namespace, str]: + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="zstash extract [] [files]", description="Extract files from existing archive", ) - optional = parser.add_argument_group("optional named arguments") + optional: argparse._ArgumentGroup = parser.add_argument_group( + "optional named arguments" + ) optional.add_argument( "--hpss", type=str, @@ -144,7 +89,7 @@ def extract(keep_files=True): "-v", "--verbose", action="store_true", help="increase output verbosity" ) parser.add_argument("files", nargs="*", default=["*"]) - args = parser.parse_args(sys.argv[2:]) + args: argparse.Namespace = parser.parse_args(sys.argv[2:]) if args.hpss and args.hpss.lower() == "none": args.hpss = "none" if args.cache: @@ -157,88 +102,105 @@ def extract(keep_files=True): if args.verbose or args.workers > 1: logger.setLevel(logging.DEBUG) + return args, cache + + +def extract_database( + args: argparse.Namespace, cache: str, keep_files: bool +) -> List[FilesRow]: + # Open database logger.debug("Opening index database") if not os.path.exists(get_db_filename(cache)): # Will need to retrieve from HPSS if args.hpss is not None: config.hpss = args.hpss - hpss_get(config.hpss, get_db_filename(cache), cache) + if config.hpss is not None: + hpss: str = config.hpss + else: + raise Exception("Invalid config.hpss={}".format(config.hpss)) + hpss_get(hpss, get_db_filename(cache), cache) else: - error_str = ( + error_str: str = ( "--hpss argument is required when local copy of database is unavailable" ) logger.error(error_str) raise Exception(error_str) - global con, cur - con = sqlite3.connect(get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES) - cur = con.cursor() - - # Retrieve some configuration settings from database - for attr in dir(config): - value = getattr(config, attr) - if not callable(value) and not attr.startswith("__"): - cur.execute(u"select value from config where arg=?", (attr,)) - value = cur.fetchone()[0] - setattr(config, attr, value) - # FIXME: No overload variant of "int" matches argument type "None" mypy(error) - config.maxsize = int(config.maxsize) # type: ignore - # FIXME: Incompatible types in assignment (expression has type "bool", variable has type "None")mypy(error) - config.keep = bool(int(config.keep)) # type: ignore + con: sqlite3.Connection = sqlite3.connect( + get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES + ) + cur: sqlite3.Cursor = con.cursor() + + update_config(cur) + if config.maxsize is not None: + maxsize = config.maxsize + else: + raise Exception("Invalid config.maxsize={}".format(config.maxsize)) + config.maxsize = int(maxsize) + if config.keep is not None: + keep = config.keep + else: + raise Exception("Invalid config.keep={}".format(config.keep)) + config.keep = bool(int(keep)) # The command line arg should always have precedence if args.hpss is not None: config.hpss = args.hpss if config.hpss == "none": # If no HPSS is available, always keep the files. 
- # FIXME: Incompatible types in assignment (expression has type "bool", variable has type "None") mypy(error) - config.keep = True # type: ignore + config.keep = True else: config.keep = args.keep # Start doing actual work - cmd = "extract" if keep_files else "check" + cmd: str = "extract" if keep_files else "check" logger.debug("Running zstash " + cmd) - logger.debug("Local path : %s" % (config.path)) - logger.debug("HPSS path : %s" % (config.hpss)) - # FIXME: Incompatible types in string interpolation (expression has type "None", placeholder has type "Union[int, float, SupportsInt]") mypy(error) - logger.debug("Max size : %i" % (config.maxsize)) # type: ignore - logger.debug("Keep local tar files : %s" % (config.keep)) + logger.debug("Local path : {}".format(config.path)) + logger.debug("HPSS path : {}".format(config.hpss)) + logger.debug("Max size : {}".format(config.maxsize)) + logger.debug("Keep local tar files : {}".format(config.keep)) # Find matching files - matches: List[str] = [] + matches_: List[TupleFilesRow] = [] for args_file in args.files: cur.execute( u"select * from files where name GLOB ? or tar GLOB ?", (args_file, args_file), ) - matches = matches + cur.fetchall() + matches_ = matches_ + cur.fetchall() + + matches: List[FilesRow] = list(map(lambda match: FilesRow(match), matches_)) # Sort by the filename, tape (so the tar archive), # and order within tapes (offset). - matches.sort(key=lambda x: (x[1], x[5], x[6])) + matches.sort(key=lambda t: (t.name, t.tar, t.offset)) # Based off the filenames, keep only the last instance of a file. # This is because we may have different versions of the # same file across many tars. + insert_idx: int + iter_inx: int insert_idx, iter_idx = 0, 1 for iter_idx in range(1, len(matches)): # If the filenames are unique, just increment insert_idx. # iter_idx will increment after this iteration. - if matches[insert_idx][1] != matches[iter_idx][1]: + # (matches[x][1] is the name.) + if matches[insert_idx].name != matches[iter_idx].name: insert_idx += 1 # Always copy over the value at the correct location. matches[insert_idx] = matches[iter_idx] + # `matches` will only be as long as the number of unique filenames matches = matches[: insert_idx + 1] # Sort by tape and offset, so that we make sure # that extract the files by tape order. - matches.sort(key=lambda x: (x[5], x[6])) + matches.sort(key=lambda t: (t.tar, t.offset)) # Retrieve from tapes + failures: List[FilesRow] if args.workers > 1: logger.debug("Running zstash {} with multiprocessing".format(cmd)) failures = multiprocess_extract( @@ -251,48 +213,113 @@ def extract(keep_files=True): logger.debug("Closing index database") con.close() - if failures: - logger.error("Encountered an error for files:") + return failures - for fail in failures: - logger.error("{} in {}".format(fail[1], fail[5])) - broken_tars = sorted(set([f[5] for f in failures])) +def multiprocess_extract( + num_workers: int, + matches: List[FilesRow], + keep_files: bool, + keep_tars: Optional[bool], + cache: str, +) -> List[FilesRow]: + """ + Extract the files from the matches in parallel. - logger.error("The following tar archives had errors:") - for tar in broken_tars: - logger.error(tar) - else: - verb = "extracting" if keep_files else "checking" - logger.info("No failures detected when {} the files.".format(verb)) + A single unit of work is a tar and all of + the files in it to extract. + """ + # A dict of tar -> size of files in it. + # This is because we're trying to balance the load between + # the processes. 
+    tar_to_size_unsorted: DefaultDict[str, float] = collections.defaultdict(float)
+    db_row: FilesRow
+    tar: str
+    size: int
+    for db_row in matches:
+        tar, size = db_row.tar, db_row.size
+        tar_to_size_unsorted[tar] += size
+    # Sort by the size.
+    tar_to_size: collections.OrderedDict[str, float] = collections.OrderedDict(
+        sorted(tar_to_size_unsorted.items(), key=lambda x: x[1])
+    )
+
+    # We don't want to instantiate more processes than we need to.
+    # So, if the number of tars is less than the number of workers,
+    # set the number of workers to the number of tars.
+    num_workers = min(num_workers, len(tar_to_size))

-def should_extract_file(db_row):
-    """
-    If a file is on disk already with the correct
-    timestamp and size, don't extract the file.
-    """
-    file_name, size_db, mod_time_db = db_row[1], db_row[2], db_row[3]
+    # For worker i, workers_to_tars[i] is a set of tars
+    # that worker i will work on.
+    workers_to_tars: List[set] = [set() for _ in range(num_workers)]
+    # A min heap, of (work, worker_idx) tuples, work is the size of data
+    # that worker_idx needs to work on.
+    # We can efficiently get the worker with the least amount of work.
+    work_to_workers: List[Tuple[int, int]] = [(0, i) for i in range(num_workers)]
+    heapq.heapify(work_to_workers)

-    if not os.path.exists(file_name):
-        # We must get files that are not on disk.
-        return True
+    # Using a greedy approach, populate workers_to_tars.
+    for _, tar in enumerate(tar_to_size):
+        # The worker with the least work should get the current largest amount of work.
+        workers_work: int
+        worker_idx: int
+        workers_work, worker_idx = heapq.heappop(work_to_workers)
+        workers_to_tars[worker_idx].add(tar)
+        # Add this worker back to the heap, with the new amount of work.
+        worker_tuple: Tuple[float, int] = (workers_work + tar_to_size[tar], worker_idx)
+        # FIXME: error: Cannot infer type argument 1 of "heappush"
+        heapq.heappush(work_to_workers, worker_tuple)  # type: ignore

-    size_disk = os.path.getsize(file_name)
-    mod_time_disk = datetime.utcfromtimestamp(os.path.getmtime(file_name))
+    # For worker i, workers_to_matches[i] is a list of
+    # matches from the database for it to process.
+    workers_to_matches: List[List[FilesRow]] = [[] for _ in range(num_workers)]
+    for db_row in matches:
+        tar = db_row.tar
+        for worker_idx in range(len(workers_to_tars)):
+            if tar in workers_to_tars[worker_idx]:
+                # This worker gets this db_row.
+                workers_to_matches[worker_idx].append(db_row)

-    # Only extract when the times and sizes are not the same.
-    # We have a TIME_TOL because mod_time_disk doesn't have the microseconds.
-    return not (
-        size_disk == size_db
-        and abs(mod_time_disk - mod_time_db).total_seconds() < TIME_TOL
-    )
+    tar_ordering: List[str] = sorted([tar for tar in tar_to_size])
+    monitor: parallel.PrintMonitor = parallel.PrintMonitor(tar_ordering)
+    # The return value for extractFiles will be added here.
+ failure_queue: multiprocessing.Queue[FilesRow] = multiprocessing.Queue() + processes: List[multiprocessing.Process] = [] + for matches in workers_to_matches: + tars_for_this_worker: List[str] = list(set(match.tar for match in matches)) + worker: parallel.ExtractWorker = parallel.ExtractWorker( + monitor, tars_for_this_worker, failure_queue + ) + process: multiprocessing.Process = multiprocessing.Process( + target=extractFiles, args=(matches, keep_files, keep_tars, cache, worker) + ) + process.start() + processes.append(process) -# FIXME: C901 'extractFiles' is too complex (29) + # While the processes are running, we need to empty the queue. + # Otherwise, it causes hanging. + # No need to join() each of the processes when doing this, + # because we'll be in this loop until completion. + failures: List[FilesRow] = [] + while any(p.is_alive() for p in processes): + while not failure_queue.empty(): + failures.append(failure_queue.get()) + + # Sort the failures, since they can come in at any order. + failures.sort(key=lambda t: (t.name, t.tar, t.offset)) + return failures + + +# C901 'extractFiles' is too complex (20) def extractFiles( # noqa: C901 - files, keep_files, keep_tars, cache, multiprocess_worker=None -): + files: List[FilesRow], + keep_files: bool, + keep_tars: Optional[bool], + cache: str, + multiprocess_worker: Optional[parallel.ExtractWorker] = None, +) -> List[FilesRow]: """ Given a list of database rows, extract the files from the tar archives to the current location on disk. @@ -307,101 +334,76 @@ def extractFiles( # noqa: C901 If running in parallel, then multiprocess_worker is the Worker that called this function. We need a reference to it so we can signal it to print - the contents of what's in it's print queue. + the contents of what's in its print queue. """ - failures = [] - tfname = None - newtar = True - nfiles = len(files) + failures: List[FilesRow] = [] + tfname: Optional[str] = None + newtar: bool = True + nfiles: int = len(files) if multiprocess_worker: # All messages to the logger will now be sent to # this queue, instead of sys.stdout. - sh = logging.StreamHandler(multiprocess_worker.print_queue) + # error: Argument 1 to "StreamHandler" has incompatible type "PrintQueue"; expected "Optional[IO[str]]" + sh = logging.StreamHandler(multiprocess_worker.print_queue) # type: ignore sh.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(levelname)s: %(message)s") + formatter: logging.Formatter = logging.Formatter("%(levelname)s: %(message)s") sh.setFormatter(formatter) logger.addHandler(sh) # Don't have the logger print to the console as the message come in. logger.propagate = False for i in range(nfiles): - # The current structure of each of the db row, `file`, is: - # (id, name, size, mtime, md5, tar, offset) - file_tuple = files[i] + files_row: FilesRow = files[i] # Open new tar archive if newtar: - newtar = False - tfname = os.path.join(cache, file_tuple[5]) - # Everytime we're extracting a new tar, if running in parallel, - # let the process know. - # This is to synchronize the print statements. 
- if multiprocess_worker: - multiprocess_worker.set_curr_tar(file_tuple[5]) - - if not os.path.exists(tfname): - # Will need to retrieve from HPSS - hpss_get(config.hpss, tfname, cache) - - logger.info("Opening tar archive %s" % (tfname)) - tar = tarfile.open(tfname, "r") + newtar, tfname, tar = open_new_tar_archive( + cache, files_row, multiprocess_worker + ) # Extract file - cmd = "Extracting" if keep_files else "Checking" - logger.info(cmd + " %s" % (file_tuple[1])) + cmd: str = "Extracting" if keep_files else "Checking" + logger.info(cmd + " %s" % (files_row.name)) # if multiprocess_worker: # print('{} is {} {} from {}'.format(multiprocess_worker, cmd, file[1], file[5])) - if keep_files and not should_extract_file(file_tuple): + if keep_files and not should_extract_file(files_row): # If we were going to extract, but aren't # because a matching file is on disk - msg = "Not extracting {}, because it" + msg: str = "Not extracting {}, because it" msg += " already exists on disk with the same" msg += " size and modification date." - logger.info(msg.format(file_tuple[1])) + logger.info(msg.format(files_row.name)) # True if we should actually extract the file from the tar - extract_this_file = keep_files and should_extract_file(file_tuple) + extract_this_file: bool = keep_files and should_extract_file(files_row) try: # Seek file position - # FIXME: Item "None" of "Optional[IO[bytes]]" has no attribute "seek" mypy(error) - tar.fileobj.seek(file_tuple[6]) # type: ignore + if tar.fileobj is not None: + fileobj: _io.BufferedReader = tar.fileobj + else: + raise Exception("Invalid tar.fileobj={}".format(tar.fileobj)) + fileobj.seek(files_row.offset) # Get next member - tarinfo = tar.tarinfo.fromtarfile(tar) + tarinfo: tarfile.TarInfo = tar.tarinfo.fromtarfile(tar) if tarinfo.isfile(): # fileobj to extract - try: - fin = tar.extractfile(tarinfo) - fname = tarinfo.name - path, name = os.path.split(fname) - if path != "" and extract_this_file: - if not os.path.isdir(path): - os.makedirs(path) - if extract_this_file: - # If we're keeping the files, - # then have an output file - fout = open(fname, "wb") - - hash_md5 = hashlib.md5() - while True: - # FIXME: Item "None" of "Optional[IO[bytes]]" has no attribute "read" mypy(error) - s = fin.read(BLOCK_SIZE) # type: ignore - if len(s) > 0: - hash_md5.update(s) - if extract_this_file: - fout.write(s) - if len(s) < BLOCK_SIZE: - break - finally: - # FIXME: Item "None" of "Optional[IO[bytes]]" has no attribute "close" mypy(error) - fin.close() # type: ignore - if extract_this_file: - fout.close() - - md5 = hash_md5.hexdigest() + # FIXME: error: Name 'tarfile.ExFileObject' is not defined + extracted_file: Optional[tarfile.ExFileObject] = tar.extractfile(tarinfo) # type: ignore + if extracted_file: + # FIXME: error: Name 'tarfile.ExFileObject' is not defined + fin: tarfile.ExFileObject = extracted_file # type: ignore + else: + raise Exception("Invalid extracted_file={}".format(extracted_file)) + hash_md5: Optional[_hashlib.HASH] + fname: str + hash_md5, fname = extract_file(tarinfo, extract_this_file, fin) + md5: Optional[str] = None + if hash_md5: + md5 = hash_md5.hexdigest() if extract_this_file: # numeric_owner is a required arg in Python 3. 
# If True, "only the numbers for user/group names @@ -410,18 +412,19 @@ def extractFiles( # noqa: C901 tar.chmod(tarinfo, fname) tar.utime(tarinfo, fname) # Verify size - if os.path.getsize(fname) != file_tuple[2]: - logger.error("size mismatch for: %s" % (fname)) + if os.path.getsize(fname) != files_row.size: + logger.error("size mismatch for: {}".format(fname)) # Verify md5 checksum - if md5 != file_tuple[4]: - logger.error("md5 mismatch for: %s" % (fname)) - logger.error("md5 of extracted file: %s" % (md5)) - logger.error("md5 of original file: %s" % (file_tuple[4])) + files_row_md5: Optional[str] = files_row.md5 + if md5 != files_row_md5: + logger.error("md5 mismatch for: {}".format(fname)) + logger.error("md5 of extracted file: {}".format(md5)) + logger.error("md5 of original file: {}".format(files_row_md5)) - failures.append(file_tuple) + failures.append(files_row) else: - logger.debug("Valid md5: %s %s" % (md5, fname)) + logger.debug("Valid md5: {} {}".format(md5, fname)) elif extract_this_file: tar.extract(tarinfo) @@ -430,38 +433,28 @@ def extractFiles( # noqa: C901 # relying here on 'touch'. This is not the prettiest solution. # Maybe a better one can be implemented later. if tarinfo.issym(): - tmp1 = tarinfo.mtime - tmp2 = datetime.fromtimestamp(tmp1) - tmp3 = tmp2.strftime("%Y%m%d%H%M.%S") + tmp1: int = tarinfo.mtime + tmp2: datetime = datetime.fromtimestamp(tmp1) + tmp3: str = tmp2.strftime("%Y%m%d%H%M.%S") os.system("touch -h -t %s %s" % (tmp3, tarinfo.name)) except Exception: traceback.print_exc() - logger.error("Retrieving %s" % (file_tuple[1])) - failures.append(file_tuple) + logger.error("Retrieving {}".format(files_row.name)) + failures.append(files_row) if multiprocess_worker: multiprocess_worker.print_contents() # Close current archive? - if i == nfiles - 1 or files[i][5] != files[i + 1][5]: - # Close current archive file - logger.debug("Closing tar archive %s" % (tfname)) - tar.close() - - if multiprocess_worker: - multiprocess_worker.done_enqueuing_output_for_tar(file_tuple[5]) - - # Open new archive next time - newtar = True - - # Delete this tar if the corresponding command-line arg was used. - if not keep_tars: - # FIXME: Argument 1 to "remove" has incompatible type "Optional[Any]"; expected "Union[str, bytes, _PathLike[str], _PathLike[bytes]]"mypy(error) - os.remove(tfname) # type: ignore + if i == nfiles - 1 or files[i].tar != files[i + 1].tar: + # We're either on the last file or the tar is distinct from the tar of the next file. + newtar = close_current_archive( + tfname, tar, multiprocess_worker, keep_tars, files_row + ) if multiprocess_worker: - # If there are stuff left to print, print them. + # If there are things left to print, print them. multiprocess_worker.print_all_contents() # Add the failures to the queue. @@ -469,5 +462,120 @@ def extractFiles( # noqa: C901 # that calls this extractFiles() function will return the failures as a list. for f in failures: multiprocess_worker.failure_queue.put(f) - else: - return failures + return failures + + +def should_extract_file(db_row: FilesRow) -> bool: + """ + If a file is on disk already with the correct + timestamp and size, don't extract the file. + """ + file_name: str + size_db: int + mod_time_db: datetime + file_name, size_db, mod_time_db = db_row.name, db_row.size, db_row.mtime + + if not os.path.exists(file_name): + # The file doesn't exist locally. + # We must get files that are not on disk. 
+ return True + + size_disk: int = os.path.getsize(file_name) + mod_time_disk: datetime = datetime.utcfromtimestamp(os.path.getmtime(file_name)) + + # Only extract when the times and sizes are not the same (within tolerance) + # We have a TIME_TOL because mod_time_disk doesn't have the microseconds. + return not ( + (size_disk == size_db) + and (abs(mod_time_disk - mod_time_db).total_seconds() < TIME_TOL) + ) + + +def open_new_tar_archive( + cache: str, + files_row: FilesRow, + multiprocess_worker: Optional[parallel.ExtractWorker], +) -> Tuple[bool, str, tarfile.TarFile]: + newtar: bool = False + tfname: str = os.path.join(cache, files_row.tar) + # Everytime we're extracting a new tar, if running in parallel, + # let the process know. + # This is to synchronize the print statements. + if multiprocess_worker: + multiprocess_worker.set_curr_tar(files_row.tar) + + if not os.path.exists(tfname): + # Will need to retrieve from HPSS + if config.hpss is not None: + hpss: str = config.hpss + else: + raise Exception("Invalid config.hpss={}".format(config.hpss)) + hpss_get(hpss, tfname, cache) + + logger.info("Opening tar archive %s" % (tfname)) + tar: tarfile.TarFile = tarfile.open(tfname, "r") + + return newtar, tfname, tar + + +# FIXME: error: Name 'tarfile.ExFileObject' is not defined +def extract_file( + tarinfo: tarfile.TarInfo, extract_this_file: bool, fin: tarfile.ExFileObject # type: ignore +) -> Tuple[Optional[_hashlib.HASH], str]: + return_hash: Optional[_hashlib.HASH] = None + try: + fname: str = tarinfo.name + path: str + name: str + path, name = os.path.split(fname) + if path != "" and extract_this_file: + if not os.path.isdir(path): + # The path doesn't exist, so create it. + os.makedirs(path) + if extract_this_file: + # If we're keeping the files, + # then have an output file + fout: _io.BufferedWriter = open(fname, "wb") + + hash_md5: _hashlib.HASH = hashlib.md5() + while True: + s: bytes = fin.read(BLOCK_SIZE) + if len(s) > 0: + hash_md5.update(s) + if extract_this_file: + fout.write(s) + if len(s) < BLOCK_SIZE: + break + return_hash = hash_md5 + finally: + fin.close() + if extract_this_file: + fout.close() + return return_hash, fname + + +def close_current_archive( + tfname: Optional[str], + tar: tarfile.TarFile, + multiprocess_worker: Optional[parallel.ExtractWorker], + keep_tars: Optional[bool], + files_row: FilesRow, +) -> bool: + # Close current archive file + logger.debug("Closing tar archive {}".format(tfname)) + tar.close() + + if multiprocess_worker: + multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar) + + # Open new archive next time + newtar = True + + # Delete this tar if the corresponding command-line arg was used. 
+ if not keep_tars: + if tfname is not None: + os.remove(tfname) + else: + raise Exception("Invalid tfname={}".format(tfname)) + + return newtar diff --git a/zstash/hpss.py b/zstash/hpss.py index 394a1e29..e11d19fb 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -2,92 +2,115 @@ import os.path import subprocess +from typing import List from .settings import get_db_filename, logger from .utils import run_command -def hpss_transfer(hpss, file_path, transfer_type, cache, keep=None): +def hpss_transfer( + hpss: str, file_path: str, transfer_type: str, cache: str, keep: bool = False +): if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) if transfer_type == "put" and file_path != get_db_filename(cache): + # We are adding a file (that is not the cache) to the local non-HPSS archive logger.info( "{}: Keeping tar files locally and removing write permissions".format( transfer_type ) ) # https://unix.stackexchange.com/questions/46915/get-the-chmod-numerical-value-for-a-file - display_mode = "stat --format '%a' {}".format(file_path).split() - output = subprocess.check_output(display_mode).strip() - # FIXME: On Python 3 '{}'.format(b'abc') produces "b'abc'", not 'abc'; use '{!r}'.format(b'abc') if this is desired behavior mypy(error) - logger.info("{} original mode={}".format(file_path, output)) # type: ignore + display_mode_command: List[str] = "stat --format '%a' {}".format( + file_path + ).split() + display_mode_output: bytes = subprocess.check_output( + display_mode_command + ).strip() + logger.info( + "{!r} original mode={!r}".format(file_path, display_mode_output) + ) # https://www.washington.edu/doit/technology-tips-chmod-overview # Remove write-permission from user, group, and others, # without changing read or execute permissions for any. - change_mode = "chmod ugo-w {}".format(file_path).split() - subprocess.check_output(change_mode) - output = subprocess.check_output(display_mode).strip() - # FIXME: On Python 3 '{}'.format(b'abc') produces "b'abc'", not 'abc'; use '{!r}'.format(b'abc') if this is desired behavior mypy(error) - logger.info("{} new mode={}".format(file_path, output)) # type: ignore - return - if transfer_type == "put": - transfer_word = "to" - transfer_command = "put" - elif transfer_type == "get": - transfer_word = "from" - transfer_command = "get" + change_mode_command: List[str] = "chmod ugo-w {}".format(file_path).split() + # An error will be raised if this line fails. 
+ subprocess.check_output(change_mode_command) + new_display_mode_output: bytes = subprocess.check_output( + display_mode_command + ).strip() + logger.info("{!r} new mode={!r}".format(file_path, new_display_mode_output)) + # else: no action needed else: - raise Exception("Invalid transfer_type={}".format(transfer_type)) - logger.info("Transferring file {} HPSS: {}".format(transfer_word, file_path)) - path, name = os.path.split(file_path) + transfer_word: str + transfer_command: str + if transfer_type == "put": + transfer_word = "to" + transfer_command = "put" + elif transfer_type == "get": + transfer_word = "from" + transfer_command = "get" + else: + raise Exception("Invalid transfer_type={}".format(transfer_type)) + logger.info("Transferring file {} HPSS: {}".format(transfer_word, file_path)) + path: str + name: str + path, name = os.path.split(file_path) - # Need to be in local directory for hsi put to work - cwd = os.getcwd() - if path != "": - if (transfer_type == "get") and (not os.path.isdir(path)): - os.makedirs(path) - os.chdir(path) + # Need to be in local directory for `hsi` to work + cwd = os.getcwd() + if path != "": + if (transfer_type == "get") and (not os.path.isdir(path)): + # We are getting a file from HPSS. + # The directory the file is in doesn't exist locally. + # So, make the path locally + os.makedirs(path) + # Enter the path (directory) + # For `put`, this directory contains the file we want to transfer to HPSS. + # For `get`, this directory is where the file we get from HPSS will go. + os.chdir(path) - # Transfer file using hsi put - command = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name) - error_str = "Transferring file {} HPSS: {}".format(transfer_word, name) - run_command(command, error_str) + # Transfer file using `hsi` + command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name) + error_str: str = "Transferring file {} HPSS: {}".format(transfer_word, name) + run_command(command, error_str) - # Back to original working directory - if path != "": - os.chdir(cwd) + # Return to original working directory + if path != "": + os.chdir(cwd) - if transfer_type == "put": - # Remove local file if requested - if not keep: - os.remove(file_path) + if transfer_type == "put": + if not keep: + # We should not keep the local file, so delete it now that it is on HPSS + os.remove(file_path) -def hpss_put(hpss, file_path, cache, keep=True): +def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True): """ Put a file to the HPSS archive. """ hpss_transfer(hpss, file_path, "put", cache, keep) -def hpss_get(hpss, file_path, cache): +def hpss_get(hpss: str, file_path: str, cache: str): """ Get a file from the HPSS archive. """ hpss_transfer(hpss, file_path, "get", cache, False) -def hpss_chgrp(hpss, group, recurse=False): +def hpss_chgrp(hpss: str, group: str, recurse: bool = False): """ Change the group of the HPSS archive. 
""" if hpss == "none": logger.info("chgrp: HPSS is unavailable") - return - if recurse: - recurse_str = "-R " else: - recurse_str = "" - command = "hsi chgrp {}{} {}".format(recurse_str, group, hpss) - error_str = "Changing group of HPSS archive {} to {}".format(hpss, group) - run_command(command, error_str) + recurse_str: str + if recurse: + recurse_str = "-R " + else: + recurse_str = "" + command: str = "hsi chgrp {}{} {}".format(recurse_str, group, hpss) + error_str: str = "Changing group of HPSS archive {} to {}".format(hpss, group) + run_command(command, error_str) diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index fe19a5fc..ce86c007 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -2,21 +2,36 @@ import hashlib import os.path +import sqlite3 import tarfile import traceback from datetime import datetime -from typing import Optional +from typing import List, Optional, Tuple + +import _hashlib +import _io from .hpss import hpss_put -from .settings import BLOCK_SIZE, config, logger +from .settings import BLOCK_SIZE, TupleFilesRowNoId, config, logger -def add_files(cur, con, itar, files, cache): +def add_files( + cur: sqlite3.Cursor, + con: sqlite3.Connection, + itar: int, + files: List[str], + cache: str, +) -> List[str]: # Now, perform the actual archiving - failures = [] - newtar = True - nfiles = len(files) + failures: List[str] = [] + newtar: bool = True + nfiles: int = len(files) + archived: List[TupleFilesRowNoId] + tarsize: int + tname: str + tfname: str + tar: tarfile.TarFile for i in range(nfiles): # New tar archive in the local cache @@ -25,37 +40,66 @@ def add_files(cur, con, itar, files, cache): archived = [] tarsize = 0 itar += 1 + # Create a hex value at least 6 digits long tname = "{0:0{1}x}".format(itar, 6) - tfname = "%s.tar" % (tname) - logger.info("Creating new tar archive %s" % (tfname)) + # Create the tar file name by adding ".tar" + tfname = "{}.tar".format(tname) + logger.info("Creating new tar archive {}".format(tfname)) + # Open that tar file in the cache tar = tarfile.open(os.path.join(cache, tfname), "w") # Add current file to tar archive - current_file = files[i] - logger.info("Archiving %s" % (current_file)) + current_file: str = files[i] + logger.info("Archiving {}".format(current_file)) try: + offset: int + size: int + mtime: datetime + md5: Optional[str] offset, size, mtime, md5 = add_file(tar, current_file) - archived.append((current_file, size, mtime, md5, tfname, offset)) + t: TupleFilesRowNoId = ( + current_file, + size, + mtime, + md5, + tfname, + offset, + ) + archived.append(t) + # Increase tarsize by the size of the current file tarsize += size except Exception: traceback.print_exc() - logger.error("Archiving %s" % (current_file)) + logger.error("Archiving {}".format(current_file)) failures.append(current_file) - # Close tar archive if current file is the last one or adding one more - # would push us over the limit. - next_file_size = tar.gettarinfo(current_file).size - # FIXME: Unsupported operand types for > ("int" and "None") mypy(error) - if i == nfiles - 1 or tarsize + next_file_size > config.maxsize: # type: ignore + # Close tar archive if current file is the last one or + # if adding one more would push us over the limit. 
+        next_file_size: int = tar.gettarinfo(current_file).size
+        if config.maxsize is not None:
+            maxsize: int = config.maxsize
+        else:
+            raise Exception("Invalid config.maxsize={}".format(config.maxsize))
+        if i == nfiles - 1 or tarsize + next_file_size > maxsize:
             # Close current temporary file
-            logger.debug("Closing tar archive %s" % (tfname))
+            logger.debug("Closing tar archive {}".format(tfname))
             tar.close()

             # Transfer tar archive to HPSS
-            hpss_put(config.hpss, os.path.join(cache, tfname), cache, config.keep)
+            if config.hpss is not None:
+                hpss: str = config.hpss
+            else:
+                raise Exception("Invalid config.hpss={}".format(config.hpss))
+            if config.keep is not None:
+                keep: bool = config.keep
+            else:
+                raise Exception("Invalid config.keep={}".format(config.keep))
+            hpss_put(hpss, os.path.join(cache, tfname), cache, keep)

             # Update database with files that have been archived
+            # Add a row to the "files" table,
+            # the last 6 columns matching the values of `archived`
             cur.executemany(u"insert into files values (NULL,?,?,?,?,?,?)", archived)
             con.commit()

@@ -67,34 +111,54 @@
 # Add file to tar archive while computing its hash
 # Return file offset (in tar archive), size and md5 hash
-def add_file(tar, file_name):
-    offset = tar.offset
-    tarinfo = tar.gettarinfo(file_name)
+def add_file(
+    tar: tarfile.TarFile, file_name: str
+) -> Tuple[int, int, datetime, Optional[str]]:
+
+    # FIXME: error: "TarFile" has no attribute "offset"
+    offset: int = tar.offset  # type: ignore
+    tarinfo: tarfile.TarInfo = tar.gettarinfo(file_name)
     # Change the size of any hardlinks from 0 to the size of the actual file
     if tarinfo.islnk():
         tarinfo.size = os.path.getsize(file_name)
+    # Add the file to the tar
     tar.addfile(tarinfo)

     md5: Optional[str] = None
     # Only add files or hardlinks.
-    # So don't add directories or softlinks.
+    # (So don't add directories or softlinks.)
     if tarinfo.isfile() or tarinfo.islnk():
-        f = open(file_name, "rb")
-        hash_md5 = hashlib.md5()
+        f: _io.BufferedReader = open(file_name, "rb")
+        hash_md5: _hashlib.HASH = hashlib.md5()
+        if tar.fileobj is not None:
+            fileobj: _io.BufferedWriter = tar.fileobj
+        else:
+            raise Exception("Invalid tar.fileobj={}".format(tar.fileobj))
         while True:
-            s = f.read(BLOCK_SIZE)
+            s: bytes = f.read(BLOCK_SIZE)
             if len(s) > 0:
-                tar.fileobj.write(s)
+                # If the block read in is non-empty, write it to fileobj and update the hash
+                fileobj.write(s)
                 hash_md5.update(s)
             if len(s) < BLOCK_SIZE:
+                # If the block read in is smaller than BLOCK_SIZE,
+                # then we have reached the end of the file.
+ # blocks = how many blocks of tarfile.BLOCKSIZE fit in tarinfo.size + # remainder = how much more content is required to reach tarinfo.size + blocks: int + remainder: int blocks, remainder = divmod(tarinfo.size, tarfile.BLOCKSIZE) if remainder > 0: - tar.fileobj.write(tarfile.NUL * (tarfile.BLOCKSIZE - remainder)) + null_bytes: bytes = tarfile.NUL + # Write null_bytes to get the last block to tarfile.BLOCKSIZE + fileobj.write(null_bytes * (tarfile.BLOCKSIZE - remainder)) blocks += 1 - tar.offset += blocks * tarfile.BLOCKSIZE + # Increase the offset by the amount already saved to the tar + # FIXME: error: "TarFile" has no attribute "offset" + tar.offset += blocks * tarfile.BLOCKSIZE # type: ignore break f.close() md5 = hash_md5.hexdigest() - size = tarinfo.size - mtime = datetime.utcfromtimestamp(tarinfo.mtime) + size: int = tarinfo.size + mtime: datetime = datetime.utcfromtimestamp(tarinfo.mtime) return offset, size, mtime, md5 diff --git a/zstash/ls.py b/zstash/ls.py index d1eb120f..55626d17 100644 --- a/zstash/ls.py +++ b/zstash/ls.py @@ -5,13 +5,18 @@ import os import sqlite3 import sys -from typing import List +from typing import List, Tuple from .hpss import hpss_get -from .settings import DEFAULT_CACHE, config, get_db_filename, logger - -con = None -cur = None +from .settings import ( + DEFAULT_CACHE, + FilesRow, + TupleFilesRow, + config, + get_db_filename, + logger, +) +from .utils import update_config def ls(): @@ -19,11 +24,34 @@ def ls(): List all of the files in the HPSS path. Supports the '-l' argument for more information. """ - parser = argparse.ArgumentParser( + + args: argparse.Namespace + cache: str + args, cache = setup_ls() + + matches: List[FilesRow] = ls_database(args, cache) + + # Print the results + match: FilesRow + for match in matches: + if args.long: + # Print all contents of each match + for col in match.to_tuple(): + print(col, end="\t") + print("") + else: + # Just print the file name + print(match.name) + + +def setup_ls() -> Tuple[argparse.Namespace, str]: + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="zstash ls [] [files]", description="List the files from an existing archive. If `files` is specified, then only the files specified will be listed. 
If `hpss=none`, then this will list the directories and files in the current directory excluding the cache.", ) - optional = parser.add_argument_group("optional named arguments") + optional: argparse._ArgumentGroup = parser.add_argument_group( + "optional named arguments" + ) optional.add_argument("--hpss", type=str, help="path to HPSS storage") optional.add_argument( "-l", @@ -42,9 +70,10 @@ def ls(): ) parser.add_argument("files", nargs="*", default=["*"]) - args = parser.parse_args(sys.argv[2:]) + args: argparse.Namespace = parser.parse_args(sys.argv[2:]) if args.hpss and args.hpss.lower() == "none": args.hpss = "none" + cache: str if args.cache: cache = args.cache else: @@ -52,35 +81,47 @@ def ls(): if args.verbose: logger.setLevel(logging.DEBUG) + return args, cache + + +def ls_database(args: argparse.Namespace, cache: str) -> List[FilesRow]: # Open database logger.debug("Opening index database") if not os.path.exists(get_db_filename(cache)): # Will need to retrieve from HPSS if args.hpss is not None: config.hpss = args.hpss - hpss_get(config.hpss, get_db_filename(cache), cache) + if config.hpss is not None: + hpss = config.hpss + else: + # TODO: move these typechecks further up to the beginning of the functions + raise Exception("Invalid config.hpss={}".format(config.hpss)) + # Retrieve from HPSS + hpss_get(hpss, get_db_filename(cache), cache) else: - error_str = ( + error_str: str = ( "--hpss argument is required when local copy of database is unavailable" ) logger.error(error_str) raise Exception(error_str) - global con, cur - con = sqlite3.connect(get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES) - cur = con.cursor() - - # Retrieve some configuration settings from database - for attr in dir(config): - value = getattr(config, attr) - if not callable(value) and not attr.startswith("__"): - cur.execute(u"select value from config where arg=?", (attr,)) - value = cur.fetchone()[0] - setattr(config, attr, value) - # FIXME: No overload variant of "int" matches argument type "None" mypy(error) - config.maxsize = int(config.maxsize) # type: ignore - # FIXME: No overload variant of "int" matches argument type "None" mypy(error) - config.keep = bool(int(config.keep)) # type: ignore + con: sqlite3.Connection = sqlite3.connect( + get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES + ) + cur: sqlite3.Cursor = con.cursor() + + update_config(cur) + + if config.maxsize is not None: + maxsize: int = config.maxsize + else: + raise Exception("Invalid config.maxsize={}".format(config.maxsize)) + config.maxsize = maxsize + if config.keep is not None: + keep: bool = config.keep + else: + raise Exception("Invalid config.keep={}".format(config.keep)) + config.keep = keep # The command line arg should always have precedence if args.hpss is not None: @@ -91,33 +132,29 @@ def ls(): logger.debug("HPSS path : %s" % (config.hpss)) # Find matching files - matches: List[str] = [] + matches_: List[TupleFilesRow] = [] for args_file in args.files: cur.execute( u"select * from files where name GLOB ? 
or tar GLOB ?", (args_file, args_file), ) - matches = matches + cur.fetchall() + matches_ = matches_ + cur.fetchall() # Remove duplicates - matches = list(set(matches)) + matches_ = list(set(matches_)) + matches: List[FilesRow] = list(map(FilesRow, matches_)) # Sort by tape and order within tapes (offset) - matches = sorted(matches, key=lambda x: (x[5], x[6])) + matches = sorted(matches, key=lambda t: (t.tar, t.offset)) if args.long: - # Get the names of the cols + # Get the names of the columns cur.execute(u"PRAGMA table_info(files);") cols = [str(col_info[1]) for col_info in cur.fetchall()] print("\t".join(cols)) - # Print the results - for match in matches: - if args.long: - # Print all contents of each match - for col in match: - print(col, end="\t") - print("") - else: - # Just print the file name - print(match[1]) + # Close database + con.commit() + con.close() + + return matches diff --git a/zstash/main.py b/zstash/main.py index ea715964..188b5c93 100755 --- a/zstash/main.py +++ b/zstash/main.py @@ -17,6 +17,7 @@ # ----------------------------------------------------------------------------- +# TODO: get the types of these parameters def handler(signal_received, frame): # Handle any cleanup here @@ -31,7 +32,7 @@ def main(): signal(SIGINT, handler) # Parser - parser = argparse.ArgumentParser( + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="""For {}, zstash [] Available zstash commands: @@ -52,7 +53,7 @@ def main(): parser.add_argument("command", help="command to run (create, update, extract, ...)") # parse_args defaults to [1:] for args, but you need to # exclude the rest of the args too, or validation will fail - args = parser.parse_args(sys.argv[1:2]) + args: argparse.Namespace = parser.parse_args(sys.argv[1:2]) if args.command == "version": print(__version__) diff --git a/zstash/parallel.py b/zstash/parallel.py index 6dc787b6..53beb53d 100644 --- a/zstash/parallel.py +++ b/zstash/parallel.py @@ -3,6 +3,9 @@ import collections import ctypes import multiprocessing +from typing import Dict, List, Optional + +from .settings import FilesRow class NotYourTurnError(Exception): @@ -21,31 +24,40 @@ class PrintMonitor(object): for that tar will print it's output. """ - def __init__(self, tars_to_print, *args, **kwargs): + def __init__(self, tars_to_print: List[str], *args, **kwargs): # A list of tars to print. # Ex: ['000000.tar', '000008.tar', '00001a.tar'] if not tars_to_print: - msg = "You must pass in a list of tars, which dictates" + msg: str = "You must pass in a list of tars, which dictates" msg += " the order of which to print the results." raise RuntimeError(msg) # The variables below are modified/accessed by different processes, # so they need to be in shared memory. - self._cv = multiprocessing.Condition() + self._cv: multiprocessing.synchronize.Condition = multiprocessing.Condition() - # FIXME: Need type annotation for '_tars_to_print' mypy(error) - self._tars_to_print = multiprocessing.Queue() # type: ignore + self._tars_to_print: multiprocessing.Queue[str] = multiprocessing.Queue() + tar: str for tar in tars_to_print: + # Add the tar to the queue to be printed. self._tars_to_print.put(tar) # We need a manager to instantiate the Value instead of multiprocessing.Value. # If we didn't use a manager, it seems to get some junk value. 
- self._manager = multiprocessing.Manager() - self._current_tar = self._manager.Value( + self._manager: multiprocessing.managers.SyncManager = multiprocessing.Manager() + self._current_tar: multiprocessing.managers.ValueProxy = self._manager.Value( ctypes.c_char_p, self._tars_to_print.get() ) - def wait_turn(self, worker, workers_curr_tar, indef_wait=True, *args, **kwargs): + def wait_turn( + # TODO: worker has type `ExtractWorker` + self, + worker, + workers_curr_tar: str, + indef_wait: bool = True, + *args, + **kwargs + ): """ While a worker's current tar isn't the one needed to be printed, wait. @@ -57,7 +69,7 @@ def wait_turn(self, worker, workers_curr_tar, indef_wait=True, *args, **kwargs): the worker's turn. """ with self._cv: - attempted = False + attempted: bool = False while self._current_tar.value != workers_curr_tar: if attempted and not indef_wait: # It's not this worker's turn. @@ -67,7 +79,14 @@ def wait_turn(self, worker, workers_curr_tar, indef_wait=True, *args, **kwargs): # Wait 0.001 to see if it's the worker's turn. self._cv.wait(0.001) - def done_dequeuing_output_for_tar(self, worker, workers_curr_tar, *args, **kwargs): + def done_dequeuing_output_for_tar( + # TODO: worker has type `ExtractWorker` + self, + worker, + workers_curr_tar: str, + *args, + **kwargs + ): """ A worker has finished printing the output for workers_curr_tar from the print queue. @@ -94,52 +113,42 @@ class ExtractWorker(object): This worker is called during `zstash extract`. """ - class PrintQueue(collections.deque): - """ - A queue with a write() function. - This is so that this can be replaced with sys.stdout in the extractFiles function. - This way, all calls to `print()` will be sent here. - """ - - def __init__(self): - # FIXME: NamedTuple type as an attribute is not supported mypy(error) - self.TarAndMsg = collections.namedtuple("TarAndMsg", ["tar", "msg"]) # type: ignore - self.curr_tar = None - - def write(self, msg): - if self.curr_tar: - self.append(self.TarAndMsg(self.curr_tar, msg)) - - def flush(self): - # Not needed, but it's called by some internal Python code. - # So we need to provide a function like this. - pass - - def __init__(self, print_monitor, tars_to_work_on, failure_queue, *args, **kwargs): + def __init__( + self, + print_monitor: PrintMonitor, + tars_to_work_on: List[str], + # TODO: failure_queue has type `multiprocessing.Queue[FilesRow]` + failure_queue, + *args, + **kwargs + ): """ print_monitor is used to determine if it's this worker's turn to print. tars_to_work_on is a list of the tars that this worker will process. Any failures are added to the failure_queue, to return any failed values. """ - self.print_monitor = print_monitor + self.print_monitor: PrintMonitor = print_monitor # Every call to print() in the original function will # be piped to this queue instead of the screen. - self.print_queue = self.PrintQueue() + self.print_queue: PrintQueue = PrintQueue() # A tar is mapped to True when all of its output is in the queue. - self.is_output_done_enqueuing = {tar: False for tar in tars_to_work_on} + self.is_output_done_enqueuing: Dict[str, bool] = { + tar: False for tar in tars_to_work_on + } # After extractFiles is done, all of the failures will be added to this queue. - self.failure_queue = failure_queue + self.failure_queue: multiprocessing.Queue[FilesRow] = failure_queue - def set_curr_tar(self, tar): + def set_curr_tar(self, tar: str): """ Sets the current tar this worker is working on. 
""" self.print_queue.curr_tar = tar - def done_enqueuing_output_for_tar(self, tar): + def done_enqueuing_output_for_tar(self, tar: str): """ All of the output for extracting this tar is in the print queue. """ + msg: str if tar not in self.is_output_done_enqueuing: msg = "This tar {} isn't assigned to this worker." raise RuntimeError(msg.format(tar)) @@ -161,7 +170,7 @@ def print_contents(self): # It's not our turn, so try again the next time this function is called. pass - def has_to_print(self): + def has_to_print(self) -> bool: """ Returns True if this Worker still has things to print. """ @@ -176,12 +185,12 @@ def print_all_contents(self, *args, **kwargs): """ while self.has_to_print(): # Try to print the first element in the queue. - tar_to_print = self.print_queue[0].tar + tar_to_print: str = self.print_queue[0].tar self.print_monitor.wait_turn(self, tar_to_print, *args, **kwargs) # Print all applicable values in the print_queue. - while self.print_queue and self.print_queue[0].tar == tar_to_print: - msg = self.print_queue.popleft().msg + while self.print_queue and (self.print_queue[0].tar == tar_to_print): + msg: str = self.print_queue.popleft().msg print(msg, end="", flush=True) # If True, then all of the output for extracting tar_to_print was in the queue. @@ -189,3 +198,29 @@ def print_all_contents(self, *args, **kwargs): if self.is_output_done_enqueuing[tar_to_print]: # Let all of the other workers know that this worker is done. self.print_monitor.done_dequeuing_output_for_tar(self, tar_to_print) + + +class PrintQueue(collections.deque): + """ + A queue with a write() function. + This is so that this can be replaced with sys.stdout in the extractFiles function. + This way, all calls to `print()` will be sent here. + """ + + def __init__(self): + self.curr_tar: Optional[str] = None + + def write(self, msg: str): + if self.curr_tar: + self.append(TarAndMsg(self.curr_tar, msg)) + + def flush(self): + # Not needed, but it's called by some internal Python code. + # So we need to provide a function like this. 
+ pass + + +class TarAndMsg(object): + def __init__(self, tar: str, msg: str): + self.tar: str = tar + self.msg: str = msg diff --git a/zstash/settings.py b/zstash/settings.py index 05318166..76a5c8cf 100644 --- a/zstash/settings.py +++ b/zstash/settings.py @@ -1,35 +1,64 @@ from __future__ import absolute_import, print_function +import datetime import logging import os.path +from typing import Optional, Tuple # Class to hold configuration class Config(object): - path = None - hpss = None - maxsize = None - keep = None + path: Optional[str] = None + hpss: Optional[str] = None + maxsize: Optional[int] = None + keep: Optional[bool] = None -def get_db_filename(cache): +def get_db_filename(cache: str) -> str: # Database filename return os.path.join(cache, "index.db") # Block size -BLOCK_SIZE = 1024 * 1014 +BLOCK_SIZE: int = 1024 * 1014 # Default sub-directory to hold cache -DEFAULT_CACHE = "zstash" +DEFAULT_CACHE: str = "zstash" # Time tolerance (in seconds) for file modification time -TIME_TOL = 1.0 +TIME_TOL: float = 1.0 # Initialize config -config = Config() +config: Config = Config() # Initialize logger -# FIXME: "basicConfig" does not return a value mypy(error) -logger = logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) # type: ignore -logger = logging.getLogger(__name__) +logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) +logger: logging.Logger = logging.getLogger(__name__) + +# Type aliases +TupleFilesRow = Tuple[int, str, int, datetime.datetime, Optional[str], str, int] +# No corresponding class needed for this tuple. +TupleFilesRowNoId = Tuple[str, int, datetime.datetime, Optional[str], str, int] + + +# Corresponding class to make accessing variables easier +class FilesRow(object): + def __init__(self, t: TupleFilesRow): + self.identifier: int = t[0] + self.name: str = t[1] + self.size: int = t[2] + self.mtime: datetime.datetime = t[3] + self.md5: Optional[str] = t[4] + self.tar: str = t[5] + self.offset: int = t[6] + + def to_tuple(self) -> TupleFilesRow: + return ( + self.identifier, + self.name, + self.size, + self.mtime, + self.md5, + self.tar, + self.offset, + ) diff --git a/zstash/update.py b/zstash/update.py index 4ec17041..e69ecc90 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -7,27 +7,59 @@ import stat import sys from datetime import datetime -from typing import List, Tuple +from typing import List, Optional, Tuple from .hpss import hpss_get, hpss_put from .hpss_utils import add_files -from .settings import DEFAULT_CACHE, TIME_TOL, config, get_db_filename, logger -from .utils import exclude_files +from .settings import ( + DEFAULT_CACHE, + TIME_TOL, + FilesRow, + TupleFilesRow, + config, + get_db_filename, + logger, +) +from .utils import get_files_to_archive, update_config -con = None -cur = None +def update(): -# FIXME: C901 'update' is too complex (26) -def update(): # noqa: C901 + args: argparse.Namespace + cache: str + args, cache = setup_update() + result: Optional[List[str]] = update_database(args, cache) + + if result is None: + # There was either nothing to update or `--dry-run` was set. + return + else: + failures = result + + # Transfer to HPSS. Always keep a local copy. 
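(Illustration, not part of the patch.) A recurring idiom in this PR, visible again just below for config.hpss, is narrowing an Optional field into a plainly-typed local variable (or raising) so that mypy accepts the later use without a `# type: ignore`. A minimal sketch of that idiom, using a made-up DemoConfig rather than zstash.settings.Config:

from typing import Optional

class DemoConfig:
    # Illustrative only; mirrors the Optional fields on settings.Config.
    hpss: Optional[str] = None
    maxsize: Optional[int] = None

def hpss_target(config: DemoConfig) -> str:
    # Narrow Optional[str] to str; mypy treats the local as non-None afterwards.
    if config.hpss is not None:
        hpss: str = config.hpss
    else:
        raise Exception("Invalid config.hpss={}".format(config.hpss))
    return hpss

config = DemoConfig()
config.hpss = "none"
print(hpss_target(config))  # -> none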
+ if config.hpss is not None: + hpss = config.hpss + else: + raise Exception("Invalid config.hpss={}".format(config.hpss)) + hpss_put(hpss, get_db_filename(cache), cache, keep=True) + + # List failures + if len(failures) > 0: + logger.warning("Some files could not be archived") + for file_path in failures: + logger.error("Archiving {}".format(file_path)) + + +def setup_update() -> Tuple[argparse.Namespace, str]: # Parser - parser = argparse.ArgumentParser( + parser: argparse.ArgumentParser = argparse.ArgumentParser( usage="zstash update []", description="Update an existing zstash archive" ) - # FIXME: F841 local variable 'required' is assigned to but never used - required = parser.add_argument_group("required named arguments") # noqa: F841 - optional = parser.add_argument_group("optional named arguments") + parser.add_argument_group("required named arguments") + optional: argparse._ArgumentGroup = parser.add_argument_group( + "optional named arguments" + ) optional.add_argument( "--hpss", type=str, @@ -54,9 +86,10 @@ def update(): # noqa: C901 optional.add_argument( "-v", "--verbose", action="store_true", help="increase output verbosity" ) - args = parser.parse_args(sys.argv[2:]) + args: argparse.Namespace = parser.parse_args(sys.argv[2:]) if args.hpss and args.hpss.lower() == "none": args.hpss = "none" + cache: str if args.cache: cache = args.cache else: @@ -64,141 +97,130 @@ def update(): # noqa: C901 if args.verbose: logger.setLevel(logging.DEBUG) + return args, cache + + +def update_database(args: argparse.Namespace, cache: str) -> Optional[List[str]]: # Open database logger.debug("Opening index database") if not os.path.exists(get_db_filename(cache)): - # will need to retrieve from HPSS + # The database file doesn't exist in the cache. + # We need to retrieve it from HPSS if args.hpss is not None: config.hpss = args.hpss - hpss_get(config.hpss, get_db_filename(cache), cache) + if config.hpss is not None: + hpss: str = config.hpss + else: + raise Exception("Invalid config.hpss={}".format(config.hpss)) + hpss_get(hpss, get_db_filename(cache), cache) else: - error_str = ( + error_str: str = ( "--hpss argument is required when local copy of database is unavailable" ) logger.error(error_str) raise Exception(error_str) - global con, cur - con = sqlite3.connect(get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES) - cur = con.cursor() - - # Retrieve some configuration settings from database - for attr in dir(config): - value = getattr(config, attr) - if not callable(value) and not attr.startswith("__"): - cur.execute(u"select value from config where arg=?", (attr,)) - value = cur.fetchone()[0] - setattr(config, attr, value) - # FIXME: Incompatible types in assignment (expression has type "bool", variable has type "None") mypy(error) - config.maxsize = int(config.maxsize) # type: ignore - # FIXME: Incompatible types in assignment (expression has type "bool", variable has type "None") mypy(error) - config.keep = bool(int(config.keep)) # type: ignore + + con: sqlite3.Connection = sqlite3.connect( + get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES + ) + cur: sqlite3.Cursor = con.cursor() + + update_config(cur) + + if config.maxsize is not None: + maxsize = config.maxsize + else: + raise Exception("Invalid config.maxsize={}".format(config.maxsize)) + config.maxsize = int(maxsize) + if config.keep is not None: + keep = config.keep + else: + raise Exception("Invalid config.keep={}".format(config.keep)) + config.keep = bool(int(keep)) # The command line arg should always have 
precedence if args.hpss == "none": # If no HPSS is available, always keep the files. - # FIXME: Incompatible types in string interpolation (expression has type "None", placeholder has type "Union[int, float, SupportsInt]") mypy(error) - config.keep = True # type: ignore + config.keep = True else: + # If HPSS is used, let the user specify whether or not to keep the files. config.keep = args.keep if args.hpss is not None: config.hpss = args.hpss # Start doing actual work logger.debug("Running zstash update") - logger.debug("Local path : %s" % (config.path)) - logger.debug("HPSS path : %s" % (config.hpss)) - # FIXME: Incompatible types in string interpolation (expression has type "None", placeholder has type "Union[int, float, SupportsInt]") mypy(error) - logger.debug("Max size : %i" % (config.maxsize)) # type: ignore - logger.debug("Keep local tar files : %s" % (config.keep)) - - # List of files - logger.info("Gathering list of files to archive") - files: List[Tuple[str, str]] = [] - for root, dirnames, filenames in os.walk("."): - # Empty directory - if not dirnames and not filenames: - files.append((root, "")) - # Loop over files - for filename in filenames: - files.append((root, filename)) - - # Sort files by directories and filenames - files = sorted(files, key=lambda x: (x[0], x[1])) - - # Relative file path, eliminating top level zstash directory - # FIXME: Name 'files' already defined mypy(error) - files: List[str] = [ # type: ignore - os.path.normpath(os.path.join(x[0], x[1])) - for x in files - if x[0] != os.path.join(".", cache) - ] - - # Eliminate files based on exclude pattern - if args.exclude is not None: - files = exclude_files(args.exclude, files) + logger.debug("Local path : {}".format(config.path)) + logger.debug("HPSS path : {}".format(config.hpss)) + logger.debug("Max size : {}".format(maxsize)) + logger.debug("Keep local tar files : {}".format(keep)) + + files: List[str] = get_files_to_archive(cache, args.exclude) # Eliminate files that are already archived and up to date - newfiles = [] + newfiles: List[str] = [] for file_path in files: - # FIXME: Argument 1 to "lstat" has incompatible type "Tuple[str, str]"; expected "Union[str, bytes, _PathLike[str], _PathLike[bytes]]" mypy(error) - statinfo = os.lstat(file_path) # type: ignore - mdtime_new = datetime.utcfromtimestamp(statinfo.st_mtime) - mode = statinfo.st_mode + statinfo: os.stat_result = os.lstat(file_path) + mdtime_new: datetime = datetime.utcfromtimestamp(statinfo.st_mtime) + mode: int = statinfo.st_mode # For symbolic links or directories, size should be 0 + size_new: int if stat.S_ISLNK(mode) or stat.S_ISDIR(mode): size_new = 0 else: size_new = statinfo.st_size + # Select the file matching the path. cur.execute(u"select * from files where name = ?", (file_path,)) - new = True + new: bool = True while True: - match = cur.fetchone() - if match is None: + # Get the corresponding row in the 'files' table + match_: Optional[TupleFilesRow] = cur.fetchone() + if match_ is None: break - size = match[2] - mdtime = match[3] + else: + match: FilesRow = FilesRow(match_) - if (size_new == size) and ( - abs((mdtime_new - mdtime).total_seconds()) <= TIME_TOL + if (size_new == match.size) and ( + abs((mdtime_new - match.mtime).total_seconds()) <= TIME_TOL ): # File exists with same size and modification time within tolerance new = False break - # print(file,size_new,size,mdtime_new,mdtime) if new: newfiles.append(file_path) # Anything to do? 
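(Illustration, not part of the patch.) The loop above skips files that are already archived and unchanged by comparing size and modification time against the indexed values, within the TIME_TOL tolerance. A rough standalone sketch of that check is below; needs_archive is a hypothetical helper written for this illustration, not a function in the PR.

import os
import stat
import tempfile
from datetime import datetime

TIME_TOL = 1.0  # seconds, as in zstash.settings

def needs_archive(path: str, stored_size: int, stored_mtime: datetime) -> bool:
    # Re-archive when the size or the modification time (within tolerance) differs.
    info = os.lstat(path)
    if stat.S_ISLNK(info.st_mode) or stat.S_ISDIR(info.st_mode):
        size_new = 0  # symlinks and directories are indexed with size 0
    else:
        size_new = info.st_size
    mtime_new = datetime.utcfromtimestamp(info.st_mtime)
    unchanged = (size_new == stored_size) and (
        abs((mtime_new - stored_mtime).total_seconds()) <= TIME_TOL
    )
    return not unchanged

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"hello")
    path = f.name
recorded_mtime = datetime.utcfromtimestamp(os.lstat(path).st_mtime)
print(needs_archive(path, stored_size=5, stored_mtime=recorded_mtime))  # False
print(needs_archive(path, stored_size=4, stored_mtime=recorded_mtime))  # True
os.remove(path)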
if len(newfiles) == 0: logger.info("Nothing to update") - return + # Close database + con.commit() + con.close() + return None # --dry-run option if args.dry_run: print("List of files to be updated") for file_path in newfiles: print(file_path) - return + # Close database + con.commit() + con.close() + return None # Find last used tar archive - itar = -1 + itar: int = -1 cur.execute(u"select distinct tar from files") - tfiles = cur.fetchall() + tfiles: List[Tuple[str]] = cur.fetchall() for tfile in tfiles: - itar = max(itar, int(tfile[0][0:6], 16)) + tfile_string: str = tfile[0] + itar = max(itar, int(tfile_string[0:6], 16)) # Add files - failures = add_files(cur, con, itar, newfiles, cache) + failures: List[str] = add_files(cur, con, itar, newfiles, cache) - # Close database and transfer to HPSS. Always keep local copy + # Close database con.commit() con.close() - hpss_put(config.hpss, get_db_filename(cache), cache, keep=True) - # List failures - if len(failures) > 0: - logger.warning("Some files could not be archived") - for file_path in failures: - # FIXME: Not all arguments converted during string formatting mypy(error) - logger.error("Archiving %s" % (file_path)) # type: ignore + return failures diff --git a/zstash/utils.py b/zstash/utils.py index ccfbbee5..c63af48a 100644 --- a/zstash/utils.py +++ b/zstash/utils.py @@ -1,18 +1,21 @@ from __future__ import absolute_import, print_function +import os import shlex +import sqlite3 import subprocess from fnmatch import fnmatch +from typing import Any, List, Tuple -from .settings import logger +from .settings import config, logger -def exclude_files(exclude, files): +def exclude_files(exclude: str, files: List[str]) -> List[str]: # Construct lits of files to exclude, based on # https://codereview.stackexchange.com/questions/33624/ # filtering-a-long-list-of-files-through-a-set-of-ignore-patterns-using-iterators - exclude_patterns = exclude.split(",") + exclude_patterns: List[str] = exclude.split(",") # If exclude pattern ends with a trailing '/', the user intends to exclude # the entire subdirectory content, therefore replace '/' with '/*' @@ -21,24 +24,25 @@ def exclude_files(exclude, files): exclude_patterns[i] += "*" # Actual files to exclude - exclude_files = [] + exclude_files: List[str] = [] for file_name in files: if any(fnmatch(file_name, pattern) for pattern in exclude_patterns): exclude_files.append(file_name) - continue - # Now, remove them + # Now, remove those files new_files = [f for f in files if f not in exclude_files] return new_files -def run_command(command, error_str): - p1 = subprocess.Popen( +def run_command(command: str, error_str: str): + p1: subprocess.Popen = subprocess.Popen( shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + stdout: bytes + stderr: bytes (stdout, stderr) = p1.communicate() - status = p1.returncode + status: int = p1.returncode if status != 0: error_str = "Error={}, Command was `{}`".format(error_str, command) if "hsi" in command: @@ -46,6 +50,56 @@ def run_command(command, error_str): error_str ) logger.error(error_str) - logger.debug("stdout:\n%s", stdout) - logger.debug("stderr:\n%s", stderr) + logger.debug("stdout:\n{!r}".format(stdout)) + logger.debug("stderr:\n{!r}".format(stderr)) raise Exception(error_str) + + +def get_files_to_archive(cache: str, exclude: str) -> List[str]: + # List of files + logger.info("Gathering list of files to archive") + # Tuples of the form (path, filename) + file_tuples: List[Tuple[str, str]] = [] + # Walk the current directory + for root, 
dirnames, filenames in os.walk("."): + if not dirnames and not filenames: + # There are no subdirectories nor are there files. + # This directory is empty. + file_tuples.append((root, "")) + for filename in filenames: + # Loop over files + # filenames is a list, so if it is empty, no looping will occur. + file_tuples.append((root, filename)) + + # Sort first on directories (x[0]) + # Further sort on filenames (x[1]) + file_tuples = sorted(file_tuples, key=lambda x: (x[0], x[1])) + + # Relative file paths, excluding the cache + files: List[str] = [ + os.path.normpath(os.path.join(x[0], x[1])) + for x in file_tuples + if x[0] != os.path.join(".", cache) + ] + + # Eliminate files based on exclude pattern + if exclude is not None: + files = exclude_files(exclude, files) + + return files + + +def update_config(cur: sqlite3.Cursor): + # Retrieve some configuration settings from database + # Loop through all attributes of config. + for attr in dir(config): + value: Any = getattr(config, attr) + if not callable(value) and not attr.startswith("__"): + # config.{attr} is not a function. + # The attribute name does not start with "__" + # Get the value (column 2) for attribute `attr` (column 1) + # i.e., for the row where column 1 is the attribute, get the value from column 2 + cur.execute(u"select value from config where arg=?", (attr,)) + value = cur.fetchone()[0] + # Update config with the new attribute-value pair + setattr(config, attr, value)
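(Illustration, not part of the patch.) The exclude handling used by get_files_to_archive() above takes comma-separated patterns, with a trailing '/' meaning "everything under that directory". A brief standalone sketch of that behavior follows; exclude_demo is a hypothetical stand-in for utils.exclude_files, written only for this illustration.

from fnmatch import fnmatch
from typing import List

def exclude_demo(exclude: str, files: List[str]) -> List[str]:
    # Comma-separated patterns; a trailing '/' excludes the whole subdirectory.
    patterns = exclude.split(",")
    patterns = [p + "*" if p.endswith("/") else p for p in patterns]
    dropped = [f for f in files if any(fnmatch(f, p) for p in patterns)]
    return [f for f in files if f not in dropped]

files = ["dir/file1.txt", "dir/sub/file2.txt", "notes.md", "zstash/index.db"]
print(exclude_demo("dir/,*.md", files))  # -> ['zstash/index.db']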