diff --git a/src/icloudpd/xmp_sidecar.py b/src/icloudpd/xmp_sidecar.py index b6131d7a2..97853bef0 100644 --- a/src/icloudpd/xmp_sidecar.py +++ b/src/icloudpd/xmp_sidecar.py @@ -104,14 +104,11 @@ def build_metadata(asset_record: dict[str, Any]) -> XMPMetadata: is_not_bplist = not_(startswith("YnBsaXN0MD")) if is_not_crdt(data_value) and is_not_bplist(data_value): # not "crdt" and not "bplist00" - adjustments = json.loads( - zlib.decompress( - base64.b64decode(asset_record["fields"]["adjustmentSimpleDataEnc"]["value"]), - -zlib.MAX_WBITS, - ) + adjustments = try_decompress_json(data_value) + + orientation = ( + adjustments.get("metadata", {}).get("orientation") if adjustments else None ) - if "metadata" in adjustments and "orientation" in adjustments["metadata"]: - orientation = adjustments["metadata"]["orientation"] make, digital_source_type = None, None if ( @@ -190,6 +187,25 @@ def build_metadata(asset_record: dict[str, Any]) -> XMPMetadata: ) +def try_decompress_json(data_value: str) -> dict[str, Any] | None: + """ + Attempts to decode base64-encoded, zlib-compressed JSON data. + + Args: + data_value: Base64-encoded string + + Returns: + Decoded JSON dictionary or None on error + """ + try: + decoded = base64.b64decode(data_value) + decompressed = zlib.decompress(decoded, -zlib.MAX_WBITS) + data: dict[str, Any] = json.loads(decompressed) + return data + except (zlib.error, json.JSONDecodeError, Exception): + return None + + def generate_xml(metadata: XMPMetadata) -> ElementTree.Element: # Create the root element xml_doc = ElementTree.Element( diff --git a/tests/test_xmp_sidecar.py b/tests/test_xmp_sidecar.py index a1438ee09..4fd3f09dd 100644 --- a/tests/test_xmp_sidecar.py +++ b/tests/test_xmp_sidecar.py @@ -74,6 +74,27 @@ def test_build_metadata(self) -> None: metadata = build_metadata(assetRecordStub) assert metadata.Orientation is None + # - a zlib compressed JSON without "metadata" key (lines 101-113 coverage) + assetRecordStub["fields"]["adjustmentSimpleDataEnc"]["value"] = ( + "q1Yqzs9N9S/JSC3yTq1UslJQKkvMKU1VqgUA" + ) + metadata = build_metadata(assetRecordStub) + assert metadata.Orientation is None + + # - a zlib compressed JSON with "metadata" key but no "orientation" key (lines 101-113 coverage) + assetRecordStub["fields"]["adjustmentSimpleDataEnc"]["value"] = ( + "q1bKTS1JTEksSVSyUqhWKs7PTfUvyUgt8k6tBAoolSXmlKYq1dYCAA==" + ) + metadata = build_metadata(assetRecordStub) + assert metadata.Orientation is None + + # - invalid data that fails decompression (lines 101-113 coverage) + assetRecordStub["fields"]["adjustmentSimpleDataEnc"]["value"] = ( + "aW52YWxpZCBkYXRhIHRoYXQgd2lsbCBmYWlsIGRlY29tcHJlc3Npb24=" + ) + metadata = build_metadata(assetRecordStub) + assert metadata.Orientation is None + # Test Screenshot Tagging assetRecordStub["fields"]["assetSubtypeV2"]["value"] = 3 metadata = build_metadata(assetRecordStub)