diff --git a/src/pip/_internal/metadata/_json.py b/src/pip/_internal/metadata/_json.py
new file mode 100644
index 00000000000..336b52f1efd
--- /dev/null
+++ b/src/pip/_internal/metadata/_json.py
@@ -0,0 +1,96 @@
+# Extracted from https://github.com/pfmoore/pkg_metadata
+
+from email.header import Header, decode_header, make_header
+from email.message import Message
+from typing import Any, Dict, List, Union
+
+# Core metadata fields handled by msg_to_json(). Each entry pairs a header
+# name with a flag saying whether the header may be used multiple times.
+METADATA_FIELDS = [
+    # Name, Multiple-Use
+    ("Metadata-Version", False),
+    ("Name", False),
+    ("Version", False),
+    ("Dynamic", True),
+    ("Platform", True),
+    ("Supported-Platform", True),
+    ("Summary", False),
+    ("Description", False),
+    ("Description-Content-Type", False),
+    ("Keywords", False),
+    ("Home-page", False),
+    ("Download-URL", False),
+    ("Author", False),
+    ("Author-email", False),
+    ("Maintainer", False),
+    ("Maintainer-email", False),
+    ("License", False),
+    ("Classifier", True),
+    ("Requires-Dist", True),
+    ("Requires-Python", False),
+    ("Requires-External", True),
+    ("Project-URL", True),
+    ("Provides-Extra", True),
+    ("Provides-Dist", True),
+    ("Obsoletes-Dist", True),
+]
+
+
+def json_name(field: str) -> str:
+    """Return the JSON key for a field name: lower-case, "-" replaced by "_"."""
+    return field.lower().replace("-", "_")
+
+
+def msg_to_json(msg: Message) -> Dict[str, Any]:
+    """Convert a Message object into a JSON-compatible dictionary.
+
+    Only fields listed in METADATA_FIELDS and present in ``msg`` are included.
+    Multiple-use fields become lists of strings and "Keywords" is split into
+    a list. A non-empty string payload is stored under "description",
+    overriding any value taken from a "Description" header.
+    """
+
+    def sanitise_header(h: Union[Header, str]) -> str:
+        """Return a header value as text, guessing unknown 8-bit encodings."""
+        if isinstance(h, Header):
+            chunks = []
+            for raw, encoding in decode_header(h):
+                if encoding == "unknown-8bit":
+                    try:
+                        # See if UTF-8 works
+                        raw.decode("utf-8")
+                        encoding = "utf-8"
+                    except UnicodeDecodeError:
+                        # If not, latin1 at least won't fail
+                        encoding = "latin1"
+                chunks.append((raw, encoding))
+            return str(make_header(chunks))
+        return str(h)
+
+    result: Dict[str, Any] = {}
+    for field, multi in METADATA_FIELDS:
+        if field not in msg:
+            continue
+        key = json_name(field)
+        if multi:
+            value: Union[str, List[str]] = [
+                sanitise_header(v) for v in msg.get_all(field)
+            ]
+        else:
+            value = sanitise_header(msg.get(field))
+            if key == "keywords":
+                # Accept both comma-separated and space-separated
+                # forms, for better compatibility with old data.
+                if "," in value:
+                    value = [v.strip() for v in value.split(",")]
+                else:
+                    value = value.split()
+        result[key] = value
+
+    # A multipart message returns a list of Message objects here, which is
+    # not JSON-compatible, so only a string payload is used.
+    payload = msg.get_payload()
+    if isinstance(payload, str) and payload:
+        result["description"] = payload
+
+    return result
diff --git a/src/pip/_internal/metadata/base.py b/src/pip/_internal/metadata/base.py
index d226dec8b3f..2823b67540e 100644
--- a/src/pip/_internal/metadata/base.py
+++ b/src/pip/_internal/metadata/base.py
@@ -9,8 +9,10 @@
 from typing import (
     IO,
     TYPE_CHECKING,
+    Any,
     Collection,
     Container,
+    Dict,
     Iterable,
     Iterator,
     List,
@@ -38,6 +40,8 @@
 from pip._internal.utils.packaging import safe_extra
 from pip._internal.utils.urls import url_to_path
 
+from ._json import msg_to_json
+
 if TYPE_CHECKING:
     from typing import Protocol
 else:
@@ -379,6 +383,17 @@ def metadata(self) -> email.message.Message:
         """
         return self._metadata_cached()
 
+    @property
+    def metadata_dict(self) -> Dict[str, Any]:
+        """PEP 566 compliant JSON-serializable representation of METADATA or PKG-INFO.
+
+        This should return an empty dict if the metadata file is unavailable.
+
+        :raises NoneMetadataError: If the metadata file is available, but does
+            not contain valid metadata.
+        """
+        return msg_to_json(self.metadata)
+
     @property
     def metadata_version(self) -> Optional[str]:
         """Value of "Metadata-Version:" in distribution metadata, if available."""
diff --git a/tests/unit/metadata/test_metadata.py b/tests/unit/metadata/test_metadata.py
index 57e09acf76f..0e8f3ffc1cc 100644
--- a/tests/unit/metadata/test_metadata.py
+++ b/tests/unit/metadata/test_metadata.py
@@ -6,8 +6,14 @@
 import pytest
 from pip._vendor.packaging.utils import NormalizedName
 
-from pip._internal.metadata import BaseDistribution, get_directory_distribution
+from pip._internal.metadata import (
+    BaseDistribution,
+    get_directory_distribution,
+    get_wheel_distribution,
+)
+from pip._internal.metadata.base import FilesystemWheel
 from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, ArchiveInfo
+from tests.lib.wheel import make_wheel
 
 
 @mock.patch.object(BaseDistribution, "read_text", side_effect=FileNotFoundError)
@@ -82,3 +88,17 @@ class FakeDistribution(BaseDistribution):
     mock_read_text.assert_called_once_with(DIRECT_URL_METADATA_NAME)
     assert direct_url.url == "https://e.c/p.tgz"
     assert isinstance(direct_url.info, ArchiveInfo)
+
+
+def test_metadata_dict(tmp_path: Path) -> None:
+    """Basic test of BaseDistribution metadata_dict.
+
+    More tests are available in the original pkg_metadata project where this
+    function comes from, and which we may vendor in the future.
+    """
+    wheel_path = make_wheel(name="pkga", version="1.0.1").save_to_dir(tmp_path)
+    wheel = FilesystemWheel(wheel_path)
+    dist = get_wheel_distribution(wheel, "pkga")
+    metadata_dict = dist.metadata_dict
+    assert metadata_dict["name"] == "pkga"
+    assert metadata_dict["version"] == "1.0.1"