|
37 | 37 |
|
38 | 38 | import yaml |
39 | 39 |
|
| 40 | +from ..utils.archive import MAX_ZIP_ENTRIES, MAX_ZIP_UNCOMPRESSED, safe_extract_zip |
40 | 41 | from ..utils.path_security import ( |
41 | 42 | PathTraversalError, |
42 | 43 | ensure_path_within, |
43 | 44 | validate_path_segments, |
44 | 45 | ) |
45 | 46 |
|
46 | | -_MAX_ZIP_ENTRIES = 10_000 |
47 | | -_MAX_ZIP_UNCOMPRESSED = 512 * 1024 * 1024 # 512 MB |
| 47 | +_MAX_ZIP_ENTRIES = MAX_ZIP_ENTRIES |
| 48 | +_MAX_ZIP_UNCOMPRESSED = MAX_ZIP_UNCOMPRESSED |
48 | 49 |
|
49 | 50 |
|
50 | 51 | @dataclass(frozen=True) |
@@ -204,44 +205,27 @@ def _find_extracted_root(extract_dir: Path) -> Path | None: |
204 | 205 | def _extract_zip_bundle(path: Path) -> LocalBundleInfo | None: |
205 | 206 | """Extract a ``.zip`` bundle to a temp dir and return :class:`LocalBundleInfo`. |
206 | 207 |
|
207 | | - Applies the same security checks as the tar.gz branch: rejects absolute |
208 | | - paths, path-traversal segments, and Unix symlink entries detected via |
209 | | - ``external_attr``. Returns ``None`` on any validation failure or I/O |
210 | | - error so the caller can fall through to a generic error message. |
| 208 | + Applies the same security checks as the tar.gz branch and enforces the ZIP |
| 209 | + size quota while streaming each entry. Returns ``None`` only when the file |
| 210 | + is not a readable ZIP bundle or no ``plugin.json`` root is found; security |
| 211 | + violations raise ``ValueError`` with a targeted reason. |
211 | 212 | """ |
212 | 213 | temp_dir = Path(tempfile.mkdtemp(prefix="apm-local-bundle-")) |
213 | 214 | try: |
214 | 215 | with zipfile.ZipFile(path, "r") as zf: |
215 | | - members = zf.infolist() |
216 | | - # ZIP bomb guard: reject suspiciously large or deep archives |
217 | | - if len(members) > _MAX_ZIP_ENTRIES: |
218 | | - shutil.rmtree(temp_dir, ignore_errors=True) |
219 | | - return None |
220 | | - if sum(m.file_size for m in members) > _MAX_ZIP_UNCOMPRESSED: |
221 | | - shutil.rmtree(temp_dir, ignore_errors=True) |
222 | | - return None |
223 | | - for member in members: |
224 | | - name = member.filename |
225 | | - if ( |
226 | | - name.startswith("/") |
227 | | - or PureWindowsPath(name).drive |
228 | | - or PureWindowsPath(name).is_absolute() |
229 | | - ): |
230 | | - shutil.rmtree(temp_dir, ignore_errors=True) |
231 | | - return None |
232 | | - try: |
233 | | - validate_path_segments(name, context="zip member") |
234 | | - except PathTraversalError: |
235 | | - shutil.rmtree(temp_dir, ignore_errors=True) |
236 | | - return None |
237 | | - # Detect Unix symlinks stored in zip external_attr |
238 | | - if (member.external_attr >> 16) & 0o170000 == 0o120000: |
239 | | - shutil.rmtree(temp_dir, ignore_errors=True) |
240 | | - return None |
241 | | - zf.extractall(temp_dir) # noqa: S202 -- validated above |
| 216 | + safe_extract_zip( |
| 217 | + zf, |
| 218 | + temp_dir, |
| 219 | + max_entries=_MAX_ZIP_ENTRIES, |
| 220 | + max_uncompressed=_MAX_ZIP_UNCOMPRESSED, |
| 221 | + error_type=ValueError, |
| 222 | + ) |
242 | 223 | except (zipfile.BadZipFile, OSError): |
243 | 224 | shutil.rmtree(temp_dir, ignore_errors=True) |
244 | 225 | return None |
| 226 | + except ValueError: |
| 227 | + shutil.rmtree(temp_dir, ignore_errors=True) |
| 228 | + raise |
245 | 229 | bundle_root = _find_extracted_root(temp_dir) |
246 | 230 | if bundle_root is None: |
247 | 231 | shutil.rmtree(temp_dir, ignore_errors=True) |
|
0 commit comments