diff --git a/src/auditwheel/main.py b/src/auditwheel/main.py index 758b8970..b703551b 100644 --- a/src/auditwheel/main.py +++ b/src/auditwheel/main.py @@ -10,6 +10,7 @@ import auditwheel from . import main_lddtree, main_repair, main_show +from .tools import EnvironmentDefault def main() -> int | None: @@ -33,6 +34,16 @@ def main() -> int | None: default=0, help="Give more output. Option is additive", ) + p.add_argument( + "-j", + "--max-jobs", + dest="max_jobs", + action=EnvironmentDefault, + env="AUDITWHEEL_MAX_JOBS", + type=int, + default=1, + help="Maximum number of jobs to run in parallel", + ) sub_parsers = p.add_subparsers(metavar="command", dest="cmd") main_show.configure_parser(sub_parsers) diff --git a/src/auditwheel/main_repair.py b/src/auditwheel/main_repair.py index 47c09157..3745e81d 100644 --- a/src/auditwheel/main_repair.py +++ b/src/auditwheel/main_repair.py @@ -8,6 +8,7 @@ from auditwheel.patcher import Patchelf from .policy import WheelPolicies +from .pool import FileTaskExecutor from .tools import EnvironmentDefault logger = logging.getLogger(__name__) @@ -50,7 +51,7 @@ def configure_parser(sub_parsers) -> None: # type: ignore[no-untyped-def] dest="ZIP_COMPRESSION_LEVEL", type=int, help="Compress level to be used to create zip file.", - choices=list(range(zlib.Z_NO_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1)), + choices=list(range(zlib.Z_DEFAULT_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1)), default=zlib.Z_DEFAULT_COMPRESSION, ) parser.add_argument( @@ -211,6 +212,7 @@ def execute(args: argparse.Namespace, parser: argparse.ArgumentParser) -> int: exclude=exclude, strip=args.STRIP, zip_compression_level=args.ZIP_COMPRESSION_LEVEL, + pool=FileTaskExecutor(args.max_jobs), ) if out_wheel is not None: diff --git a/src/auditwheel/patcher.py b/src/auditwheel/patcher.py index 60c3157f..84d5b590 100644 --- a/src/auditwheel/patcher.py +++ b/src/auditwheel/patcher.py @@ -47,6 +47,10 @@ def __init__(self) -> None: _verify_patchelf() def replace_needed(self, 
"""
Concurrent support of auditwheel.

This can speed up `auditwheel show` and `auditwheel repair`
where they have external shell invocation/io operations
that do not depend on each other and can be parallelized.

If `j=1`, there'll be no concurrency at all and each call is synchronous,
which is the same as not using this pool.
"""

import contextlib
import functools
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from typing import Any, Callable, Optional


class FileTaskExecutor:
    """A task executor that manages parallel jobs associated with a file.

    Every task is registered under a file path (made absolute on
    submission).  :meth:`submit_chain` queues a task behind the task that is
    currently registered for the same path, so tasks for one file run in
    submission order; :meth:`submit` refuses to register a second task for a
    path that still has one.  A path stays registered until :meth:`wait`
    collects it.

    Execution mode, selected by ``concurrent``:

    * ``concurrent == 1``: no thread pool is created.  Each task runs
      synchronously inside the ``submit``/``submit_chain`` call and an
      exception raised by the task propagates directly from that call.
    * ``concurrent > 1``: tasks run on a ``ThreadPoolExecutor`` with that
      many worker threads.
    * ``concurrent < 1``: a ``ThreadPoolExecutor`` is created with its
      default number of workers.

    When a thread pool is used, a task failure is stored in the task's
    future and re-raised by :meth:`wait`, so a failure is reported in every
    mode instead of being dropped.

    Args:
        concurrent (int): Number of concurrent threads to use. Defaults to 0
            (let ``ThreadPoolExecutor`` pick the number of workers).

    Example:
        >>> executor = FileTaskExecutor(concurrent=2)
        >>> future = executor.submit(Path("file.txt"), process_file, "file.txt")
        >>> executor.wait()  # Wait for all tasks to complete
    """

    def __init__(self, concurrent: int = 0) -> None:
        # ``None`` means "run every task inline"; see the class docstring.
        self.executor = (
            None
            if concurrent == 1
            else ThreadPoolExecutor(concurrent if concurrent > 1 else None)
        )
        # Latest future registered for each absolute path.  Only populated
        # when a thread pool is in use; entries are removed by ``wait``.
        self.working_map: dict[Path, Future[Any]] = {}

    @staticmethod
    def _absolute(path: Path) -> Path:
        """Return ``path`` in the absolute form used as ``working_map`` key."""
        return path if path.is_absolute() else path.absolute()

    def submit_chain(
        self, path: Path, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future[Any]:
        """
        Submit a task to be executed (after any existing task) for the file.

        If the task it is queued behind fails, ``fn`` is not called and the
        returned future carries the predecessor's exception.
        """
        return self._submit(path, fn, True, *args, **kwargs)

    def submit(
        self, path: Path, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future[Any]:
        """
        Submit a task to be executed when no task is registered for the file,
        otherwise raise an error.

        Raises:
            AssertionError: a thread pool is in use and a task for ``path``
                is still registered (i.e. it was not collected by ``wait``).
        """
        return self._submit(path, fn, False, *args, **kwargs)

    def _submit(
        self,
        path: Path,
        fn: Callable[..., Any],
        chain: bool,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> Future[Any]:
        """Run or schedule ``fn(*args, **kwargs)`` for ``path``.

        Returns a future for the task.  Without a thread pool the task has
        already run when this returns (and any exception it raised has
        propagated to the caller).
        """
        path = self._absolute(path)
        task = functools.partial(fn, *args, **kwargs)

        future: Future[Any]
        if self.executor is None:
            # Synchronous mode: identical to calling ``fn`` directly.
            future = Future()
            future.set_result(task())
            return future

        previous = self.working_map.get(path)
        if chain and previous is not None:
            future = Future()

            def _run_after(finished: Future[Any]) -> None:
                # Invoked once ``previous`` is done; if it already is, this
                # runs immediately in the submitting thread.
                error = finished.exception()
                if error is not None:
                    # Do not keep working on a file whose earlier step
                    # failed, and do not lose that failure either.
                    future.set_exception(error)
                    return
                try:
                    future.set_result(task())
                except Exception as e:
                    future.set_exception(e)

            # Register before attaching the callback: the callback may run
            # right away and later submissions must chain onto ``future``.
            self.working_map[path] = future
            previous.add_done_callback(_run_after)
        else:
            if previous is not None:
                # Explicit raise instead of ``assert`` so the guard is kept
                # under ``python -O``; the exception type is unchanged.
                raise AssertionError("task assiciated with path is already running")
            future = self.executor.submit(task)
            self.working_map[path] = future

        return future

    def wait(self, path: Optional[Path] = None) -> None:
        """Wait for tasks to complete.

        If a path is specified, waits only for that specific file's task to
        complete.  Otherwise, waits for all tasks to complete.  Collected
        tasks are removed from ``working_map``.  Without a thread pool this
        is a no-op because tasks already ran inside ``submit``.

        Args:
            path (Optional[Path]): The specific file path to wait for. If None,
                waits for all tasks to complete.  Relative paths are made
                absolute, matching what ``submit`` registered.

        Raises:
            Exception: the exception raised by the awaited task.  When
                waiting for all tasks, every task is allowed to finish first
                and the first failure encountered is then re-raised.
        """
        if self.executor is None:
            return

        if path is not None:
            pending = self.working_map.pop(self._absolute(path), None)
            if pending is not None:
                pending.result()
            return

        first_error: Optional[Exception] = None
        for key in list(self.working_map):
            try:
                self.working_map.pop(key).result()
            except Exception as exc:
                # Keep draining so no task is left running unobserved.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error


DEFAULT_POOL = FileTaskExecutor(1)
has no external refs @@ -84,20 +86,33 @@ def repair_wheel( if not dest_dir.exists(): dest_dir.mkdir() - new_soname, new_path = copylib(src_path, dest_dir, patcher) - soname_map[soname] = (new_soname, new_path) + new_soname, new_path = get_new_soname(src_path, dest_dir) replacements.append((soname, new_soname)) + if soname not in soname_map: + pool.submit(new_path, copylib, src_path, dest_dir, patcher) + soname_map[soname] = (new_soname, new_path) + if replacements: - patcher.replace_needed(fn, *replacements) + # patching one elf do not need its dependencies to be ready + # so we can submit this task without waiting for dependencies + pool.submit(fn, patcher.replace_needed, fn, *replacements) if len(ext_libs) > 0: - new_fn = fn - if _path_is_script(fn): - new_fn = _replace_elf_script_with_shim(match.group("name"), fn) - new_rpath = os.path.relpath(dest_dir, new_fn.parent) - new_rpath = os.path.join("$ORIGIN", new_rpath) - append_rpath_within_wheel(new_fn, new_rpath, ctx.name, patcher) + def _patch_fn(fn: Path) -> None: + assert match is not None + new_fn = fn + if _path_is_script(fn): + new_fn = _replace_elf_script_with_shim(match.group("name"), fn) + + new_rpath = os.path.relpath(dest_dir, new_fn.parent) + new_rpath = os.path.join("$ORIGIN", new_rpath) + + append_rpath_within_wheel(new_fn, new_rpath, ctx.name, patcher) + + pool.submit_chain(fn, _patch_fn, fn) + + pool.wait() # we grafted in a bunch of libraries and modified their sonames, but # they may have internal dependencies (DT_NEEDED) on one another, so @@ -110,7 +125,7 @@ def repair_wheel( if n in soname_map: replacements.append((n, soname_map[n][0])) if replacements: - patcher.replace_needed(path, *replacements) + pool.submit(path, patcher.replace_needed, path, *replacements) if update_tags: ctx.out_wheel = add_platforms(ctx, abis, get_replace_platforms(abis[0])) @@ -118,18 +133,34 @@ def repair_wheel( if strip: libs_to_strip = [path for (_, path) in soname_map.values()] extensions = 
def strip_symbols(lib: Path) -> None:
    """Strip the symbols of a single library in place with ``strip -s``.

    Takes one path per call so that callers can schedule each library as an
    independent task.

    Args:
        lib: Path of the file to strip.

    Raises:
        Whatever ``check_call`` raises when the ``strip`` command fails.
    """
    logger.info("Stripping symbols from %s", lib)
    check_call(["strip", "-s", lib])


# ``repair_wheel`` asks for the name of every external library and ``copylib``
# asks again for the same one later (possibly from a worker thread).  A single
# cache slot is evicted as soon as another library is looked up in between, so
# the file would be hashed again; keep enough entries for a whole wheel while
# staying bounded.
@functools.lru_cache(maxsize=128)
def get_new_soname(src_path: Path, dest_dir: Path) -> tuple[str, Path]:
    """Compute the name a grafted library gets inside the wheel.

    The first 8 characters of ``hashfile`` applied to the library's content
    are inserted before the first ``.`` of the file name
    (``libfoo.so.1`` -> ``libfoo-<hash>.so.1``).  A name whose stem already
    ends with ``-<hash>`` is kept unchanged.

    Results are memoized per ``(src_path, dest_dir)``, so the file is not
    re-read on a cache hit.

    Args:
        src_path: Path of the library to graft.
        dest_dir: Directory the library will be copied into.

    Returns:
        ``(new_soname, dest_dir / new_soname)``.
    """
    with open(src_path, "rb") as f:
        shorthash = hashfile(f)[:8]

    src_name = src_path.name
    # ``partition`` instead of ``split(".", 1)`` so that a file name without
    # any dot gets ``-<hash>`` appended instead of raising ValueError.
    base, dot, ext = src_name.partition(".")
    if base.endswith(f"-{shorthash}"):
        new_soname = src_name
    else:
        new_soname = f"{base}-{shorthash}{dot}{ext}"

    return new_soname, dest_dir / new_soname
- with open(src_path, "rb") as f: - shorthash = hashfile(f)[:8] + new_soname, dest_path = get_new_soname(src_path, dest_dir) - src_name = src_path.name - base, ext = src_name.split(".", 1) - if not base.endswith(f"-{shorthash}"): - new_soname = f"{base}-{shorthash}.{ext}" - else: - new_soname = src_name - - dest_path = dest_dir / new_soname if dest_path.exists(): - return new_soname, dest_path + return logger.debug("Grafting: %s -> %s", src_path, dest_path) rpaths = elf_read_rpaths(src_path) @@ -168,8 +190,6 @@ def copylib(src_path: Path, dest_dir: Path, patcher: ElfPatcher) -> tuple[str, P if any(itertools.chain(rpaths["rpaths"], rpaths["runpaths"])): patcher.set_rpath(dest_path, "$ORIGIN") - return new_soname, dest_path - def append_rpath_within_wheel( lib_name: Path, rpath: str, wheel_base_dir: Path, patcher: ElfPatcher diff --git a/src/auditwheel/tools.py b/src/auditwheel/tools.py index 70d30ab0..d5a764e9 100644 --- a/src/auditwheel/tools.py +++ b/src/auditwheel/tools.py @@ -182,37 +182,35 @@ def __init__( ) -> None: self.env_default = os.environ.get(env) self.env = env - if self.env_default: - if type: - try: - self.env_default = type(self.env_default) - except Exception: - self.option_strings = kwargs["option_strings"] - args = { - "value": self.env_default, - "type": type, - "env": self.env, - } - msg = ( - "invalid type: %(value)r from environment variable " - "%(env)r cannot be converted to %(type)r" - ) - raise argparse.ArgumentError(self, msg % args) from None + error_msg_value_meta = "%(value)r" + if self.env_default is not None: default = self.env_default - if ( - self.env_default is not None - and choices is not None - and self.env_default not in choices - ): + error_msg_value_meta = "%(value)r from environment variable %(env)r" + if type: + try: + default = type(default) + except Exception: + self.option_strings = kwargs["option_strings"] + args = { + "value": self.env_default, + "type": type, + "env": self.env, + } + msg = ( + "invalid type: " + + 
error_msg_value_meta + + " cannot be converted to %(type)r" + ) + raise argparse.ArgumentError(self, msg % args) from None + if default is not None and choices is not None and default not in choices: self.option_strings = kwargs["option_strings"] args = { - "value": self.env_default, + "value": default, "choices": ", ".join(map(repr, choices)), "env": self.env, } msg = ( - "invalid choice: %(value)r from environment variable " - "%(env)r (choose from %(choices)s)" + "invalid choice: " + error_msg_value_meta + " (choose from %(choices)s)" ) raise argparse.ArgumentError(self, msg % args) diff --git a/src/auditwheel/wheel_abi.py b/src/auditwheel/wheel_abi.py index 57539233..5e5f2305 100644 --- a/src/auditwheel/wheel_abi.py +++ b/src/auditwheel/wheel_abi.py @@ -10,6 +10,8 @@ from pathlib import Path from typing import Optional, TypeVar +from elftools.elf.elffile import ELFFile + from . import json from .architecture import Architecture from .elfutils import ( @@ -22,6 +24,7 @@ from .genericpkgctx import InGenericPkgCtx from .lddtree import DynamicExecutable, ldd from .policy import ExternalReference, Policy, WheelPolicies +from .pool import DEFAULT_POOL, FileTaskExecutor log = logging.getLogger(__name__) @@ -80,7 +83,10 @@ def message(self) -> str: @functools.lru_cache def get_wheel_elfdata( - wheel_policy: WheelPolicies, wheel_fn: Path, exclude: frozenset[str] + wheel_policy: WheelPolicies, + wheel_fn: Path, + exclude: frozenset[str], + pool: FileTaskExecutor = DEFAULT_POOL, ) -> WheelElfData: full_elftree = {} nonpy_elftree = {} @@ -94,19 +100,23 @@ def get_wheel_elfdata( shared_libraries_with_invalid_machine = [] platform_wheel = False - for fn, elf in elf_file_filter(ctx.iter_files()): - # Check for invalid binary wheel format: no shared library should - # be found in purelib - so_name = fn.name - # If this is in purelib, add it to the list of shared libraries in - # purelib - if any(p.name == "purelib" for p in fn.parents): - 
shared_libraries_in_purelib.append(so_name) + def _get_fn_data_inner(fn: Path) -> None: + """ + This function reads one elf file per call, + so can be safely parallelized. + """ + nonlocal \ + platform_wheel, \ + shared_libraries_in_purelib, \ + uses_ucs2_symbols, \ + uses_PyFPE_jbuf + + with open(fn, "rb") as f: + elf = ELFFile(f) + + so_name = fn.name - # If at least one shared library exists in purelib, this is going - # to fail and there's no need to do further checks - if not shared_libraries_in_purelib: log.debug("processing: %s", fn) elftree = ldd(fn, exclude=exclude) @@ -115,11 +125,11 @@ def get_wheel_elfdata( if arch != wheel_policy.architecture.baseline: shared_libraries_with_invalid_machine.append(so_name) log.warning("ignoring: %s with %s architecture", so_name, arch) - continue + return except ValueError: shared_libraries_with_invalid_machine.append(so_name) log.warning("ignoring: %s with unknown architecture", so_name) - continue + return platform_wheel = True @@ -148,6 +158,20 @@ def get_wheel_elfdata( # its internal references later. 
nonpy_elftree[fn] = elftree + # Create new ELFFile object to avoid use-after-free + for fn, _elf in elf_file_filter(ctx.iter_files()): + # Check for invalid binary wheel format: no shared library should + # be found in purelib + so_name = fn.name + + # If this is in purelib, add it to the list of shared libraries in + # purelib + if any(p.name == "purelib" for p in fn.parents): + shared_libraries_in_purelib.append(so_name) + + if not shared_libraries_in_purelib: + pool.submit(fn, _get_fn_data_inner, fn) + # If at least one shared library exists in purelib, raise an error if shared_libraries_in_purelib: libraries = "\n\t".join(shared_libraries_in_purelib) @@ -159,6 +183,8 @@ def get_wheel_elfdata( ) raise RuntimeError(msg) + pool.wait() + if not platform_wheel: raise NonPlatformWheel( wheel_policy.architecture, shared_libraries_with_invalid_machine diff --git a/tests/integration/test_bundled_wheels.py b/tests/integration/test_bundled_wheels.py index 4605ec54..0650e6ce 100644 --- a/tests/integration/test_bundled_wheels.py +++ b/tests/integration/test_bundled_wheels.py @@ -135,6 +135,7 @@ def test_wheel_source_date_epoch(tmp_path, monkeypatch): EXCLUDE=[], DISABLE_ISA_EXT_CHECK=False, ZIP_COMPRESSION_LEVEL=6, + max_jobs=1, cmd="repair", func=Mock(), prog="auditwheel", diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 2c7792b0..4bba8f27 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -33,4 +33,4 @@ def test_help(monkeypatch, capsys): # THEN assert retval is None captured = capsys.readouterr() - assert "usage: auditwheel [-h] [-V] [-v] command ..." in captured.out + assert "usage: auditwheel [-h] [-V] [-v] [-j MAX_JOBS] command ..." 
in captured.out diff --git a/tests/unit/test_tools.py b/tests/unit/test_tools.py index d5c022b9..953972c1 100644 --- a/tests/unit/test_tools.py +++ b/tests/unit/test_tools.py @@ -46,7 +46,7 @@ def test_plat_environment_action( _all_zip_level: list[int] = list( - range(zlib.Z_NO_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1) + range(zlib.Z_DEFAULT_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1) )