|
| 1 | +"""Security regression tests for BaseFileComponent bundle extraction. |
| 2 | +
|
| 3 | +Covers GHSA-ccv6-r384-xp75: a TAR member that is a symlink, hardlink, or device |
| 4 | +node could be made to point at an arbitrary host file. When the extracted entry |
| 5 | +is later read by `process_files()` the host file's contents would be ingested |
| 6 | +into the downstream sink. The fix in `_safe_extract_tar` rejects every member |
| 7 | +that is not a regular file or directory, and `_unpack_and_collect_files` skips |
| 8 | +any symlinks defensively before handing entries to `process_files()`. |
| 9 | +""" |
| 10 | + |
| 11 | +from __future__ import annotations |
| 12 | + |
| 13 | +import io |
| 14 | +import tarfile |
| 15 | +import zipfile |
| 16 | +from typing import TYPE_CHECKING |
| 17 | + |
| 18 | +import pytest |
| 19 | +from lfx.base.data.base_file import BaseFileComponent |
| 20 | + |
| 21 | +if TYPE_CHECKING: |
| 22 | + from pathlib import Path |
| 23 | + |
| 24 | + |
| 25 | +class _StubFileComponent(BaseFileComponent): |
| 26 | + """Minimal concrete subclass used to exercise the unpack helpers.""" |
| 27 | + |
| 28 | + VALID_EXTENSIONS = ["txt"] |
| 29 | + |
| 30 | + def __init__(self, **data): |
| 31 | + super().__init__(**data) |
| 32 | + self.set_attributes( |
| 33 | + { |
| 34 | + "path": [], |
| 35 | + "file_path": None, |
| 36 | + "separator": "\n\n", |
| 37 | + "silent_errors": False, |
| 38 | + "delete_server_file_after_processing": True, |
| 39 | + "ignore_unsupported_extensions": True, |
| 40 | + "ignore_unspecified_files": False, |
| 41 | + } |
| 42 | + ) |
| 43 | + |
| 44 | + def process_files(self, file_list): # pragma: no cover - not exercised here |
| 45 | + return file_list |
| 46 | + |
| 47 | + |
| 48 | +def _add_file(tar: tarfile.TarFile, name: str, payload: bytes) -> None: |
| 49 | + info = tarfile.TarInfo(name=name) |
| 50 | + info.size = len(payload) |
| 51 | + info.type = tarfile.REGTYPE |
| 52 | + tar.addfile(info, io.BytesIO(payload)) |
| 53 | + |
| 54 | + |
| 55 | +def _add_symlink(tar: tarfile.TarFile, name: str, target: str) -> None: |
| 56 | + info = tarfile.TarInfo(name=name) |
| 57 | + info.type = tarfile.SYMTYPE |
| 58 | + info.linkname = target |
| 59 | + tar.addfile(info) |
| 60 | + |
| 61 | + |
| 62 | +def _add_hardlink(tar: tarfile.TarFile, name: str, target: str) -> None: |
| 63 | + info = tarfile.TarInfo(name=name) |
| 64 | + info.type = tarfile.LNKTYPE |
| 65 | + info.linkname = target |
| 66 | + tar.addfile(info) |
| 67 | + |
| 68 | + |
| 69 | +def _add_fifo(tar: tarfile.TarFile, name: str) -> None: |
| 70 | + info = tarfile.TarInfo(name=name) |
| 71 | + info.type = tarfile.FIFOTYPE |
| 72 | + tar.addfile(info) |
| 73 | + |
| 74 | + |
| 75 | +def _build_tar(tmp_path: Path, name: str, populate) -> Path: |
| 76 | + bundle = tmp_path / name |
| 77 | + with tarfile.open(bundle, "w") as tar: |
| 78 | + populate(tar) |
| 79 | + return bundle |
| 80 | + |
| 81 | + |
| 82 | +def _build_zip(tmp_path: Path, name: str, files: dict[str, bytes]) -> Path: |
| 83 | + bundle = tmp_path / name |
| 84 | + with zipfile.ZipFile(bundle, "w") as zf: |
| 85 | + for member, payload in files.items(): |
| 86 | + zf.writestr(member, payload) |
| 87 | + return bundle |
| 88 | + |
| 89 | + |
| 90 | +@pytest.fixture |
| 91 | +def component() -> _StubFileComponent: |
| 92 | + return _StubFileComponent() |
| 93 | + |
| 94 | + |
| 95 | +def test_tar_with_absolute_symlink_is_rejected(tmp_path, component): |
| 96 | + """A symlink whose target is an absolute host path must be refused.""" |
| 97 | + secret = tmp_path / "secret.txt" |
| 98 | + secret.write_bytes(b"jwt-signing-secret") |
| 99 | + |
| 100 | + bundle = _build_tar( |
| 101 | + tmp_path, |
| 102 | + "evil.tar", |
| 103 | + lambda tar: _add_symlink(tar, "leak", str(secret)), |
| 104 | + ) |
| 105 | + |
| 106 | + extract_dir = tmp_path / "out_abs" |
| 107 | + extract_dir.mkdir() |
| 108 | + with pytest.raises(ValueError, match="Refusing to extract link member"): |
| 109 | + component._unpack_bundle(bundle, extract_dir) |
| 110 | + assert list(extract_dir.iterdir()) == [] |
| 111 | + |
| 112 | + |
| 113 | +def test_tar_with_relative_escape_symlink_is_rejected(tmp_path, component): |
| 114 | + """A symlink that uses ../ to escape the extract dir must be refused.""" |
| 115 | + bundle = _build_tar( |
| 116 | + tmp_path, |
| 117 | + "escape.tar", |
| 118 | + lambda tar: _add_symlink(tar, "leak", "../../etc/passwd"), |
| 119 | + ) |
| 120 | + |
| 121 | + extract_dir = tmp_path / "out_rel" |
| 122 | + extract_dir.mkdir() |
| 123 | + with pytest.raises(ValueError, match="Refusing to extract link member"): |
| 124 | + component._unpack_bundle(bundle, extract_dir) |
| 125 | + assert list(extract_dir.iterdir()) == [] |
| 126 | + |
| 127 | + |
| 128 | +def test_tar_with_hardlink_is_rejected(tmp_path, component): |
| 129 | + """Hardlinks have the same arbitrary-target risk as symlinks.""" |
| 130 | + |
| 131 | + def populate(tar): |
| 132 | + _add_file(tar, "real.txt", b"ok") |
| 133 | + _add_hardlink(tar, "leak", "../etc/passwd") |
| 134 | + |
| 135 | + bundle = _build_tar(tmp_path, "hardlink.tar", populate) |
| 136 | + |
| 137 | + extract_dir = tmp_path / "out_hl" |
| 138 | + extract_dir.mkdir() |
| 139 | + with pytest.raises(ValueError, match="Refusing to extract link member"): |
| 140 | + component._unpack_bundle(bundle, extract_dir) |
| 141 | + |
| 142 | + |
| 143 | +def test_tar_with_fifo_member_is_rejected(tmp_path, component): |
| 144 | + """Non-regular members (FIFO/device) must be refused.""" |
| 145 | + bundle = _build_tar(tmp_path, "fifo.tar", lambda tar: _add_fifo(tar, "pipe")) |
| 146 | + |
| 147 | + extract_dir = tmp_path / "out_fifo" |
| 148 | + extract_dir.mkdir() |
| 149 | + with pytest.raises(ValueError, match="Refusing to extract non-regular TAR member"): |
| 150 | + component._unpack_bundle(bundle, extract_dir) |
| 151 | + |
| 152 | + |
| 153 | +def test_tar_with_only_regular_files_extracts(tmp_path, component): |
| 154 | + """The fix must not regress benign archives.""" |
| 155 | + |
| 156 | + def populate(tar): |
| 157 | + _add_file(tar, "a.txt", b"alpha") |
| 158 | + _add_file(tar, "nested/b.txt", b"beta") |
| 159 | + |
| 160 | + bundle = _build_tar(tmp_path, "ok.tar", populate) |
| 161 | + |
| 162 | + extract_dir = tmp_path / "out_ok" |
| 163 | + extract_dir.mkdir() |
| 164 | + component._unpack_bundle(bundle, extract_dir) |
| 165 | + |
| 166 | + assert (extract_dir / "a.txt").read_bytes() == b"alpha" |
| 167 | + assert (extract_dir / "nested" / "b.txt").read_bytes() == b"beta" |
| 168 | + |
| 169 | + |
| 170 | +def test_zip_with_only_regular_files_extracts(tmp_path, component): |
| 171 | + """ZIP path must remain working unchanged.""" |
| 172 | + bundle = _build_zip(tmp_path, "ok.zip", {"a.txt": b"alpha", "nested/b.txt": b"beta"}) |
| 173 | + |
| 174 | + extract_dir = tmp_path / "out_zip" |
| 175 | + extract_dir.mkdir() |
| 176 | + component._unpack_bundle(bundle, extract_dir) |
| 177 | + |
| 178 | + assert (extract_dir / "a.txt").read_bytes() == b"alpha" |
| 179 | + assert (extract_dir / "nested" / "b.txt").read_bytes() == b"beta" |
| 180 | + |
| 181 | + |
| 182 | +def test_collect_files_skips_symlinks_in_extracted_dir(tmp_path, component): |
| 183 | + """Defense-in-depth check for the post-extraction iteration. |
| 184 | +
|
| 185 | + A symlink that somehow lands in an unpacked dir must not be passed to |
| 186 | + ``process_files()``. Simulated by manually planting one, since |
| 187 | + ``_safe_extract_tar`` would otherwise refuse it. |
| 188 | + """ |
| 189 | + extract_root = tmp_path / "extracted" |
| 190 | + extract_root.mkdir() |
| 191 | + real = extract_root / "doc.txt" |
| 192 | + real.write_bytes(b"hello") |
| 193 | + secret = tmp_path / "secret.txt" |
| 194 | + secret.write_bytes(b"top-secret") |
| 195 | + (extract_root / "leak").symlink_to(secret) |
| 196 | + |
| 197 | + files = [BaseFileComponent.BaseFile(data=None, path=extract_root)] |
| 198 | + collected = component._unpack_and_collect_files(files) |
| 199 | + |
| 200 | + paths = {bf.path.name for bf in collected} |
| 201 | + assert "doc.txt" in paths |
| 202 | + assert "leak" not in paths |
| 203 | + |
| 204 | + |
| 205 | +def test_unpack_bundle_rejects_unsupported_format(tmp_path, component): |
| 206 | + """An input that is neither zip nor tar still raises clearly.""" |
| 207 | + bogus = tmp_path / "not-a-bundle.bin" |
| 208 | + bogus.write_bytes(b"not an archive") |
| 209 | + |
| 210 | + extract_dir = tmp_path / "out_bogus" |
| 211 | + extract_dir.mkdir() |
| 212 | + with pytest.raises(ValueError, match="Unsupported bundle format"): |
| 213 | + component._unpack_bundle(bogus, extract_dir) |
| 214 | + |
| 215 | + |
| 216 | +def test_real_filesystem_symlink_in_a_tar_via_tarfile_is_rejected(tmp_path, component): |
| 217 | + """End-to-end repro of the advisory's PoC archive shape. |
| 218 | +
|
| 219 | + Builds the tar from a real filesystem symlink (the way the reporter's |
| 220 | + archive was produced) and confirms extraction is refused. |
| 221 | + """ |
| 222 | + target = tmp_path / "host_secret" |
| 223 | + target.write_bytes(b"x") |
| 224 | + workdir = tmp_path / "src" |
| 225 | + workdir.mkdir() |
| 226 | + (workdir / "leak").symlink_to(target) |
| 227 | + |
| 228 | + bundle = tmp_path / "from_fs.tar" |
| 229 | + with tarfile.open(bundle, "w") as tar: |
| 230 | + tar.add(workdir / "leak", arcname="leak") |
| 231 | + |
| 232 | + extract_dir = tmp_path / "out_fs" |
| 233 | + extract_dir.mkdir() |
| 234 | + with pytest.raises(ValueError, match="Refusing to extract link member"): |
| 235 | + component._unpack_bundle(bundle, extract_dir) |
| 236 | + assert list(extract_dir.iterdir()) == [] |
0 commit comments