From eb82fd661ab4a80eed0557fe2cc502118bcf1a19 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 23:01:16 +0000 Subject: [PATCH 1/2] Initial plan From bdf3c7ade699300745aa8d4c905fdb7e0f06bd28 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 23:13:36 +0000 Subject: [PATCH 2/2] Address code review comments: DB cleanup, break, sleep comment, setuptools version, manager comment, polling comment, race detection Co-authored-by: forsyth2 <30700190+forsyth2@users.noreply.github.com> --- conda/dev.yml | 2 +- zstash/extract.py | 400 +++++++++++++++++++++++---------------------- zstash/parallel.py | 14 +- 3 files changed, 219 insertions(+), 197 deletions(-) diff --git a/conda/dev.yml b/conda/dev.yml index 85f64d32..9ea37574 100644 --- a/conda/dev.yml +++ b/conda/dev.yml @@ -6,7 +6,7 @@ dependencies: # ================= - pip - python >=3.11,<3.15 - - setuptools + - setuptools >=65.0.0 - sqlite - six >=1.16.0 - globus-sdk >=3.15.0,<4.0 diff --git a/zstash/extract.py b/zstash/extract.py index c72432fa..3dfaa81d 100644 --- a/zstash/extract.py +++ b/zstash/extract.py @@ -356,6 +356,7 @@ def multiprocess_extract( for workers_idx in range(len(workers_to_tars)): if db_row.tar in workers_to_tars[workers_idx]: workers_to_matches[workers_idx].append(db_row) + break # Ensure each worker processes tars in order for worker_matches in workers_to_matches: @@ -395,6 +396,8 @@ def multiprocess_extract( failures.append(failure_queue.get_nowait()) except queue.Empty: pass + # Sleep briefly to avoid busy-waiting. This limits failure detection + # to ~100 checks/second (~10 ms worst-case latency for short jobs). time.sleep(0.01) # Drain any remaining failures after all processes have exited. @@ -520,14 +523,19 @@ def extractFiles( # noqa: C901 will be opened for this worker process. """ # Open database connection if not provided (parallel case) + con: Optional[sqlite3.Connection] = None + close_db: bool = False if cur is None: - con: sqlite3.Connection = sqlite3.connect( - get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES - ) - cur = con.cursor() - close_db: bool = True - else: - close_db = False + try: + con = sqlite3.connect( + get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES + ) + cur = con.cursor() + close_db = True + except Exception: + if con is not None: + con.close() + raise failures: List[FilesRow] = [] tfname: str @@ -546,202 +554,204 @@ def extractFiles( # noqa: C901 # Don't have the logger print to the console as the message come in. logger.propagate = False - for i in range(nfiles): - files_row: FilesRow = files[i] - - # Open new tar archive - if newtar: - newtar = False - tfname = os.path.join(cache, files_row.tar) - # Everytime we're extracting a new tar, if running in parallel, - # let the process know. - # This is to synchronize the print statements. - - if multiprocess_worker: - multiprocess_worker.set_curr_tar(files_row.tar) - - # Use args.hpss, falling back to config.hpss when not provided - if args.hpss is not None: - hpss: str = args.hpss - elif config.hpss is not None: - hpss = config.hpss - else: - raise TypeError("Invalid config.hpss={}".format(config.hpss)) - - tries: int = args.retries + 1 - # Set to True to test the `--retries` option with a forced failure. - # Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries` - test_retry: bool = False - - while tries > 0: - tries -= 1 - do_retrieve: bool - - if not os.path.exists(tfname): - do_retrieve = True + try: + for i in range(nfiles): + files_row: FilesRow = files[i] + + # Open new tar archive + if newtar: + newtar = False + tfname = os.path.join(cache, files_row.tar) + # Everytime we're extracting a new tar, if running in parallel, + # let the process know. + # This is to synchronize the print statements. + + if multiprocess_worker: + multiprocess_worker.set_curr_tar(files_row.tar) + + # Use args.hpss, falling back to config.hpss when not provided + if args.hpss is not None: + hpss: str = args.hpss + elif config.hpss is not None: + hpss = config.hpss else: - do_retrieve = not check_sizes_match( - cur, tfname, args.error_on_duplicate_tar - ) - - try: - if test_retry: - test_retry = False - raise RuntimeError - if do_retrieve: - hpss_get(hpss, tfname, cache) - if not check_sizes_match( - cur, tfname, args.error_on_duplicate_tar - ): - raise RuntimeError( - f"{tfname} size does not match expected size." - ) - # `hpss_get` successful or not needed: no more tries needed - break - except RuntimeError as e: - if tries > 0: - logger.info(f"Retrying HPSS get: {tries} tries remaining.") - # Run the try-except block again - continue - else: - raise e - - logger.info("Opening tar archive %s" % (tfname)) - tar: tarfile.TarFile = tarfile.open(tfname, "r") - - # Extract file - cmd: str = "Extracting" if keep_files else "Checking" - logger.info(cmd + " %s" % (files_row.name)) - # if multiprocess_worker: - # print('{} is {} {} from {}'.format(multiprocess_worker, cmd, file[1], file[5])) + raise TypeError("Invalid config.hpss={}".format(config.hpss)) - if keep_files and not should_extract_file(files_row): - # If we were going to extract, but aren't - # because a matching file is on disk - msg: str = "Not extracting {}, because it" - msg += " already exists on disk with the same" - msg += " size and modification date." - logger.info(msg.format(files_row.name)) + tries: int = args.retries + 1 + # Set to True to test the `--retries` option with a forced failure. + # Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries` + test_retry: bool = False - # True if we should actually extract the file from the tar - extract_this_file: bool = keep_files and should_extract_file(files_row) + while tries > 0: + tries -= 1 + do_retrieve: bool - try: - # Seek file position - if tar.fileobj is not None: - fileobj = tar.fileobj - else: - raise TypeError("Invalid tar.fileobj={}".format(tar.fileobj)) - fileobj.seek(files_row.offset) - - # Get next member - tarinfo: tarfile.TarInfo = tar.tarinfo.fromtarfile(tar) - - if tarinfo.isfile(): - # fileobj to extract - # error: Name 'tarfile.ExFileObject' is not defined - extracted_file: Optional[tarfile.ExFileObject] = tar.extractfile(tarinfo) # type: ignore - if extracted_file: - fin: tarfile.ExFileObject = extracted_file + if not os.path.exists(tfname): + do_retrieve = True + else: + do_retrieve = not check_sizes_match( + cur, tfname, args.error_on_duplicate_tar + ) + + try: + if test_retry: + test_retry = False + raise RuntimeError + if do_retrieve: + hpss_get(hpss, tfname, cache) + if not check_sizes_match( + cur, tfname, args.error_on_duplicate_tar + ): + raise RuntimeError( + f"{tfname} size does not match expected size." + ) + # `hpss_get` successful or not needed: no more tries needed + break + except RuntimeError as e: + if tries > 0: + logger.info(f"Retrying HPSS get: {tries} tries remaining.") + # Run the try-except block again + continue + else: + raise e + + logger.info("Opening tar archive %s" % (tfname)) + tar: tarfile.TarFile = tarfile.open(tfname, "r") + + # Extract file + cmd: str = "Extracting" if keep_files else "Checking" + logger.info(cmd + " %s" % (files_row.name)) + # if multiprocess_worker: + # print('{} is {} {} from {}'.format(multiprocess_worker, cmd, file[1], file[5])) + + if keep_files and not should_extract_file(files_row): + # If we were going to extract, but aren't + # because a matching file is on disk + msg: str = "Not extracting {}, because it" + msg += " already exists on disk with the same" + msg += " size and modification date." + logger.info(msg.format(files_row.name)) + + # True if we should actually extract the file from the tar + extract_this_file: bool = keep_files and should_extract_file(files_row) + + try: + # Seek file position + if tar.fileobj is not None: + fileobj = tar.fileobj else: - raise TypeError("Invalid extracted_file={}".format(extracted_file)) - try: - fname: str = tarinfo.name - path: str - name: str - path, name = os.path.split(fname) - if path != "" and extract_this_file: - if not os.path.isdir(path): - # The path doesn't exist, so create it. - os.makedirs(path) - if extract_this_file: - # If we're keeping the files, - # then have an output file - fout: _io.BufferedWriter = open(fname, "wb") - - hash_md5: _hashlib.HASH = hashlib.md5() - while True: - s: bytes = fin.read(BLOCK_SIZE) - if len(s) > 0: - hash_md5.update(s) - if extract_this_file: - fout.write(s) - if len(s) < BLOCK_SIZE: - break - finally: - fin.close() + raise TypeError("Invalid tar.fileobj={}".format(tar.fileobj)) + fileobj.seek(files_row.offset) + + # Get next member + tarinfo: tarfile.TarInfo = tar.tarinfo.fromtarfile(tar) + + if tarinfo.isfile(): + # fileobj to extract + # error: Name 'tarfile.ExFileObject' is not defined + extracted_file: Optional[tarfile.ExFileObject] = tar.extractfile(tarinfo) # type: ignore + if extracted_file: + fin: tarfile.ExFileObject = extracted_file + else: + raise TypeError("Invalid extracted_file={}".format(extracted_file)) + try: + fname: str = tarinfo.name + path: str + name: str + path, name = os.path.split(fname) + if path != "" and extract_this_file: + if not os.path.isdir(path): + # The path doesn't exist, so create it. + os.makedirs(path) + if extract_this_file: + # If we're keeping the files, + # then have an output file + fout: _io.BufferedWriter = open(fname, "wb") + + hash_md5: _hashlib.HASH = hashlib.md5() + while True: + s: bytes = fin.read(BLOCK_SIZE) + if len(s) > 0: + hash_md5.update(s) + if extract_this_file: + fout.write(s) + if len(s) < BLOCK_SIZE: + break + finally: + fin.close() + if extract_this_file: + fout.close() + + md5: str = hash_md5.hexdigest() if extract_this_file: - fout.close() - - md5: str = hash_md5.hexdigest() - if extract_this_file: - # numeric_owner is a required arg in Python 3. - # If True, "only the numbers for user/group names - # are used and not the names". - tar.chown(tarinfo, fname, numeric_owner=False) - tar.chmod(tarinfo, fname) - tar.utime(tarinfo, fname) - # Verify size - if os.path.getsize(fname) != files_row.size: - logger.error("size mismatch for: {}".format(fname)) - - # Verify md5 checksum - files_row_md5: Optional[str] = files_row.md5 - if md5 != files_row_md5: - logger.error("md5 mismatch for: {}".format(fname)) - logger.error("md5 of extracted file: {}".format(md5)) - logger.error("md5 of original file: {}".format(files_row_md5)) - failures.append(files_row) - else: - logger.debug("Valid md5: {} {}".format(md5, fname)) - - elif extract_this_file: - if sys.version_info >= (3, 12): - tar.extract(tarinfo, filter="tar") - else: - tar.extract(tarinfo) - # Note: tar.extract() will not restore time stamps of symbolic - # links. Could not find a Python-way to restore it either, so - # relying here on 'touch'. This is not the prettiest solution. - # Maybe a better one can be implemented later. - if tarinfo.issym(): - tmp1 = tarinfo.mtime - tmp2: datetime = datetime.fromtimestamp(tmp1) - tmp3: str = tmp2.strftime("%Y%m%d%H%M.%S") - os.system("touch -h -t %s %s" % (tmp3, tarinfo.name)) + # numeric_owner is a required arg in Python 3. + # If True, "only the numbers for user/group names + # are used and not the names". + tar.chown(tarinfo, fname, numeric_owner=False) + tar.chmod(tarinfo, fname) + tar.utime(tarinfo, fname) + # Verify size + if os.path.getsize(fname) != files_row.size: + logger.error("size mismatch for: {}".format(fname)) + + # Verify md5 checksum + files_row_md5: Optional[str] = files_row.md5 + if md5 != files_row_md5: + logger.error("md5 mismatch for: {}".format(fname)) + logger.error("md5 of extracted file: {}".format(md5)) + logger.error("md5 of original file: {}".format(files_row_md5)) + failures.append(files_row) + else: + logger.debug("Valid md5: {} {}".format(md5, fname)) - except Exception: - # Catch all exceptions here. - traceback.print_exc() - logger.error("Retrieving {}".format(files_row.name)) - failures.append(files_row) - - # Close current archive? - if i == nfiles - 1 or files[i].tar != files[i + 1].tar: - # We're either on the last file or the tar is distinct from the tar of the next file. - - # Close current archive file - logger.debug("Closing tar archive {}".format(tfname)) - tar.close() - - if multiprocess_worker: - multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar) - multiprocess_worker.print_all_contents() - - # Open new archive next time - newtar = True - - # Delete this tar if the corresponding command-line arg was used. - if not keep_tars: - if tfname is not None: - os.remove(tfname) - else: - raise TypeError("Invalid tfname={}".format(tfname)) + elif extract_this_file: + if sys.version_info >= (3, 12): + tar.extract(tarinfo, filter="tar") + else: + tar.extract(tarinfo) + # Note: tar.extract() will not restore time stamps of symbolic + # links. Could not find a Python-way to restore it either, so + # relying here on 'touch'. This is not the prettiest solution. + # Maybe a better one can be implemented later. + if tarinfo.issym(): + tmp1 = tarinfo.mtime + tmp2: datetime = datetime.fromtimestamp(tmp1) + tmp3: str = tmp2.strftime("%Y%m%d%H%M.%S") + os.system("touch -h -t %s %s" % (tmp3, tarinfo.name)) + + except Exception: + # Catch all exceptions here. + traceback.print_exc() + logger.error("Retrieving {}".format(files_row.name)) + failures.append(files_row) + + # Close current archive? + if i == nfiles - 1 or files[i].tar != files[i + 1].tar: + # We're either on the last file or the tar is distinct from the tar of the next file. + + # Close current archive file + logger.debug("Closing tar archive {}".format(tfname)) + tar.close() + + if multiprocess_worker: + multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar) + multiprocess_worker.print_all_contents() + + # Open new archive next time + newtar = True + + # Delete this tar if the corresponding command-line arg was used. + if not keep_tars: + if tfname is not None: + os.remove(tfname) + else: + raise TypeError("Invalid tfname={}".format(tfname)) - # Close database connection if we opened it - if close_db: - cur.close() - con.close() + finally: + # Close database connection if we opened it + if close_db and con is not None: + cur.close() + con.close() # Add the failures to the queue. # When running with multiprocessing, the function multiprocess_extract() diff --git a/zstash/parallel.py b/zstash/parallel.py index 4bcda598..741572ae 100644 --- a/zstash/parallel.py +++ b/zstash/parallel.py @@ -43,7 +43,10 @@ def __init__(self, tars_to_print: List[str], manager=None, *args, **kwargs): tar: i for i, tar in enumerate(tars_to_print) } - # Use a simple counter to track which tar we're on + # Use manager-backed primitives instead of bare multiprocessing primitives + # for Python 3.14 compatibility: Python 3.14 restricts sharing unpickled + # synchronization objects across processes (see bpo-38119 / gh-84582). + # A simple counter tracks which tar we're on. self._current_tar_index: multiprocessing.managers.ValueProxy = manager.Value( "i", 0 ) @@ -70,6 +73,7 @@ def wait_turn( raise NotYourTurnError() attempted = True + # Poll every 10 ms: lower CPU usage vs 1 ms at cost of ~10 ms latency. time.sleep(0.01) def done_enqueuing_output_for_tar( @@ -86,6 +90,14 @@ def done_enqueuing_output_for_tar( with self._lock: if self._current_tar_index.value == tar_index: self._current_tar_index.value += 1 + else: + raise RuntimeError( + "Attempted to advance tar index for tar {} (expected index" + " {}) but current index is {}. This indicates a programming" + " error.".format( + workers_curr_tar, tar_index, self._current_tar_index.value + ) + ) class ExtractWorker(object):