diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index d6a7d1568..6179b8bfa 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -152,6 +152,17 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None: def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None: """Request two-factor authentication.""" devices = icloud.get_trusted_phone_numbers() + + # Trigger push notification to trusted devices + try: + if icloud.trigger_push_notification(): + logger.debug("Push notification triggered successfully") + else: + logger.debug("Failed to trigger push notification") + except Exception as e: + logger.debug(f"Exception while triggering push notification: {e}") + + devices_count = len(devices) device_index_alphabet = "abcdefghijklmnopqrstuvwxyz" if devices_count > 0: diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index d279a81d7..31a6ce135 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -694,6 +694,45 @@ def get_trusted_phone_numbers(self) -> Sequence[TrustedDevice]: return parse_trusted_phone_numbers_response(response) + def trigger_push_notification(self) -> bool: + """Triggers a push notification to trusted devices for 2FA code entry + + This should be called after get_trusted_phone_numbers() to enable + push notifications on trusted Apple devices (iPhone, iPad, Mac). + Returns True if the request was successful. + """ + from pyicloud_ipd.sms import build_trigger_push_notification_request + + oauth_session = self.get_oauth_session() + context = TrustedPhoneContextProvider(domain=self.domain, oauth_session=oauth_session) + + req = build_trigger_push_notification_request(context) + request = Request( + method=req.method, + url=req.url, + headers=req.headers, + data=req.data, + json=req.json, + ).prepare() + + if self.response_observer: + rules = list( + chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + ) + ) + else: + rules = [] + + with self.use_rules(rules): + response = self.send_request(request) + + # Successful response is 200 with no body, but treat any 200 response as success + return response.status_code == 200 + def send_2fa_code_sms(self, device_id: int) -> bool: """Requests that a verification code is sent to the given device""" diff --git a/src/pyicloud_ipd/sms.py b/src/pyicloud_ipd/sms.py index 0a78e2b4f..3ee12d04c 100644 --- a/src/pyicloud_ipd/sms.py +++ b/src/pyicloud_ipd/sms.py @@ -65,12 +65,17 @@ def parse_trusted_phone_numbers_payload(content: str) -> Sequence[TrustedDevice] parser = _SMSParser() parser.feed(content) parser.close() + twoSV = parser.sms_data.get("direct", {}).get("twoSV", {}) + # Apple moved trustedPhoneNumbers into bridgeInitiateData.phoneNumberVerification (2026+) numbers: Sequence[Mapping[str, Any]] = ( - parser.sms_data.get("direct", {}) - .get("twoSV", {}) - .get("phoneNumberVerification", {}) - .get("trustedPhoneNumbers", []) + twoSV.get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) ) + if not numbers: + numbers = ( + twoSV.get("bridgeInitiateData", {}) + .get("phoneNumberVerification", {}) + .get("trustedPhoneNumbers", []) + ) return list(item for item in map(_map_to_trusted_device, numbers) if item is not None) @@ -209,3 +214,39 @@ def build_verify_sms_code_request( json=json, ) return req + +def build_trigger_push_notification_request(context: _TrustedPhoneContextProvider) -> Request: + """Builds a request to trigger push notification to trusted devices for 2FA""" + import time + import uuid + import secrets + + url = _auth_url(context.domain).replace("/auth", "/auth/bridge/step/0") + + # Generate client-side bridge session values + # sessionUUID format: {uuid}-{timestamp_ms} + session_uuid = f"{str(uuid.uuid4())}-{int(time.time() * 1000)}" + # ptkn: 128-char hex string (64 bytes) + ptkn = secrets.token_hex(64) + + json_payload = { + "sessionUUID": session_uuid, + "ptkn": ptkn + } + + req = _InternalRequest( + method="POST", + url=url, + headers={ + **_oauth_const_headers(), + **_oauth_redirect_header(context.domain), + **_oauth_headers(context.oauth_session), + **{"Content-type": "application/json; charset=utf-8"}, + **{"Accept": "application/json, text/plain, */*"}, + **{"X-Apple-App-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d"}, + **{"X-Apple-Domain-Id": "3"}, + }, + json=json_payload, + ) + return req +