Skip to content

feat: use thread pool #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/auditwheel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import auditwheel

from . import main_lddtree, main_repair, main_show
from .tools import EnvironmentDefault


def main() -> int | None:
Expand All @@ -33,6 +34,16 @@ def main() -> int | None:
default=0,
help="Give more output. Option is additive",
)
p.add_argument(
"-j",
"--max-jobs",
dest="max_jobs",
action=EnvironmentDefault,
env="AUDITWHEEL_MAX_JOBS",
type=int,
default=1,
help="Maximum number of jobs to run in parallel",
)
sub_parsers = p.add_subparsers(metavar="command", dest="cmd")

main_show.configure_parser(sub_parsers)
Expand Down
4 changes: 3 additions & 1 deletion src/auditwheel/main_repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from auditwheel.patcher import Patchelf

from .policy import WheelPolicies
from .pool import FileTaskExecutor
from .tools import EnvironmentDefault

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,7 +51,7 @@ def configure_parser(sub_parsers) -> None: # type: ignore[no-untyped-def]
dest="ZIP_COMPRESSION_LEVEL",
type=int,
help="Compress level to be used to create zip file.",
choices=list(range(zlib.Z_NO_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1)),
choices=list(range(zlib.Z_DEFAULT_COMPRESSION, zlib.Z_BEST_COMPRESSION + 1)),
default=zlib.Z_DEFAULT_COMPRESSION,
)
parser.add_argument(
Expand Down Expand Up @@ -211,6 +212,7 @@ def execute(args: argparse.Namespace, parser: argparse.ArgumentParser) -> int:
exclude=exclude,
strip=args.STRIP,
zip_compression_level=args.ZIP_COMPRESSION_LEVEL,
pool=FileTaskExecutor(args.max_jobs),
)

if out_wheel is not None:
Expand Down
4 changes: 4 additions & 0 deletions src/auditwheel/patcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ def __init__(self) -> None:
_verify_patchelf()

def replace_needed(self, file_name: Path, *old_new_pairs: tuple[str, str]) -> None:
"""
Patching one ELF file does not require its dependencies to be ready,
so calls to this function can be parallelized.
"""
check_call(
[
"patchelf",
Expand Down
123 changes: 123 additions & 0 deletions src/auditwheel/pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Concurrent support of auditwheel.
This can speed up `auditwheel show` and `auditwheel repair`,
which perform external shell invocations and I/O operations
that do not depend on each other and can be parallelized.

If `j=1`, there'll be no concurrency at all and each call is synchronous,
which is the same as not using this pool.
"""

import contextlib
import functools
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from typing import Any, Callable, Optional


class FileTaskExecutor:
    """A task executor that manages parallel jobs associated with a file.

    Every task is registered under a file path (normalized to an absolute
    path). At most one future per path is tracked at a time: :meth:`submit`
    refuses to start a task while another one is registered for the same
    path, and :meth:`submit_chain` queues the task behind the registered one
    so that tasks for the same file are executed in order.

    ``concurrent`` selects the execution mode:

    * ``concurrent == 1``: no thread pool is created. Each task runs
      synchronously inside ``submit``/``submit_chain`` and any exception it
      raises propagates to the caller immediately.
    * ``concurrent > 1``: tasks run on a ``ThreadPoolExecutor`` with
      ``concurrent`` worker threads.
    * ``concurrent < 1``: tasks run on a ``ThreadPoolExecutor`` with its
      default number of workers.

    Args:
        concurrent (int): Number of concurrent threads to use. Defaults to 0,
            i.e. the default setting of ``ThreadPoolExecutor``.
    Example:
        >>> executor = FileTaskExecutor(concurrent=2)
        >>> future = executor.submit(Path("file.txt"), process_file, "file.txt")
        >>> executor.wait()  # Wait for all tasks to complete
    """

    def __init__(self, concurrent: int = 0):
        # ``None`` marks the synchronous mode; ``ThreadPoolExecutor(None)``
        # lets the library choose the number of workers.
        self.executor = (
            None
            if concurrent == 1
            else ThreadPoolExecutor(concurrent if concurrent > 1 else None)
        )
        # Most recently submitted future for each absolute path. Entries are
        # removed by ``wait``; the synchronous mode never adds any.
        self.working_map: dict[Path, Future[Any]] = {}

    @staticmethod
    def _key(path: Path) -> Path:
        """Return the absolute form of ``path`` used as the ``working_map`` key."""
        return path if path.is_absolute() else path.absolute()

    def submit_chain(
        self, path: Path, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future[Any]:
        """
        Submit a task to be executed (after any existing task) for the file.

        If the existing task failed, ``fn`` is not called and the returned
        future carries the earlier failure instead.
        """
        return self._submit(path, fn, True, *args, **kwargs)

    def submit(
        self, path: Path, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future[Any]:
        """
        Submit a task to be executed when no task is registered for the file.

        Raises:
            AssertionError: a task for ``path`` is already registered and has
                not been collected with ``wait`` yet (thread pool mode only).
        """
        return self._submit(path, fn, False, *args, **kwargs)

    def _submit(
        self,
        path: Path,
        fn: Callable[..., Any],
        chain: bool,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> Future[Any]:
        """Run or schedule ``fn(*args, **kwargs)`` for ``path``.

        Args:
            path: File the task is associated with.
            fn: Callable to execute.
            chain: When true, queue behind the task already registered for
                ``path``; when false, a registered task is an error.

        Returns:
            A future resolving to the result (or exception) of the task.
        """
        key = self._key(path)

        if self.executor is None:
            # Synchronous mode: run inline; an exception raised by ``fn``
            # propagates directly to the caller.
            finished: Future[Any] = Future()
            finished.set_result(fn(*args, **kwargs))
            return finished

        previous = self.working_map.get(key)
        if previous is None:
            future = self.executor.submit(fn, *args, **kwargs)
            self.working_map[key] = future
            return future

        if not chain:
            # Raised explicitly rather than with ``assert`` so that the guard
            # still holds when Python runs with ``-O``.
            raise AssertionError("task assiciated with path is already running")

        chained: Future[Any] = Future()
        # Register before attaching the callback: when ``previous`` is already
        # done the callback runs immediately in the calling thread.
        self.working_map[key] = chained

        def run_after(done: Future[Any]) -> None:
            # ``chained`` must always be resolved, otherwise ``wait`` would
            # block forever; hence every failure is captured here and
            # re-raised later by ``wait``/``result``.
            try:
                # Re-raises the predecessor's failure (or cancellation) so a
                # broken file is not processed any further and the error is
                # not lost when the map entry is replaced.
                done.result()
                result = fn(*args, **kwargs)
            except BaseException as exc:
                chained.set_exception(exc)
            else:
                chained.set_result(result)

        previous.add_done_callback(run_after)
        return chained

    def wait(self, path: Optional[Path] = None) -> None:
        """Wait for tasks to complete.

        If a path is specified, waits only for that specific file's task to complete.
        Otherwise, waits for all tasks to complete.

        Args:
            path (Optional[Path]): The specific file path to wait for. If None,
                waits for all tasks to complete.

        Raises:
            Exception: the exception raised by the awaited task. When waiting
                for all tasks, every task is awaited first and the first
                failure (in submission order of the paths) is then re-raised.
        """
        if self.executor is None:
            return

        if path is not None:
            # Normalize like ``_submit`` does so relative paths are found.
            future = self.working_map.pop(self._key(path), None)
            if future is not None:
                future.result()
            return

        first_error: Optional[Exception] = None
        for key in list(self.working_map):
            try:
                self.working_map.pop(key).result()
            except Exception as exc:
                # Keep draining so no task is left running while unwinding.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

Check warning on line 120 in src/auditwheel/pool.py

View check run for this annotation

Codecov / codecov/patch

src/auditwheel/pool.py#L120

Added line #L120 was not covered by tests


# Shared default executor. It is created with ``concurrent=1``, so it owns no
# thread pool and runs every submitted task synchronously in the caller.
DEFAULT_POOL = FileTaskExecutor(1)
82 changes: 51 additions & 31 deletions src/auditwheel/repair.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from __future__ import annotations

import functools
import itertools
import logging
import os
import platform
import re
import shutil
import stat
from collections.abc import Iterable
from fnmatch import fnmatch
from os.path import isabs
from pathlib import Path
Expand All @@ -18,6 +18,7 @@
from .elfutils import elf_read_dt_needed, elf_read_rpaths, is_subdir
from .hashfile import hashfile
from .policy import WheelPolicies, get_replace_platforms
from .pool import DEFAULT_POOL, FileTaskExecutor
from .wheel_abi import get_wheel_elfdata
from .wheeltools import InWheelCtx, add_platforms

Expand All @@ -43,8 +44,9 @@
exclude: frozenset[str],
strip: bool,
zip_compression_level: int,
pool: FileTaskExecutor = DEFAULT_POOL,
) -> Path | None:
elf_data = get_wheel_elfdata(wheel_policy, wheel_path, exclude)
elf_data = get_wheel_elfdata(wheel_policy, wheel_path, exclude, pool)
external_refs_by_fn = elf_data.full_external_refs

# Do not repair a pure wheel, i.e. has no external refs
Expand Down Expand Up @@ -84,20 +86,33 @@

if not dest_dir.exists():
dest_dir.mkdir()
new_soname, new_path = copylib(src_path, dest_dir, patcher)
soname_map[soname] = (new_soname, new_path)
new_soname, new_path = get_new_soname(src_path, dest_dir)
replacements.append((soname, new_soname))
if soname not in soname_map:
pool.submit(new_path, copylib, src_path, dest_dir, patcher)
soname_map[soname] = (new_soname, new_path)

if replacements:
patcher.replace_needed(fn, *replacements)
# Patching one ELF file does not require its dependencies to be ready,
# so we can submit this task without waiting for the dependencies.
pool.submit(fn, patcher.replace_needed, fn, *replacements)

if len(ext_libs) > 0:
new_fn = fn
if _path_is_script(fn):
new_fn = _replace_elf_script_with_shim(match.group("name"), fn)

new_rpath = os.path.relpath(dest_dir, new_fn.parent)
new_rpath = os.path.join("$ORIGIN", new_rpath)
append_rpath_within_wheel(new_fn, new_rpath, ctx.name, patcher)
def _patch_fn(fn: Path) -> None:
assert match is not None
new_fn = fn
if _path_is_script(fn):
new_fn = _replace_elf_script_with_shim(match.group("name"), fn)

new_rpath = os.path.relpath(dest_dir, new_fn.parent)
new_rpath = os.path.join("$ORIGIN", new_rpath)

append_rpath_within_wheel(new_fn, new_rpath, ctx.name, patcher)

pool.submit_chain(fn, _patch_fn, fn)

pool.wait()

# we grafted in a bunch of libraries and modified their sonames, but
# they may have internal dependencies (DT_NEEDED) on one another, so
Expand All @@ -110,26 +125,42 @@
if n in soname_map:
replacements.append((n, soname_map[n][0]))
if replacements:
patcher.replace_needed(path, *replacements)
pool.submit(path, patcher.replace_needed, path, *replacements)

if update_tags:
ctx.out_wheel = add_platforms(ctx, abis, get_replace_platforms(abis[0]))

if strip:
libs_to_strip = [path for (_, path) in soname_map.values()]
extensions = external_refs_by_fn.keys()
strip_symbols(itertools.chain(libs_to_strip, extensions))
for lib in itertools.chain(libs_to_strip, extensions):
pool.submit(lib, strip_symbols, lib)

pool.wait()
return ctx.out_wheel


def strip_symbols(libraries: Iterable[Path]) -> None:
for lib in libraries:
logger.info("Stripping symbols from %s", lib)
check_call(["strip", "-s", lib])
def strip_symbols(lib: Path) -> None:
    """Strip the symbols of a single library by running ``strip -s`` on it.

    The path is logged at INFO level before the command is executed through
    ``check_call`` (imported outside the visible source; presumably
    ``subprocess.check_call``, which raises when the command fails).
    """
    logger.info("Stripping symbols from %s", lib)
    command = ["strip", "-s", lib]
    check_call(command)


@functools.lru_cache(maxsize=1)
def get_new_soname(src_path: Path, dest_dir: Path) -> tuple[str, Path]:
    """Compute the hash-tagged file name and destination path for a library.

    The first 8 characters of ``hashfile``'s result for ``src_path`` are
    inserted before the first ``.`` of the file name (``base-<hash>.ext``),
    unless the base name already ends with ``-<hash>``, in which case the
    name is kept as is.

    Only the most recent ``(src_path, dest_dir)`` result is cached
    (``maxsize=1``).

    Returns:
        A ``(new_soname, dest_dir / new_soname)`` tuple.

    Raises:
        ValueError: if the file name contains no ``.`` (the split cannot be
            unpacked into base and extension).
    """
    with open(src_path, "rb") as stream:
        digest = hashfile(stream)[:8]

    original_name = src_path.name
    stem, suffix = original_name.split(".", 1)
    already_tagged = stem.endswith(f"-{digest}")
    renamed = original_name if already_tagged else f"{stem}-{digest}.{suffix}"
    return renamed, dest_dir / renamed


def copylib(src_path: Path, dest_dir: Path, patcher: ElfPatcher) -> tuple[str, Path]:
def copylib(src_path: Path, dest_dir: Path, patcher: ElfPatcher) -> None:
"""Graft a shared library from the system into the wheel and update the
relevant links.

Expand All @@ -142,19 +173,10 @@
# if the library has a RUNPATH/RPATH we clear it and set RPATH to point to
# its new location.

with open(src_path, "rb") as f:
shorthash = hashfile(f)[:8]
new_soname, dest_path = get_new_soname(src_path, dest_dir)

src_name = src_path.name
base, ext = src_name.split(".", 1)
if not base.endswith(f"-{shorthash}"):
new_soname = f"{base}-{shorthash}.{ext}"
else:
new_soname = src_name

dest_path = dest_dir / new_soname
if dest_path.exists():
return new_soname, dest_path
return

Check warning on line 179 in src/auditwheel/repair.py

View check run for this annotation

Codecov / codecov/patch

src/auditwheel/repair.py#L179

Added line #L179 was not covered by tests

logger.debug("Grafting: %s -> %s", src_path, dest_path)
rpaths = elf_read_rpaths(src_path)
Expand All @@ -168,8 +190,6 @@
if any(itertools.chain(rpaths["rpaths"], rpaths["runpaths"])):
patcher.set_rpath(dest_path, "$ORIGIN")

return new_soname, dest_path


def append_rpath_within_wheel(
lib_name: Path, rpath: str, wheel_base_dir: Path, patcher: ElfPatcher
Expand Down
Loading