Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 84 additions & 119 deletions pymobiledevice3/cli/developer/dvt/sysmon/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,51 @@ class ProcessSelectionMode(str, Enum):
LAST = "last"


ProcessFilterExpressions = Annotated[
Optional[list[str]],
typer.Option(
"--filter",
"-f",
help="Filter processes by key=value. Can be specified multiple times.",
),
]

ProcessKeys = Annotated[
Optional[list[str]],
typer.Option(
"--key",
"-k",
help="Show only selected process keys for each emitted record. Can be specified multiple times.",
),
]

ProcessMonitorDuration = Annotated[
Optional[int],
typer.Option(
"--duration",
"-d",
help="Maximum duration in milliseconds to run monitoring (optional)",
),
]

HumanReadableProcessValues = Annotated[
bool,
typer.Option(
"--human",
help="Format known byte-count fields such as physFootprint using human-readable units.",
),
]

ProcessResultsOutputPath = Annotated[
Optional[Path],
typer.Option(
"--output",
"-o",
help="Output file path for JSONL format (optional, defaults to stdout)",
),
]


def _parse_process_filters(filter_expressions: Optional[list[str]]) -> dict[str, list[str]]:
parsed_filters: dict[str, list[str]] = {}
if filter_expressions:
Expand Down Expand Up @@ -160,12 +205,16 @@ def _duration_elapsed(start_time: float, duration_ms: Optional[int]) -> bool:
return ((asyncio.get_running_loop().time() - start_time) * 1000) >= duration_ms


async def iter_initialized_processes(sysmon: Sysmontap):
sample_index = 0
def _should_skip_first_snapshot(keys: Optional[list[str]]) -> bool:
# The first sample does not contain initialized cpuUsage values
return (keys is None) or ("cpuUsage" in keys)


async def iter_processes(sysmon: Sysmontap, skip_first_snapshot: bool = False):
should_skip_snapshot = skip_first_snapshot
async for process_snapshot in sysmon.iter_processes():
sample_index += 1
# The first sample does not contain initialized cpuUsage values.
if sample_index < 2:
if should_skip_snapshot:
should_skip_snapshot = False
continue
yield process_snapshot

Expand Down Expand Up @@ -240,7 +289,9 @@ async def _select_process_from_sysmon(
dvt, parsed_filters: dict[str, list[str]], keys: Optional[list[str]], selection_mode: ProcessSelectionMode
) -> dict:
async with await Sysmontap.create(dvt) as selection_sysmon:
async for process_snapshot in iter_initialized_processes(selection_sysmon):
async for process_snapshot in iter_processes(
selection_sysmon, skip_first_snapshot=_should_skip_first_snapshot(keys)
):
# All process entries in a sysmon sample share the same schema, so validating one entry is sufficient.
_validate_process_keys(process_snapshot[0], keys or [])
return _select_process_from_snapshot(process_snapshot, parsed_filters, selection_mode)
Expand All @@ -252,37 +303,10 @@ async def _select_process_from_sysmon(
@async_command
async def sysmon_process_single(
service_provider: ServiceProviderDep,
filter_expressions: Annotated[
Optional[list[str]],
typer.Option(
"--filter",
"-f",
help="filter processes by key=value. Can be specified multiple times.",
),
] = None,
keys: Annotated[
Optional[list[str]],
typer.Option(
"--key",
"-k",
help="show only selected process keys for each emitted record. Can be specified multiple times.",
),
] = None,
output: Annotated[
Optional[Path],
typer.Option(
"--output",
"-o",
help="output file path for JSON format (optional, defaults to stdout)",
),
] = None,
human: Annotated[
bool,
typer.Option(
"--human",
help="format known byte-count fields such as physFootprint using human-readable units.",
),
] = False,
filter_expressions: ProcessFilterExpressions = None,
keys: ProcessKeys = None,
human: HumanReadableProcessValues = False,
output: ProcessResultsOutputPath = None,
) -> None:
"""show a single snapshot of currently running processes."""
with contextlib.ExitStack() as stack:
Expand All @@ -304,7 +328,7 @@ async def sysmon_process_single_task(
DeviceInfo(dvt) as device_info,
await Sysmontap.create(dvt) as sysmon,
):
async for process_snapshot in iter_initialized_processes(sysmon):
async for process_snapshot in iter_processes(sysmon, skip_first_snapshot=_should_skip_first_snapshot(keys)):
if process_snapshot and parsed_filters:
# All process entries in a sysmon sample share the same schema, so validating one entry is sufficient.
_validate_process_keys(process_snapshot[0], list(parsed_filters))
Expand All @@ -331,38 +355,11 @@ async def sysmon_process_single_task(
@async_command
async def sysmon_process_monitor_threshold(
service_provider: ServiceProviderDep,
threshold: Annotated[float, typer.Argument(help="minimum cpuUsage value to emit")],
output: Annotated[
Optional[Path],
typer.Option(
"--output",
"-o",
help="output file path for JSONL format (optional, defaults to stdout)",
),
] = None,
duration: Annotated[
Optional[int],
typer.Option(
"--duration",
"-d",
help="maximum duration in milliseconds to run monitoring (optional)",
),
] = None,
keys: Annotated[
Optional[list[str]],
typer.Option(
"--key",
"-k",
help="show only selected process keys for each emitted record. Can be specified multiple times.",
),
] = None,
human: Annotated[
bool,
typer.Option(
"--human",
help="format known byte-count fields such as physFootprint using human-readable units.",
),
] = False,
threshold: Annotated[float, typer.Argument(help="Minimum cpuUsage value to emit")],
keys: ProcessKeys = None,
duration: ProcessMonitorDuration = None,
human: HumanReadableProcessValues = False,
output: ProcessResultsOutputPath = None,
) -> None:
"""Continuously monitor processes above a cpuUsage threshold."""

Expand All @@ -375,64 +372,30 @@ async def sysmon_process_monitor_threshold(
@async_command
async def sysmon_process_monitor_process(
service_provider: ServiceProviderDep,
filter_expressions: Annotated[
Optional[list[str]],
typer.Option(
"--filter",
"-f",
help="filter processes by key=value. Can be specified multiple times.",
),
] = None,
output: Annotated[
Optional[Path],
typer.Option(
"--output",
"-o",
help="Output file path for JSONL format (optional, defaults to stdout)",
),
] = None,
interval: Annotated[
int,
typer.Option(
"--interval",
"-i",
help="Minimum interval in milliseconds between outputs (optional)",
),
] = Sysmontap.DEFAULT_INTERVAL,
duration: Annotated[
Optional[int],
typer.Option(
"--duration",
"-d",
help="Maximum duration in milliseconds to run monitoring (optional)",
),
] = None,
filter_expressions: ProcessFilterExpressions = None,
keys: ProcessKeys = None,
choose: Annotated[
ProcessSelectionMode,
typer.Option(
"--choose",
help=(
'how to resolve multiple matching processes: "prompt" asks interactively; '
'How to resolve multiple matching processes: "prompt" asks interactively; '
'"first" selects the oldest matching process; "last" selects the newest matching process. '
"Automatic ordering is by startAbsTime, then pid, then name."
),
),
] = ProcessSelectionMode.PROMPT,
keys: Annotated[
Optional[list[str]],
typer.Option(
"--key",
"-k",
help="Show only selected process keys for each emitted record. Can be specified multiple times.",
),
] = None,
human: Annotated[
bool,
interval: Annotated[
int,
typer.Option(
"--human",
help="Format known byte-count fields such as physFootprint using human-readable units.",
"--interval",
"-i",
help="Minimum interval in milliseconds between outputs (optional)",
),
] = False,
] = Sysmontap.DEFAULT_INTERVAL_MS,
duration: ProcessMonitorDuration = None,
human: HumanReadableProcessValues = False,
output: ProcessResultsOutputPath = None,
) -> None:
"""Continuously monitor one process selected from the current snapshot by key=value filters."""

Expand All @@ -456,7 +419,7 @@ async def sysmon_process_monitor_threshold_task(
start_time = None

async with DvtProvider(service_provider) as dvt, await Sysmontap.create(dvt) as sysmon:
async for process_snapshot in iter_initialized_processes(sysmon):
async for process_snapshot in iter_processes(sysmon, skip_first_snapshot=True):
if start_time is None:
start_time = asyncio.get_running_loop().time()

Expand All @@ -481,7 +444,7 @@ async def sysmon_process_monitor_threshold_task(
async def sysmon_process_monitor_process_task(
service_provider: ServiceProviderDep,
filter_expressions: Optional[list[str]] = None,
interval: int = Sysmontap.DEFAULT_INTERVAL,
interval: int = Sysmontap.DEFAULT_INTERVAL_MS,
duration: Optional[int] = None,
choose: ProcessSelectionMode = ProcessSelectionMode.PROMPT,
keys: Optional[list[str]] = None,
Expand All @@ -501,7 +464,9 @@ async def sysmon_process_monitor_process_task(

monitoring_start_time = None
async with await Sysmontap.create(dvt, interval=interval) as monitor_sysmon:
async for process_snapshot in iter_initialized_processes(monitor_sysmon):
async for process_snapshot in iter_processes(
monitor_sysmon, skip_first_snapshot=_should_skip_first_snapshot(keys)
):
if monitoring_start_time is None:
monitoring_start_time = asyncio.get_running_loop().time()

Expand Down
17 changes: 11 additions & 6 deletions pymobiledevice3/services/dvt/instruments/sysmontap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,37 @@

class Sysmontap(Tap):
IDENTIFIER = "com.apple.instruments.server.services.sysmontap"
DEFAULT_INTERVAL = 500
DEFAULT_INTERVAL_MS = 500
MINIMUM_INTERVAL_MS = 1

def __init__(
self, dvt, process_attributes: list[str], system_attributes: list[str], interval: int = DEFAULT_INTERVAL
self,
dvt,
process_attributes: list[str],
system_attributes: list[str],
interval_ms: int = DEFAULT_INTERVAL_MS,
) -> None:
self.process_attributes_cls = dataclasses.make_dataclass("SysmonProcessAttributes", process_attributes)
self.system_attributes_cls = dataclasses.make_dataclass("SysmonSystemAttributes", system_attributes)

config = {
"ur": interval, # Output frequency ms
"ur": Sysmontap.MINIMUM_INTERVAL_MS, # Output frequency ms
"bm": 0,
"procAttrs": process_attributes,
"sysAttrs": system_attributes,
"cpuUsage": True,
"physFootprint": True, # memory value
"sampleInterval": interval * 1000000,
"sampleInterval": interval_ms * 1_000_000,
}

super().__init__(dvt, self.IDENTIFIER, config)

@classmethod
async def create(cls, dvt, interval: int = DEFAULT_INTERVAL) -> "Sysmontap":
async def create(cls, dvt, interval: int = DEFAULT_INTERVAL_MS) -> "Sysmontap":
async with DeviceInfo(dvt) as device_info:
process_attributes = list(await device_info.sysmon_process_attributes())
system_attributes = list(await device_info.sysmon_system_attributes())
return cls(dvt, process_attributes, system_attributes, interval=interval)
return cls(dvt, process_attributes, system_attributes, interval_ms=interval)

async def iter_processes(self):
async for row in self:
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/developer/dvt/sysmon/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
ProcessSelectionMode,
_process_sort_key,
_select_process_from_snapshot,
iter_initialized_processes,
iter_processes,
sysmon_process_monitor_threshold_task,
sysmon_process_single_task,
)
Expand All @@ -18,14 +18,14 @@
@pytest_asyncio.fixture
async def process_snapshot(dvt) -> list[dict]:
async with await Sysmontap.create(dvt) as sysmon:
async for process_snapshot in iter_initialized_processes(sysmon):
async for process_snapshot in iter_processes(sysmon):
return process_snapshot

pytest.fail("failed to collect an initialized process snapshot")


@pytest.mark.asyncio
async def test_iter_initialized_processes_yields_process_dicts(process_snapshot) -> None:
async def test_iter_processes_yields_process_dicts(process_snapshot) -> None:
assert len(process_snapshot) > 0
assert isinstance(process_snapshot[0], dict)
assert process_snapshot[0].get("pid")
Expand Down
Loading
Loading