Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions linuxpy/proc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#
# This file is part of the linuxpy project
#
# Copyright (c) 2023 Tiago Coutinho
# Distributed under the GPLv3 license. See LICENSE for more info.

from pathlib import Path

from linuxpy.util import try_numeric
Expand All @@ -7,6 +13,22 @@
CPU_INFO_PATH: Path = PROC_PATH / "cpuinfo"
MEM_INFO_PATH: Path = PROC_PATH / "meminfo"
MODULES_PATH: Path = PROC_PATH / "modules"
STAT_PATH: Path = PROC_PATH / "stat"
NET_PATH: Path = PROC_PATH / "net"
DEV_PATH: Path = NET_PATH / "dev"
WIRELESS_PATH: Path = NET_PATH / "wireless"
NETSTAT_PATH = NET_PATH / "netstat"
SNMP_PATH = NET_PATH / "snmp"


def _iter_read_kv(path: Path):
with path.open() as fobj:
lines = fobj.readlines()
for keys, values in zip(lines[::2], lines[1::2]):
key, *keys = keys.split()
value, *values = values.split()
assert key == value
yield key.rstrip(":"), dict(zip(keys, [int(value) for value in values]))


def iter_cpu_info():
Expand All @@ -23,6 +45,10 @@ def iter_cpu_info():
yield info


def cpu_info():
return tuple(iter_cpu_info())


def iter_mem_info():
data = MEM_INFO_PATH.read_text()
for line in data.splitlines():
Expand All @@ -34,6 +60,10 @@ def iter_mem_info():
yield key, value


def mem_info():
return dict(iter_mem_info())


def iter_modules():
data = MODULES_PATH.read_text()
for line in data.splitlines():
Expand All @@ -48,3 +78,110 @@ def iter_modules():
mod["state"] = fields[4]
mod["offset"] = int(fields[5], 16)
yield mod


def modules():
return tuple(iter_modules())


def iter_stat():
CPU = "user", "nice", "system", "idle", "iowait", "irq", "softirq", "steal", "guest", "guest_nice"
data = STAT_PATH.read_text()
for line in data.splitlines():
name, *fields = line.split()
if name.startswith("cpu"):
payload = dict(zip(CPU, map(int, fields)))
elif name in {"intr", "softirq"}:
total, *fields = (int(field) for field in fields)
payload = dict(enumerate(fields, start=1))
payload["total"] = total
elif name in {"ctxt", "btime", "processes", "procs_running", "procs_blocked"}:
payload = int(fields[0])
else:
continue
yield name, payload


def stat():
return dict(iter_stat())


def iter_dev():
with DEV_PATH.open() as fobj:
lines = fobj.readlines()
# Skip the header lines (usually first 2 lines)
for line in lines[2:]:
fields = line.strip().split()
if not fields:
continue
yield {
"interface": fields[0].rstrip(":"),
"receive": {
"bytes": int(fields[1]),
"packets": int(fields[2]),
"errs": int(fields[3]),
"drop": int(fields[4]),
"fifo": int(fields[5]),
"frame": int(fields[6]),
"compressed": int(fields[7]),
"multicast": int(fields[8]),
},
"transmit": {
"bytes": int(fields[9]),
"packets": int(fields[10]),
"errs": int(fields[11]),
"drop": int(fields[12]),
"fifo": int(fields[13]),
"colls": int(fields[14]),
"carrier": int(fields[15]),
"compressed": int(fields[16]),
},
}


def dev():
return tuple(iter_dev())


def iter_wireless():
with WIRELESS_PATH.open() as fobj:
lines = fobj.readlines()
# Skip the header lines (usually first 2 lines)
for line in lines[2:]:
fields = line.strip().split()
if not fields:
continue
yield {
"interface": fields[0].rstrip(":"),
"status": int(fields[1], 16),
"quality": {
"link": int(fields[2].rstrip(".")),
"level": int(fields[3].rstrip(".")),
"noise": int(fields[4].rstrip(".")),
},
"discarded": {
"nwid": int(fields[5]),
"crypt": int(fields[6]),
"misc": int(fields[7]),
},
}


def wireless():
return tuple(iter_wireless())


def iter_netstat():
return _iter_read_kv(NETSTAT_PATH)


def netstat():
return dict(iter_netstat())


def iter_snmp():
return _iter_read_kv(SNMP_PATH)


def snmp():
return dict(iter_snmp())
4 changes: 2 additions & 2 deletions linuxpy/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def to_fd(fd: FDLike):
int16 = functools.partial(int, base=16)


def try_numeric(text: str):
def try_numeric(text: str, order=(int, int16, float)):
"""
Try to translate given text into int, int base 16 or float.
Returns the orig and return the original text if it fails.
Expand All @@ -97,7 +97,7 @@ def try_numeric(text: str):
Returns:
int, float or str: The converted text
"""
for func in (int, int16, float):
for func in order:
try:
return func(text)
except ValueError:
Expand Down
8 changes: 4 additions & 4 deletions tests/test_gpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,10 @@ def _(chip=emulate_gpiochip):
assert request[1, 2] == {2: 1, 1: 0}

request[:] = 9 * [1]
assert request[:] == {i: 1 for i in range(1, 10)}
assert request[:] == dict.fromkeys(range(1, 10), 1)

request[:] = 0
assert request[:] == {i: 0 for i in range(1, 10)}
assert request[:] == dict.fromkeys(range(1, 10), 0)

request[3, 4, 7:10] = 1, 0, 1, 0, 1
assert request[3, 4, 7:10] == {3: 1, 4: 0, 7: 1, 8: 0, 9: 1}
Expand Down Expand Up @@ -742,10 +742,10 @@ def _():
assert request[9:12] == {9: 0, 10: 0, 11: 0}

request[:] = 9 * [1]
assert request[:] == {i: 1 for i in range(5, 14)}
assert request[:] == dict.fromkeys(range(5, 14), 1)

request[:] = 0
assert request[:] == {i: 0 for i in range(5, 14)}
assert request[:] == dict.fromkeys(range(5, 14), 0)

request[7, 8, 11:14] = 1, 0, 1, 0, 1
assert request[7, 8, 11:14] == {7: 1, 8: 0, 11: 1, 12: 0, 13: 1}
Expand Down