|
4 | 4 | """This analyzer checks if the package has a similar structure to other packages maintained by the same user.""" |
5 | 5 |
|
6 | 6 | import hashlib |
| 7 | +import io |
7 | 8 | import logging |
8 | 9 | import tarfile |
9 | | -import typing |
10 | 10 |
|
11 | | -import requests |
12 | | -from bs4 import BeautifulSoup |
13 | | - |
14 | | -from macaron.errors import HeuristicAnalyzerValueError |
15 | 11 | from macaron.json_tools import JsonType |
16 | 12 | from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer |
17 | 13 | from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics |
18 | 14 | from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset |
| 15 | +from macaron.util import send_get_http, send_get_http_raw |
19 | 16 |
|
20 | 17 | logger: logging.Logger = logging.getLogger(__name__) |
21 | 18 |
|
@@ -50,184 +47,101 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes |
50 | 47 | """ |
51 | 48 | package_name = pypi_package_json.component_name |
52 | 49 | target_hash = self.get_structure_hash(package_name) |
53 | | - if target_hash is None: |
54 | | - return HeuristicResult.SKIP, { |
55 | | - "message": f"the package {package_name} does not have a sdist.", |
56 | | - } |
57 | | - |
58 | | - similar_packages = self.get_packages(package_name) |
59 | | - if not similar_packages: |
60 | | - return HeuristicResult.SKIP, { |
61 | | - "message": f"the maintainers of {package_name} do not maintain any other packages.", |
62 | | - } |
63 | | - |
64 | | - for package in similar_packages: |
65 | | - package_hash = self.get_structure_hash(package) |
66 | | - if package_hash is None: |
67 | | - logger.info("Package does not have a sdist.") |
68 | | - continue |
69 | | - if package_hash == target_hash: |
70 | | - return HeuristicResult.FAIL, { |
71 | | - "similar_package": package, |
72 | | - } |
| 50 | + if not target_hash: |
| 51 | + return HeuristicResult.SKIP, {} |
| 52 | + |
| 53 | + maintainers = pypi_package_json.pypi_registry.get_maintainers_of_package(package_name) |
| 54 | + if maintainers: |
| 55 | + for maintainer in maintainers: |
| 56 | + maintainer_packages = pypi_package_json.pypi_registry.get_packages_by_username(maintainer) |
| 57 | + if not maintainer_packages: |
| 58 | + continue |
| 59 | + for package in maintainer_packages: |
| 60 | + if package == package_name: |
| 61 | + continue |
| 62 | + |
| 63 | + hash_value = self.get_structure_hash(package) |
| 64 | + if target_hash == hash_value: |
| 65 | + return HeuristicResult.FAIL, { |
| 66 | + "message": f"The package {package_name} has a similar structure to {package}.", |
| 67 | + "similar_package": package, |
| 68 | + } |
| 69 | + |
73 | 70 | return HeuristicResult.PASS, {} |
74 | 71 |
|
75 | | - def get_maintainers(self, package_name: str) -> list[str]: |
76 | | - """Get all maintainers of a package. |
| 72 | + def get_url(self, package_name: str, package_type: str = "sdist") -> str | None: |
| 73 | + """Get the download URL of the package's distribution file (the sdist by default). |
77 | 74 |
|
78 | 75 | Parameters |
79 | 76 | ---------- |
80 | | - package_name (str): The name of the package. |
| 77 | + package_name : str |
| 78 | + The name of the package. |
| 79 | + package_type : str |
| 80 | + The distribution type to retrieve the URL for (defaults to "sdist"). |
81 | 81 |
|
82 | 82 | Returns |
83 | 83 | ------- |
84 | | - list[str]: A list of maintainers. |
| 84 | + str | None |
| 85 | + The URL of the first distribution file matching the requested package type, or None if not found. |
85 | 86 | """ |
86 | | - url = f"https://pypi.org/project/{package_name}/" |
87 | | - response = requests.get(url, timeout=10) |
88 | | - if response.status_code != 200: |
89 | | - return [] |
90 | | - |
91 | | - soup = BeautifulSoup(response.text, "html.parser") |
92 | | - gravatar_spans = soup.find_all("span", class_="sidebar-section__user-gravatar-text") |
93 | | - maintainers = [span.get_text().strip() for span in gravatar_spans] |
| 87 | + json_url = f"https://pypi.org/pypi/{package_name}/json" |
| 88 | + data = send_get_http(json_url, headers={}) |
| 89 | + if not data: |
| 90 | + logger.debug("Failed to fetch package data for %s.", package_name) |
| 91 | + return None |
94 | 92 |
|
95 | | - return maintainers |
| 93 | + sdist = next((url for url in data["urls"] if url["packagetype"] == package_type and url.get("url")), None) |
| 94 | + return sdist["url"] if sdist else None |
96 | 95 |
|
97 | | - def get_packages_by_user(self, username: str) -> list[str]: |
98 | | - """Get all packages by a user. |
| 96 | + def get_structure(self, package_name: str) -> list[str]: |
| 97 | + """Get the file structure of the package's sdist. |
99 | 98 |
|
100 | 99 | Parameters |
101 | 100 | ---------- |
102 | | - username (str): The username of the user. |
| 101 | + package_name : str |
| 102 | + The name of the package. |
103 | 103 |
|
104 | 104 | Returns |
105 | 105 | ------- |
106 | | - list[str]: A list of package names. |
| 106 | + list[str]: |
| 107 | + The list of files in the package's sdist. |
107 | 108 | """ |
108 | | - url = f"https://pypi.org/user/{username}/" |
109 | | - response = requests.get(url, timeout=10) |
110 | | - if response.status_code != 200: |
| 109 | + sdist_url = self.get_url(package_name) |
| 110 | + if not sdist_url: |
| 111 | + logger.debug("Package %s does not have a sdist.", package_name) |
111 | 112 | return [] |
112 | 113 |
|
113 | | - soup = BeautifulSoup(response.text, "html.parser") |
114 | | - headers = soup.find_all("h3", class_="package-snippet__title") |
115 | | - packages = [header.get_text().strip() for header in headers] |
116 | | - return packages |
| 114 | + response = send_get_http_raw(sdist_url) |
| 115 | + if not response: |
| 116 | + logger.debug("Failed to download sdist for package %s.", package_name) |
| 117 | + return [] |
117 | 118 |
|
118 | | - def get_packages(self, package_name: str) -> list[str]: |
119 | | - """Get packages that are maintained by this package's maintainers. |
| 119 | + buffer = io.BytesIO(response.content) |
| 120 | + with tarfile.open(fileobj=buffer, mode="r:gz") as tf: |
| 121 | + members = [ |
| 122 | + member.name for member in tf.getmembers() if member.name and not member.name.startswith("PAXHeaders/") |
| 123 | + ] |
120 | 124 |
|
121 | | - Parameters |
122 | | - ---------- |
123 | | - package_name (str): The name of the package. |
| 125 | + return members |
124 | 126 |
|
125 | | - Returns |
126 | | - ------- |
127 | | - list[str]: A list of similar projects. |
128 | | - """ |
129 | | - similar_projects = [] |
130 | | - maintainers = self.get_maintainers(package_name) |
131 | | - for user in maintainers: |
132 | | - user_packages = self.get_packages_by_user(user) |
133 | | - similar_projects.extend(user_packages) |
134 | | - # Remove the target package from the list of similar projects. |
135 | | - similar_projects_set = set(similar_projects) |
136 | | - similar_projects_set.discard(package_name) |
137 | | - return list(similar_projects_set) |
138 | | - |
139 | | - def fetch_sdist_url(self, package_name: str, version: str | None = None) -> str: |
140 | | - """Fetch the sdist URL for a package. |
| 127 | + def get_structure_hash(self, package_name: str) -> str: |
| 128 | + """Get the hash of the package's file structure. |
141 | 129 |
|
142 | 130 | Parameters |
143 | 131 | ---------- |
144 | | - package_name (str): The name of the package. |
145 | | - version (str): The version of the package. If None, the latest version will be used. |
| 132 | + package_name : str |
| 133 | + The name of the package. |
146 | 134 |
|
147 | 135 | Returns |
148 | 136 | ------- |
149 | | - str: The sdist URL, or an empty string if not found. |
| 137 | + str: |
| 138 | + The hash of the package's file structure. |
150 | 139 | """ |
151 | | - url = f"https://pypi.org/pypi/{package_name}/json" |
152 | | - try: |
153 | | - response = requests.get(url, timeout=10) |
154 | | - response.raise_for_status() |
155 | | - data = response.json() |
156 | | - except requests.exceptions.RequestException as err: |
157 | | - err_message = f"Failed to fetch PyPI JSON for {package_name}: {err}" |
158 | | - raise HeuristicAnalyzerValueError(err_message) from err |
159 | | - except ValueError as err: |
160 | | - err_message = f"Failed to decode PyPI JSON for {package_name}: {err}" |
161 | | - raise HeuristicAnalyzerValueError(err_message) from err |
162 | | - |
163 | | - actual_version: str |
164 | | - if version is None: |
165 | | - try: |
166 | | - actual_version = typing.cast(str, data["info"]["version"]) |
167 | | - except (KeyError, TypeError) as err: |
168 | | - err_message = f"Failed to get version for {package_name}: {err}" |
169 | | - raise HeuristicAnalyzerValueError(err_message) from err |
170 | | - else: |
171 | | - actual_version = version |
172 | | - |
173 | | - try: |
174 | | - for release_file in data.get("releases", {}).get(actual_version, []): |
175 | | - if isinstance(release_file, dict) and release_file.get("packagetype") == "sdist": |
176 | | - sdist_url = release_file.get("url") |
177 | | - if isinstance(sdist_url, str): |
178 | | - return sdist_url |
179 | | - except Exception as err: |
180 | | - err_message = f"Failed to parse releases for {package_name} version {actual_version}: {err}" |
181 | | - raise HeuristicAnalyzerValueError(err_message) from err |
182 | | - |
183 | | - return "" |
184 | | - |
185 | | - def get_structure_hash(self, package_name: str) -> str | None: |
186 | | - """Calculate a hash based on the project's file structure. |
| 140 | + structure = self.get_structure(package_name) |
| 141 | + if not structure: |
| 142 | + return "" |
187 | 143 |
|
188 | | - Parameters |
189 | | - ---------- |
190 | | - package_name (str): The name of the package. |
191 | | -
|
192 | | - Returns |
193 | | - ------- |
194 | | - str: The structure hash. |
195 | | -
|
196 | | - Raises |
197 | | - ------ |
198 | | - ValueError: If the sdist URL cannot be fetched or the package structure cannot be hashed. |
199 | | - """ |
200 | | - sdist_url = self.fetch_sdist_url(package_name) |
201 | | - if not sdist_url: |
202 | | - return None |
| 144 | + normalized = sorted([p.replace(package_name, "<ROOT>") for p in structure]) |
203 | 145 |
|
204 | | - try: |
205 | | - response = requests.get(sdist_url, stream=True, timeout=10) |
206 | | - response.raise_for_status() |
207 | | - raw_file_obj: typing.IO[bytes] = typing.cast(typing.IO[bytes], response.raw) |
208 | | - |
209 | | - with tarfile.open(fileobj=raw_file_obj, mode="r:gz") as file_archive: |
210 | | - paths = [] |
211 | | - for member in file_archive: |
212 | | - if not member.isdir(): |
213 | | - # remove top‑level dir. |
214 | | - parts = member.name.split("/", 1) |
215 | | - normalized = parts[1] if len(parts) > 1 else parts[0] |
216 | | - # replace the pkg name. |
217 | | - normalized = normalized.replace(package_name, "<PKG>") |
218 | | - paths.append(normalized) |
219 | | - paths.sort() |
220 | | - structure_hash_calculator = hashlib.sha256() |
221 | | - for path in paths: |
222 | | - structure_hash_calculator.update(path.encode("utf-8")) |
223 | | - structure_hash_calculator.update(b"\n") |
224 | | - return structure_hash_calculator.hexdigest() |
225 | | - except requests.exceptions.RequestException as err: |
226 | | - err_message = f"Failed to download sdist for {package_name} from {sdist_url}: {err}" |
227 | | - raise HeuristicAnalyzerValueError(err_message) from err |
228 | | - except tarfile.TarError as err: |
229 | | - err_message = f"Failed to process tarfile for {package_name} from {sdist_url}: {err}" |
230 | | - raise HeuristicAnalyzerValueError(err_message) from err |
231 | | - except Exception as err: |
232 | | - err_message = f"Failed to get structure hash for {package_name}: {err}" |
233 | | - raise HeuristicAnalyzerValueError(err_message) from err |
| 146 | + joined = "\n".join(normalized).encode("utf-8") |
| 147 | + return hashlib.sha256(joined).hexdigest() |
0 commit comments