Skip to content
Merged
18 changes: 18 additions & 0 deletions pymobiledevice3/cli/developer/core_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
touch_session,
)
from pymobiledevice3.remote.core_device.location_service import LocationService
from pymobiledevice3.remote.core_device.orientation_service import OrientationService
from pymobiledevice3.remote.core_device.screen_capture_service import ScreenCaptureService
from pymobiledevice3.remote.core_device.screen_stream import (
ScreenStreamServer,
Expand Down Expand Up @@ -287,6 +288,23 @@ async def core_device_get_lockstate(service_provider: RSDServiceProviderDep) ->
await core_device_get_lockstate_task(service_provider)


@cli.command("rotate")
@async_command
async def core_device_rotate(
service_provider: RSDServiceProviderDep,
direction: Annotated[
str,
typer.Argument(
click_type=click.Choice(["left", "right"]),
help="Rotate 90 degrees: 'left' = CCW, 'right' = CW.",
),
] = "left",
) -> None:
"""Rotate the device 90 degrees. Four consecutive 'left' calls cycle a full turn."""
async with OrientationService(service_provider) as svc:
print_json(await svc.rotate(direction))


@cli.command("user-interface-style")
@async_command
async def core_device_user_interface_style(
Expand Down
53 changes: 53 additions & 0 deletions pymobiledevice3/remote/core_device/orientation_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Programmatic device rotation via the ``com.apple.coredevice.devicecontrol``
RemoteXPC service.

Decoded from a Xcode-mirror sniff (``misc/remotexpc_sniffer.py``) of four
back-to-back "rotate left" clicks. Each request is an ``OrientationRequest``
envelope; the reply carries the resulting orientation state:

request -> {'featureIdentifier': 'com.apple.coredevice.feature.remote.devicecontrol.orientation',
'messageType': 'OrientationRequest',
'payload': {'rotate': {'_0': 'left'}}}
response -> {'currentDeviceOrientation': 'landscapeLeft',
'currentDeviceNonFlatOrientation': 'landscapeLeft',
'currentDeviceOrientationLocked': False}

Four consecutive ``rotate=left`` calls cycle the device through
``portrait -> landscapeLeft -> portraitUpsideDown -> landscapeRight -> portrait``
- so a single request is a 90 degree CCW step, ``right`` is the CW
counterpart. ``currentDeviceOrientationLocked`` reflects iOS's own
orientation-lock toggle; the service still rotates while locked.
"""

from pymobiledevice3.remote.remote_service import RemoteService
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService

ORIENTATION_FEATURE = "com.apple.coredevice.feature.remote.devicecontrol.orientation"

ROTATE_LEFT = "left"
ROTATE_RIGHT = "right"


class OrientationService(RemoteService):
"""Rotate the device 90 degrees at a time over ``com.apple.coredevice.devicecontrol``."""

SERVICE_NAME = "com.apple.coredevice.devicecontrol"

def __init__(self, rsd: RemoteServiceDiscoveryService):
super().__init__(rsd, self.SERVICE_NAME)

async def rotate(self, direction: str = ROTATE_LEFT) -> dict:
"""Rotate the device 90 degrees in ``direction`` (``'left'`` = CCW, ``'right'`` = CW).

Returns the device's resulting orientation, e.g. ``{'currentDeviceOrientation':
'landscapeLeft', 'currentDeviceNonFlatOrientation': 'landscapeLeft',
'currentDeviceOrientationLocked': False}``.
"""
if direction not in (ROTATE_LEFT, ROTATE_RIGHT):
raise ValueError(f"direction must be 'left' or 'right', got {direction!r}")
return await self.service.send_receive_request({
"featureIdentifier": ORIENTATION_FEATURE,
"messageType": "OrientationRequest",
"payload": {"rotate": {"_0": direction}},
})
158 changes: 158 additions & 0 deletions pymobiledevice3/remote/core_device/screen_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from cryptography.x509.oid import NameOID

import pymobiledevice3.resources
from pymobiledevice3.lockdown import create_using_usbmux as _create_lockdown_usbmux
from pymobiledevice3.remote.core_device.aac_eld import AAC_ELD_ASC_48K_STEREO_480, AACELDDecoder
from pymobiledevice3.remote.core_device.configuration_service import ConfigurationService
from pymobiledevice3.remote.core_device.display_service import DisplayService
Expand All @@ -46,7 +47,9 @@
IndigoHIDService,
UniversalHIDServiceService,
)
from pymobiledevice3.remote.core_device.orientation_service import OrientationService
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.services.accessibilityaudit import AccessibilityAudit

# Named iOS hardware buttons → (usage_page, usage_code, hold_seconds).
# Mirrors the table in cli/developer/core_device.py so the browser viewer
Expand Down Expand Up @@ -541,6 +544,16 @@ def __init__(
self._last_restart_t: float = 0.0
self._consecutive_restarts: int = 0

# Lazy lockdown handle + AccessibilityAudit cache for the
# accessibility sidebar -- opened on the first /accessibility
# request so a serve-web run without any accessibility use
# never has to touch usbmuxd. The audit's DTX reader tasks are
# closed during shutdown; otherwise they trigger a flurry of
# CancelledError tracebacks from the connection-cleanup path.
self._lockdown = None
self._accessibility: Optional[AccessibilityAudit] = None
self._accessibility_lock = asyncio.Lock()

# ----- per-session UDP receiver -----------------------------------------
async def _udp_recv_and_depacketize(self, sock: socket.socket) -> None:
sock.setblocking(False)
Expand Down Expand Up @@ -1185,6 +1198,59 @@ async def _ensure_fresh_stream(self, force: bool = False) -> None:
self._stream_dirty = False

# ----- HID (touch + buttons) -------------------------------------------
# ----- Accessibility settings (lockdown / DTX) --------------------------
async def _ensure_accessibility(self) -> AccessibilityAudit:
"""Lazy-open the lockdown + AccessibilityAudit handles. Cached
for the server's lifetime so we don't churn the DTX channel on
every panel interaction; closed in :meth:`serve`'s shutdown."""
if self._accessibility is None:
if self._lockdown is None:
self._lockdown = await _create_lockdown_usbmux(self._rsd.udid)
self._accessibility = AccessibilityAudit(self._lockdown)
return self._accessibility

async def _accessibility_list(self) -> list[dict]:
async with self._accessibility_lock:
audit = await self._ensure_accessibility()
settings = await audit.settings()
out: list[dict] = []
for s in settings:
val = s.value
# Whitelist serialisable scalars; everything else is dropped so
# bad responses don't break json.dumps for the whole list.
if isinstance(val, (bool, int, float, str)):
out.append({"key": s.key, "value": val})
return out

async def _accessibility_set(self, key: str, value) -> None:
async with self._accessibility_lock:
audit = await self._ensure_accessibility()
await audit.set_setting(key, value)

async def _accessibility_reset(self) -> None:
async with self._accessibility_lock:
audit = await self._ensure_accessibility()
await audit.reset_settings()

async def _stop_accessibility(self) -> None:
"""Tear down the cached AccessibilityAudit (closes its DTX reader
task) so shutdown doesn't drag stale channels into the cancel
path -- their reader-loop CancelledError would otherwise log a
full traceback at ERROR level for each open channel.

Acquires ``_accessibility_lock`` first so an in-flight panel
request can finish before we pull the underlying DTX channel
out from under it; closing mid-decode produces a 'coroutine
ignored GeneratorExit' warning on Python 3.14 when the
in-flight bplist read can't unwind cleanly."""
with contextlib.suppress(Exception):
async with self._accessibility_lock:
audit = self._accessibility
self._accessibility = None
if audit is not None:
with contextlib.suppress(Exception):
await audit.close()

async def _ensure_hid(self) -> None:
"""Lazily open the HID services + worker on first input event."""
async with self._hid_lock:
Expand Down Expand Up @@ -1218,6 +1284,12 @@ async def _stop_hid(self) -> None:
with contextlib.suppress(Exception):
await self._indigo.close()
self._indigo = None
# The keyboard surface is host-registered against the live
# media stream; after a stream restart that ID points at a
# stale dtuhidd session and every report posted to it is
# silently dropped. Forget it so _ensure_keyboard re-creates
# one against the new stream on the next /key.
self._kb_service_id = None

async def _hid_worker(self) -> None:
"""Single consumer that serially dispatches queued HID requests so
Expand Down Expand Up @@ -1500,6 +1572,46 @@ async def _restart_bg():
await writer.drain()
writer.close()
return
if path.startswith("/accessibility"):
try:
resp_body: bytes
if path == "/accessibility" and method == "GET":
settings = await self._accessibility_list()
resp_body = json.dumps({"settings": settings}).encode()
elif path == "/accessibility/set" and method == "POST":
body = await self._read_body(reader, headers)
payload = json.loads(body)
key = str(payload["key"])
value = payload["value"]
await self._accessibility_set(key, value)
resp_body = b'{"ok":true}'
elif path == "/accessibility/reset" and method == "POST":
await self._accessibility_reset()
resp_body = b'{"ok":true}'
else:
writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
await writer.drain()
writer.close()
return
writer.write(
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: application/json\r\n"
b"Cache-Control: no-store\r\n"
b"Content-Length: " + str(len(resp_body)).encode() + b"\r\n"
b"Connection: close\r\n\r\n" + resp_body
)
except Exception as exc:
logger.exception("accessibility endpoint failed")
err = f"accessibility error: {exc}".encode()
writer.write(
b"HTTP/1.1 500 Internal\r\n"
b"Content-Type: text/plain\r\n"
b"Content-Length: " + str(len(err)).encode() + b"\r\n"
b"Connection: close\r\n\r\n" + err
)
await writer.drain()
writer.close()
return
if path == "/style":
try:
if method == "POST":
Expand Down Expand Up @@ -1538,6 +1650,43 @@ async def _restart_bg():
await writer.drain()
writer.close()
return
if path == "/rotate" and method == "POST":
# 90 degree rotation step. JSON body: ``{"direction": "left"|"right"}``.
# The reply is the device's resulting orientation, which the viewer
# uses to apply a matching CSS transform to the canvas so the user
# sees the rotated content upright in the browser too.
body = await self._read_body(reader, headers)
try:
direction = str(json.loads(body)["direction"])
except (KeyError, TypeError, ValueError, json.JSONDecodeError) as exc:
writer.write(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
await writer.drain()
writer.close()
logger.debug("rotate POST: bad body %r (%s)", body, exc)
return
try:
async with OrientationService(self._rsd) as svc:
state = await svc.rotate(direction)
resp_body = json.dumps({k: v for k, v in state.items() if isinstance(v, (str, bool))}).encode()
writer.write(
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: application/json\r\n"
b"Cache-Control: no-store\r\n"
b"Content-Length: " + str(len(resp_body)).encode() + b"\r\n"
b"Connection: close\r\n\r\n" + resp_body
)
except Exception as exc:
logger.exception("rotate endpoint failed")
err = f"rotate error: {exc}".encode()
writer.write(
b"HTTP/1.1 500 Internal\r\n"
b"Content-Type: text/plain\r\n"
b"Content-Length: " + str(len(err)).encode() + b"\r\n"
b"Connection: close\r\n\r\n" + err
)
await writer.drain()
writer.close()
return
if path == "/pli":
# Lightweight recovery: ask the device for a fresh IDR via
# RTCP PLI and mark all subscribers as needing a key, but
Expand Down Expand Up @@ -1997,6 +2146,15 @@ async def _stop_audio():
await _bounded(_stop_video(), "_stop_active_stream")
logger.debug("shutdown: stopping audio stream")
await _bounded(_stop_audio(), "_stop_audio_stream")
# Close the accessibility audit BEFORE cancelling stragglers --
# otherwise its DTX reader task is one of the stragglers and
# its cancellation logs a 'Channel reader loop cancelled'
# ERROR-with-traceback. Closing first sets _closed=True on
# the channel so the reader exits silently. Any in-flight
# /accessibility request gets an exception out of its
# await audit.* call and falls through to its try/except.
logger.debug("shutdown: closing accessibility audit")
await _bounded(self._stop_accessibility(), "_stop_accessibility")
# Cancel any lingering connection-handler tasks that the
# HTTP server's wait_closed couldn't drain (e.g. a
# /stream.bin or /audio.bin handler blocked in queue.get()
Expand Down
Loading
Loading