diff --git a/linuxpy/io.py b/linuxpy/io.py index 44d8216..7c65c2b 100644 --- a/linuxpy/io.py +++ b/linuxpy/io.py @@ -4,13 +4,25 @@ # Copyright (c) 2023 Tiago Coutinho # Distributed under the GPLv3 license. See LICENSE for more info. +import typing import functools import importlib import os +import io import select +StrOrBytesPath: typing.TypeAlias = str | bytes | os.PathLike[str] | os.PathLike[bytes] -def fopen(path, rw=False, binary=True, blocking=False, close_on_exec=True): +FileDescriptorOrPath: typing.TypeAlias = int | StrOrBytesPath + + +def fopen( + path: StrOrBytesPath, + rw: bool = False, + binary: bool = True, + blocking: bool = False, + close_on_exec: bool = True + ) -> io.FileIO | io.TextIOWrapper: def opener(path, flags): if not blocking: flags |= os.O_NONBLOCK @@ -39,7 +51,7 @@ class IO: class GeventModule: - def __init__(self, name): + def __init__(self, name: str): self.name = name self._module = None @@ -49,7 +61,7 @@ def module(self): self._module = importlib.import_module(f"gevent.{self.name}") return self._module - def __getattr__(self, name): + def __getattr__(self, name: str): attr = getattr(self.module, name) setattr(self, name, attr) return attr @@ -57,7 +69,7 @@ def __getattr__(self, name): class GeventIO: @staticmethod - def open(path, rw=False, blocking=False): + def open(path: FileDescriptorOrPath, rw: bool = False, blocking: bool = False): mode = "rb+" if rw else "rb" import gevent.fileobject diff --git a/linuxpy/video/device.py b/linuxpy/video/device.py index 79a39f0..535fd4f 100644 --- a/linuxpy/video/device.py +++ b/linuxpy/video/device.py @@ -84,7 +84,7 @@ def V4L2_CTRL_ID2CLASS(id_): return id_ & 0x0FFF0000 # unsigned long -def human_pixel_format(ifmt): +def human_pixel_format(ifmt: int): return "".join(map(chr, ((ifmt >> i) & 0xFF for i in range(0, 4 * 8, 8)))) @@ -134,7 +134,7 @@ def human_pixel_format(ifmt): } -def mem_map(fd, length, offset): +def mem_map(fd: int, length: int, offset: int): log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset) return mmap.mmap(to_fd(fd), length, offset=offset) @@ -165,13 +165,13 @@ def raw_crop_caps_to_crop_caps(crop): CropCapability.from_raw = raw_crop_caps_to_crop_caps -def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap: +def raw_read_crop_capabilities(fd: int, buffer_type: BufferType) -> raw.v4l2_cropcap: crop = raw.v4l2_cropcap() crop.type = buffer_type return ioctl(fd, IOC.CROPCAP, crop) -def read_crop_capabilities(fd, buffer_type: BufferType) -> CropCapability: +def read_crop_capabilities(fd: int, buffer_type: BufferType) -> CropCapability: try: crop = raw_read_crop_capabilities(fd, buffer_type) except OSError as error: @@ -184,7 +184,7 @@ def read_crop_capabilities(fd, buffer_type: BufferType) -> CropCapability: ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE) -def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False): +def iter_read(fd: int, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False): for index in range(start, stop, step): indexed_struct.index = index try: @@ -202,7 +202,7 @@ def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval= raise -def iter_read_frame_intervals(fd, fmt, w, h): +def iter_read_frame_intervals(fd: int, fmt, w, h): value = raw.v4l2_frmivalenum() value.pixel_format = fmt value.width = w @@ -252,7 +252,7 @@ def iter_read_frame_intervals(fd, fmt, w, h): ) -def iter_read_frame_sizes(fd, pixel_format): +def iter_read_frame_sizes(fd: int, pixel_format): size = raw.v4l2_frmsizeenum() size.index = 0 size.pixel_format = pixel_format @@ -265,20 +265,20 @@ def iter_read_frame_sizes(fd, pixel_format): yield FrameSize(val.index, PixelFormat(val.pixel_format), FrameSizeType(val.type), copy.copy(info)) -def iter_read_pixel_formats_frame_intervals(fd, pixel_formats): +def iter_read_pixel_formats_frame_intervals(fd: int, pixel_formats): for pixel_format in pixel_formats: for size in iter_read_frame_sizes(fd, pixel_format): if size.type == FrameSizeType.DISCRETE: yield from iter_read_frame_intervals(fd, pixel_format, size.info.width, size.info.height) -def read_capabilities(fd): +def read_capabilities(fd: int): caps = raw.v4l2_capability() ioctl(fd, IOC.QUERYCAP, caps) return caps -def iter_read_formats(fd, type): +def iter_read_formats(fd: int, type): format = raw.v4l2_fmtdesc() format.type = type pixel_formats = set(PixelFormat) @@ -312,7 +312,7 @@ def iter_read_formats(fd, type): yield image_format -def iter_read_inputs(fd): +def iter_read_inputs(fd: int): input = raw.v4l2_input() for inp in iter_read(fd, IOC.ENUMINPUT, input): input_type = Input( @@ -328,7 +328,7 @@ def iter_read_inputs(fd): yield input_type -def iter_read_outputs(fd): +def iter_read_outputs(fd: int): output = raw.v4l2_output() for out in iter_read(fd, IOC.ENUMOUTPUT, output): output_type = Output( @@ -343,7 +343,7 @@ def iter_read_outputs(fd): yield output_type -def iter_read_video_standards(fd): +def iter_read_video_standards(fd: int): std = raw.v4l2_standard() for item in iter_read(fd, IOC.ENUMSTD, std): period = item.frameperiod @@ -356,7 +356,7 @@ def iter_read_video_standards(fd): ) -def iter_read_controls(fd): +def iter_read_controls(fd: int): ctrl = raw.v4l2_query_ext_ctrl() nxt = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND ctrl.id = nxt @@ -365,7 +365,7 @@ def iter_read_controls(fd): ctrl_ext.id |= nxt -def iter_read_menu(fd, ctrl): +def iter_read_menu(fd: int, ctrl): qmenu = raw.v4l2_querymenu() qmenu.id = ctrl.id for menu in iter_read( @@ -380,7 +380,7 @@ def iter_read_menu(fd, ctrl): yield copy.deepcopy(menu) -def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer: +def query_buffer(fd: int, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer: buff = raw.v4l2_buffer() buff.type = buffer_type buff.memory = memory @@ -390,14 +390,14 @@ def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw return buff -def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer: +def enqueue_buffer_raw(fd: int, buff: raw.v4l2_buffer) -> raw.v4l2_buffer: if not buff.timestamp.secs: buff.timestamp.set_ns() ioctl(fd, IOC.QBUF, buff) return buff -def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: +def enqueue_buffer(fd: int, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: buff = raw.v4l2_buffer() buff.type = buffer_type buff.memory = memory @@ -408,7 +408,7 @@ def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index return enqueue_buffer_raw(fd, buff) -def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: +def dequeue_buffer(fd: int, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: buff = raw.v4l2_buffer() buff.type = buffer_type buff.memory = memory @@ -418,7 +418,7 @@ def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buff return buff -def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers: +def request_buffers(fd: int, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers: req = raw.v4l2_requestbuffers() req.type = buffer_type req.memory = memory @@ -429,7 +429,7 @@ def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> return req -def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers: +def free_buffers(fd: int, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers: req = raw.v4l2_requestbuffers() req.type = buffer_type req.memory = memory @@ -438,12 +438,12 @@ def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_reques return req -def export_buffer(fd, buffer_type: BufferType, index: int) -> int: +def export_buffer(fd: int, buffer_type: BufferType, index: int) -> int: req = raw.v4l2_exportbuffer(type=buffer_type, index=index) return ioctl(fd, IOC.EXPBUF, req).fd -def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers: +def create_buffers(fd: int, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers: """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O""" req = raw.v4l2_create_buffers() req.format = format @@ -455,11 +455,11 @@ def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> r return req -def set_raw_format(fd, fmt: raw.v4l2_format): +def set_raw_format(fd: int, fmt: raw.v4l2_format): return ioctl(fd, IOC.S_FMT, fmt) -def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): +def set_format(fd: int, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): fmt = raw.v4l2_format() if isinstance(pixel_format, str): pixel_format = raw.v4l2_fourcc(*pixel_format) @@ -473,14 +473,14 @@ def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_forma return set_raw_format(fd, fmt) -def get_raw_format(fd, buffer_type) -> raw.v4l2_format: +def get_raw_format(fd: int, buffer_type) -> raw.v4l2_format: fmt = raw.v4l2_format() fmt.type = buffer_type ioctl(fd, IOC.G_FMT, fmt) return fmt -def get_format(fd, buffer_type) -> Union[Format, MetaFmt]: +def get_format(fd: int, buffer_type) -> Union[Format, MetaFmt]: f = get_raw_format(fd, buffer_type) if buffer_type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}: return MetaFmt( @@ -499,11 +499,11 @@ def get_format(fd, buffer_type) -> Union[Format, MetaFmt]: ) -def try_raw_format(fd, fmt: raw.v4l2_format): +def try_raw_format(fd: int, fmt: raw.v4l2_format): ioctl(fd, IOC.TRY_FMT, fmt) -def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): +def try_format(fd: int, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): fmt = raw.v4l2_format() if isinstance(pixel_format, str): pixel_format = raw.v4l2_fourcc(*pixel_format) @@ -517,14 +517,14 @@ def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_forma return try_raw_format(fd, fmt) -def get_parm(fd, buffer_type): +def get_parm(fd: int, buffer_type): p = raw.v4l2_streamparm() p.type = buffer_type ioctl(fd, IOC.G_PARM, p) return p -def set_fps(fd, buffer_type, fps): +def set_fps(fd: int, buffer_type, fps): # v4l2 fraction is u32 max_denominator = int(min(2**32, 2**32 / fps)) p = raw.v4l2_streamparm() @@ -541,7 +541,7 @@ def set_fps(fd, buffer_type, fps): return ioctl(fd, IOC.S_PARM, p) -def get_fps(fd, buffer_type): +def get_fps(fd: int, buffer_type): p = get_parm(fd, buffer_type) if buffer_type == BufferType.VIDEO_CAPTURE: parm = p.parm.capture @@ -552,17 +552,17 @@ def get_fps(fd, buffer_type): return fractions.Fraction(parm.timeperframe.denominator, parm.timeperframe.numerator) -def stream_on(fd, buffer_type): +def stream_on(fd: int, buffer_type): btype = cenum(buffer_type) return ioctl(fd, IOC.STREAMON, btype) -def stream_off(fd, buffer_type): +def stream_off(fd: int, buffer_type): btype = cenum(buffer_type) return ioctl(fd, IOC.STREAMOFF, btype) -def set_selection(fd, buffer_type, target, rectangle): +def set_selection(fd: int, buffer_type, target, rectangle): sel = raw.v4l2_selection() sel.type = buffer_type sel.target = target @@ -585,7 +585,7 @@ def get_selection( return Rect(left=sel.r.left, top=sel.r.top, width=sel.r.width, height=sel.r.height) -def get_control(fd, id): +def get_control(fd: int, id): control = raw.v4l2_control(id) ioctl(fd, IOC.G_CTRL, control) return control.value @@ -670,7 +670,7 @@ def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_e return data -def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0): +def get_controls_values(fd: int, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0): n = len(controls) ctrls = raw.v4l2_ext_controls() ctrls.which = which @@ -682,7 +682,7 @@ def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.C return [_get_control_value(*args) for args in zip(controls, ctrls.controls, values, strict=False)] -def set_control(fd, id, value): +def set_control(fd: int, id, value): control = raw.v4l2_control(id, value) ioctl(fd, IOC.S_CTRL, control) @@ -731,7 +731,7 @@ def get_priority(fd) -> Priority: return Priority(priority.value) -def set_priority(fd, priority: Priority): +def set_priority(fd: int, priority: Priority): priority = ctypes.c_uint(priority.value) ioctl(fd, IOC.S_PRIORITY, priority) @@ -749,7 +749,7 @@ def subscribe_event( ioctl(fd, IOC.SUBSCRIBE_EVENT, sub) -def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0): +def unsubscribe_event(fd: int, event_type: EventType = EventType.ALL, id: int = 0): sub = raw.v4l2_event_subscription() sub.type = event_type sub.id = id @@ -761,7 +761,7 @@ def deque_event(fd) -> raw.v4l2_event: return ioctl(fd, IOC.DQEVENT, event) -def set_edid(fd, edid): +def set_edid(fd: int, edid): if len(edid) % 128: raise ValueError(f"EDID length {len(edid)} is not multiple of 128") edid_struct = raw.v4l2_edid() @@ -773,11 +773,11 @@ def set_edid(fd, edid): ioctl(fd, IOC.S_EDID, edid_struct) -def clear_edid(fd): +def clear_edid(fd: int): set_edid(fd, b"") -def get_edid(fd): +def get_edid(fd: int): edid_struct = raw.v4l2_edid() ioctl(fd, IOC.G_EDID, edid_struct) if edid_struct.blocks == 0: @@ -789,39 +789,39 @@ def get_edid(fd): return string_at(edid_struct.edid, edid_len) -def get_input(fd): +def get_input(fd: int): inp = ctypes.c_uint() ioctl(fd, IOC.G_INPUT, inp) return inp.value -def set_input(fd, index: int): +def set_input(fd: int, index: int): index = ctypes.c_uint(index) ioctl(fd, IOC.S_INPUT, index) -def get_output(fd): +def get_output(fd: int): out = ctypes.c_uint() ioctl(fd, IOC.G_OUTPUT, out) return out.value -def set_output(fd, index: int): +def set_output(fd: int, index: int): index = ctypes.c_uint(index) ioctl(fd, IOC.S_OUTPUT, index) -def get_std(fd) -> StandardID: +def get_std(fd: int) -> StandardID: out = ctypes.c_uint64() ioctl(fd, IOC.G_STD, out) return StandardID(out.value) -def set_std(fd, std): +def set_std(fd: int, std): ioctl(fd, IOC.S_STD, std) -def query_std(fd) -> StandardID: +def query_std(fd: int) -> StandardID: out = ctypes.c_uint64() ioctl(fd, IOC.QUERYSTD, out) return StandardID(out.value) @@ -848,7 +848,7 @@ def _translate_subdev_format(fmt: raw.v4l2_subdev_format): ) -def get_subdevice_format(fd, pad: int = 0) -> raw.v4l2_subdev_format: +def get_subdevice_format(fd: int, pad: int = 0) -> raw.v4l2_subdev_format: fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE) return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt)) @@ -856,32 +856,32 @@ def get_subdevice_format(fd, pad: int = 0) -> raw.v4l2_subdev_format: # Helpers -def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: +def request_and_query_buffer(fd: int, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: """request + query buffers""" buffers = request_and_query_buffers(fd, buffer_type, memory, 1) return buffers[0] -def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: +def request_and_query_buffers(fd: int, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: """request + query buffers""" request_buffers(fd, buffer_type, memory, count) return [query_buffer(fd, buffer_type, memory, index) for index in range(count)] -def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap: +def mmap_from_buffer(fd: int, buff: raw.v4l2_buffer) -> mmap.mmap: return mem_map(fd, buff.length, offset=buff.m.offset) -def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]: +def create_mmap_buffers(fd: int, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]: """create buffers + mmap_from_buffer""" return [mmap_from_buffer(fd, buff) for buff in request_and_query_buffers(fd, buffer_type, memory, count)] -def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap: +def create_mmap_buffer(fd: int, buffer_type: BufferType, memory: Memory) -> mmap.mmap: return create_mmap_buffers(fd, buffer_type, memory, 1) -def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: +def enqueue_buffers(fd: int, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: return [enqueue_buffer(fd, buffer_type, memory, 0, index) for index in range(count)] @@ -1763,7 +1763,12 @@ def array(self): class VideoCapture(BufferManager): - def __init__(self, device: Device, size: int = 2, buffer_type=None): + def __init__( + self, + device: Device, + size: int = 2, + buffer_type: Optional["type[MemoryMap] | type[ReadSource]"] = None + ): super().__init__(device, BufferType.VIDEO_CAPTURE, size) self.buffer = None self._buffer_type = buffer_type diff --git a/linuxpy/video/util.py b/linuxpy/video/util.py index 85d2b3d..4c4d215 100644 --- a/linuxpy/video/util.py +++ b/linuxpy/video/util.py @@ -5,9 +5,14 @@ # Distributed under the GPLv3 license. See LICENSE for more info. -def v4l2_fourcc(a, b, c, d): +import typing + +CharType: typing.TypeAlias = str | bytes + + +def v4l2_fourcc(a: CharType, b: CharType, c: CharType, d: CharType): return ord(a) | (ord(b) << 8) | (ord(c) << 16) | (ord(d) << 24) -def v4l2_fourcc_be(a, b, c, d): +def v4l2_fourcc_be(a: CharType, b: CharType, c: CharType, d: CharType): return (v4l2_fourcc(a, b, c, d)) | (1 << 31)