|
3 | 3 | import json
|
4 | 4 | import logging
|
5 | 5 | import platform as _platform_module
|
| 6 | +import re |
6 | 7 | import sys
|
7 | 8 | from collections import defaultdict
|
8 | 9 | from os.path import abspath, dirname, join
|
9 | 10 | from pathlib import Path
|
| 11 | +from typing import Any, Generator |
| 12 | + |
| 13 | +from auditwheel.elfutils import filter_undefined_symbols, is_subdir |
10 | 14 |
|
11 | 15 | from ..libc import Libc, get_libc
|
12 | 16 | from ..musllinux import find_musl_libc, get_musl_version
|
13 |
| -from .external_references import lddtree_external_references |
14 |
| -from .versioned_symbols import versioned_symbols_policy |
15 | 17 |
|
16 | 18 | _HERE = Path(__file__).parent
|
| 19 | +LIBPYTHON_RE = re.compile(r"^libpython\d+\.\d+m?.so(.\d)*$") |
17 | 20 |
|
18 | 21 | logger = logging.getLogger(__name__)
|
19 | 22 |
|
@@ -97,6 +100,112 @@ def get_priority_by_name(self, name: str) -> int | None:
|
97 | 100 | policy = self.get_policy_by_name(name)
|
98 | 101 | return None if policy is None else policy["priority"]
|
99 | 102 |
|
| 103 | + def versioned_symbols_policy(self, versioned_symbols: dict[str, set[str]]) -> int: |
| 104 | + def policy_is_satisfied( |
| 105 | + policy_name: str, policy_sym_vers: dict[str, set[str]] |
| 106 | + ) -> bool: |
| 107 | + policy_satisfied = True |
| 108 | + for name in set(required_vers) & set(policy_sym_vers): |
| 109 | + if not required_vers[name].issubset(policy_sym_vers[name]): |
| 110 | + for symbol in required_vers[name] - policy_sym_vers[name]: |
| 111 | + logger.debug( |
| 112 | + "Package requires %s, incompatible with " |
| 113 | + "policy %s which requires %s", |
| 114 | + symbol, |
| 115 | + policy_name, |
| 116 | + policy_sym_vers[name], |
| 117 | + ) |
| 118 | + policy_satisfied = False |
| 119 | + return policy_satisfied |
| 120 | + |
| 121 | + required_vers: dict[str, set[str]] = {} |
| 122 | + for symbols in versioned_symbols.values(): |
| 123 | + for symbol in symbols: |
| 124 | + sym_name, _, _ = symbol.partition("_") |
| 125 | + required_vers.setdefault(sym_name, set()).add(symbol) |
| 126 | + matching_policies: list[int] = [] |
| 127 | + for p in self.policies: |
| 128 | + policy_sym_vers = { |
| 129 | + sym_name: {sym_name + "_" + version for version in versions} |
| 130 | + for sym_name, versions in p["symbol_versions"].items() |
| 131 | + } |
| 132 | + if policy_is_satisfied(p["name"], policy_sym_vers): |
| 133 | + matching_policies.append(p["priority"]) |
| 134 | + |
| 135 | + if len(matching_policies) == 0: |
| 136 | + # the base policy (generic linux) should always match |
| 137 | + raise RuntimeError("Internal error") |
| 138 | + |
| 139 | + return max(matching_policies) |
| 140 | + |
| 141 | + def lddtree_external_references(self, lddtree: dict, wheel_path: str) -> dict: |
| 142 | + # XXX: Document the lddtree structure, or put it in something |
| 143 | + # more stable than a big nested dict |
| 144 | + def filter_libs( |
| 145 | + libs: set[str], whitelist: set[str] |
| 146 | + ) -> Generator[str, None, None]: |
| 147 | + for lib in libs: |
| 148 | + if "ld-linux" in lib or lib in ["ld64.so.2", "ld64.so.1"]: |
| 149 | + # always exclude ELF dynamic linker/loader |
| 150 | + # 'ld64.so.2' on s390x |
| 151 | + # 'ld64.so.1' on ppc64le |
| 152 | + # 'ld-linux*' on other platforms |
| 153 | + continue |
| 154 | + if LIBPYTHON_RE.match(lib): |
| 155 | + # always exclude libpythonXY |
| 156 | + continue |
| 157 | + if lib in whitelist: |
| 158 | + # exclude any libs in the whitelist |
| 159 | + continue |
| 160 | + yield lib |
| 161 | + |
| 162 | + def get_req_external(libs: set[str], whitelist: set[str]) -> set[str]: |
| 163 | + # get all the required external libraries |
| 164 | + libs = libs.copy() |
| 165 | + reqs = set() |
| 166 | + while libs: |
| 167 | + lib = libs.pop() |
| 168 | + reqs.add(lib) |
| 169 | + for dep in filter_libs(lddtree["libs"][lib]["needed"], whitelist): |
| 170 | + if dep not in reqs: |
| 171 | + libs.add(dep) |
| 172 | + return reqs |
| 173 | + |
| 174 | + ret: dict[str, dict[str, Any]] = {} |
| 175 | + for p in self.policies: |
| 176 | + needed_external_libs: set[str] = set() |
| 177 | + blacklist = {} |
| 178 | + |
| 179 | + if not (p["name"] == "linux" and p["priority"] == 0): |
| 180 | + # special-case the generic linux platform here, because it |
| 181 | + # doesn't have a whitelist. or, you could say its |
| 182 | + # whitelist is the complete set of all libraries. so nothing |
| 183 | + # is considered "external" that needs to be copied in. |
| 184 | + whitelist = set(p["lib_whitelist"]) |
| 185 | + blacklist_libs = set(p["blacklist"].keys()) & set(lddtree["needed"]) |
| 186 | + blacklist = {k: p["blacklist"][k] for k in blacklist_libs} |
| 187 | + blacklist = filter_undefined_symbols(lddtree["realpath"], blacklist) |
| 188 | + needed_external_libs = get_req_external( |
| 189 | + set(filter_libs(lddtree["needed"], whitelist)), whitelist |
| 190 | + ) |
| 191 | + |
| 192 | + pol_ext_deps = {} |
| 193 | + for lib in needed_external_libs: |
| 194 | + if is_subdir(lddtree["libs"][lib]["realpath"], wheel_path): |
| 195 | + # we didn't filter libs that resolved via RPATH out |
| 196 | + # earlier because we wanted to make sure to pick up |
| 197 | + # our elf's indirect dependencies. But now we want to |
| 198 | + # filter these ones out, since they're not "external". |
| 199 | + logger.debug("RPATH FTW: %s", lib) |
| 200 | + continue |
| 201 | + pol_ext_deps[lib] = lddtree["libs"][lib]["realpath"] |
| 202 | + ret[p["name"]] = { |
| 203 | + "libs": pol_ext_deps, |
| 204 | + "priority": p["priority"], |
| 205 | + "blacklist": blacklist, |
| 206 | + } |
| 207 | + return ret |
| 208 | + |
100 | 209 |
|
101 | 210 | def get_arch_name() -> str:
|
102 | 211 | machine = _platform_module.machine()
|
@@ -204,7 +313,5 @@ def _load_policy_schema():
|
204 | 313 |
|
205 | 314 |
|
206 | 315 | __all__ = [
|
207 |
| - "lddtree_external_references", |
208 |
| - "versioned_symbols_policy", |
209 | 316 | "WheelPolicies",
|
210 | 317 | ]
|
0 commit comments