diff --git a/doc/conf.py b/doc/conf.py index 0a4fbdb..41a30b2 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # sdds documentation build configuration file, created by # sphinx-quickstart on Tue Feb 6 12:10:18 2018. diff --git a/pyproject.toml b/pyproject.toml index 8b0e213..f9ab3a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,3 +70,56 @@ homepage = "https://github.com/pylhc/sdds" repository = "https://github.com/pylhc/sdds" documentation = "https://pylhc.github.io/sdds/" changelog = "https://github.com/pylhc/sdds/blob/master/CHANGELOG.md" + +# ----- Dev Tools Configuration ----- # + +[tool.ruff] +exclude = [ + ".eggs", + ".git", + ".mypy_cache", + ".venv", + "_build", + "build", + "dist", +] + +# Assume Python 3.10+ +target-version = "py310" + +line-length = 100 +indent-width = 4 + +[tool.ruff.lint] +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" +ignore = [ + "E501", # line too long + "FBT001", # boolean-type-hint-positional-argument + "FBT002", # boolean-default-value-positional-argument + "PT019", # pytest-fixture-param-without-value (but suggested solution fails) +] +extend-select = [ + "F", # Pyflakes rules + "W", # PyCodeStyle warnings + "E", # PyCodeStyle errors + "I", # Sort imports properly + "A", # Detect shadowed builtins + "N", # enforce naming conventions, e.g. 
ClassName vs function_name +    "UP",   # Warn if certain things can be changed due to newer Python versions +    "C4",   # Catch incorrect use of comprehensions, dict, list, etc +    "FA",   # Enforce from __future__ import annotations +    "FBT",  # detect boolean traps +    "ISC",  # Good use of string concatenation +    "BLE",  # disallow catch-all exceptions +    "ICN",  # Use common import conventions +    "RET",  # Good return practices +    "SIM",  # Common simplification rules +    "TID",  # Some good import practices +    "TC", # Enforce importing certain types in a TYPE_CHECKING block +    "PTH",  # Use pathlib instead of os.path +    "NPY",  # Some numpy-specific things +] +# Allow fix for all enabled rules (when `--fix` is provided). +fixable = ["ALL"] +unfixable = [] diff --git a/sdds/__init__.py b/sdds/__init__.py index b49a99b..c92d37a 100644 --- a/sdds/__init__.py +++ b/sdds/__init__.py @@ -1,4 +1,5 @@ """Exposes SddsFile, read_sdds and write_sdds directly in sdds namespace.""" + from sdds.classes import SddsFile from sdds.reader import read_sdds from sdds.writer import write_sdds diff --git a/sdds/classes.py b/sdds/classes.py index 6077913..f790a27 100644 --- a/sdds/classes.py +++ b/sdds/classes.py @@ -6,10 +6,16 @@ Implementation are based on documentation at: https://ops.aps.anl.gov/manuals/SDDStoolkit/SDDStoolkitsu2.html """ + +# Note: do not add 'from __future__ import annotations' in this file, +# as the __post_init__ method relies on the type hints being determined +# at runtime and will fail if they are all made strings (in asserts). 
+ import logging import warnings +from collections.abc import Iterator from dataclasses import dataclass, fields -from typing import Any, ClassVar, Dict, Iterator, List, Optional, Tuple +from typing import Any, ClassVar LOGGER = logging.getLogger(__name__) @@ -33,11 +39,27 @@ "boolean": "i1", "string": "s", } -NUMTYPES_SIZES = {"float": 4, "double": 8, "short": 2, "long": 4, "llong": 8, "char": 1, "boolean": 1} -NUMTYPES_CAST = {"float": float, "double": float, "short": int, "long": int, "llong": int, "char": str, "boolean": int} +NUMTYPES_SIZES = { + "float": 4, + "double": 8, + "short": 2, + "long": 4, + "llong": 8, + "char": 1, + "boolean": 1, +} +NUMTYPES_CAST = { + "float": float, + "double": float, + "short": int, + "long": int, + "llong": int, + "char": str, + "boolean": int, +} -def get_dtype_str(type_: str, endianness: str = "big", length: Optional[int] = None): +def get_dtype_str(type_: str, endianness: str = "big", length: int | None = None): return f"{ENDIAN[endianness]}{length if length is not None else ''}{NUMTYPES[type_]}" @@ -62,8 +84,8 @@ class Description: contents (str): Optional. Formal specification of the type of data stored in a data set. 
""" - text: Optional[str] = None - contents: Optional[str] = None + text: str | None = None + contents: str | None = None TAG: ClassVar[str] = "&description" def __repr__(self): @@ -120,11 +142,11 @@ class Definition: name: str type: str - symbol: Optional[str] = None - units: Optional[str] = None - description: Optional[str] = None - format_string: Optional[str] = None - TAG: ClassVar[Optional[str]] = None + symbol: str | None = None + units: str | None = None + description: str | None = None + format_string: str | None = None + TAG: ClassVar[str | None] = None def __post_init__(self): # Fix types (probably strings from reading files) by using the type-hints @@ -149,7 +171,9 @@ def __post_init__(self): # all is fine continue - LOGGER.debug(f"converting {field.name}: " f"{type(value).__name__} -> {hinted_type.__name__}") + LOGGER.debug( + f"converting {field.name}: {type(value).__name__} -> {hinted_type.__name__}" + ) setattr(self, field.name, hinted_type(value)) def get_key_value_string(self) -> str: @@ -158,7 +182,9 @@ def get_key_value_string(self) -> str: Hint: `ClassVars` (like ``TAG``) are ignored in `fields`. 
""" field_values = {field.name: getattr(self, field.name) for field in fields(self)} - return ", ".join([f"{key}={value}" for key, value in field_values.items() if value is not None]) + return ", ".join( + [f"{key}={value}" for key, value in field_values.items() if value is not None] + ) def __repr__(self): return f"" @@ -196,7 +222,7 @@ class Parameter(Definition): """ TAG: ClassVar[str] = "¶meter" - fixed_value: Optional[str] = None + fixed_value: str | None = None @dataclass @@ -219,9 +245,9 @@ class Array(Definition): """ TAG: ClassVar[str] = "&array" - field_length: Optional[int] = None - group_name: Optional[str] = None - dimensions: Optional[int] = None + field_length: int | None = None + group_name: str | None = None + dimensions: int | None = None @dataclass @@ -279,16 +305,16 @@ class SddsFile: """ version: str # This should always be "SDDS1" - description: Optional[Description] - definitions: Dict[str, Definition] - values: Dict[str, Any] + description: Description | None + definitions: dict[str, Definition] + values: dict[str, Any] def __init__( self, version: str, - description: Optional[Description], - definitions_list: List[Definition], - values_list: List[Any], + description: Description | None, + definitions_list: list[Definition], + values_list: list[Any], ) -> None: self.version = version @@ -298,12 +324,14 @@ def __init__( self.description = description self.definitions = {definition.name: definition for definition in definitions_list} - self.values = {definition.name: value for definition, value in zip(definitions_list, values_list)} + self.values = { + definition.name: value for definition, value in zip(definitions_list, values_list) + } - def __getitem__(self, name: str) -> Tuple[Definition, Any]: + def __getitem__(self, name: str) -> tuple[Definition, Any]: return self.definitions[name], self.values[name] - def __iter__(self) -> Iterator[Tuple[Definition, Any]]: + def __iter__(self) -> Iterator[tuple[Definition, Any]]: for def_name in 
self.definitions: yield self[def_name] diff --git a/sdds/reader.py b/sdds/reader.py index d48e496..b61a7de 100644 --- a/sdds/reader.py +++ b/sdds/reader.py @@ -5,32 +5,38 @@ This module contains the reading functionality of ``sdds``. It provides a high-level function to read SDDS files in different formats, and a series of helpers. """ + +from __future__ import annotations # For type hints in Python < 3.10 + import gzip import os import pathlib import struct import sys -from collections.abc import Callable +from collections.abc import Callable, Generator from contextlib import AbstractContextManager from functools import partial -from typing import IO, Any, Dict, Generator, List, Optional, Tuple, Type, Union +from typing import IO, Any import numpy as np -from sdds.classes import (ENCODING, NUMTYPES_CAST, NUMTYPES_SIZES, Array, - Column, Data, Definition, Description, Parameter, - SddsFile, get_dtype_str) +from sdds.classes import ( + ENCODING, + NUMTYPES_CAST, + NUMTYPES_SIZES, + Array, + Column, + Data, + Definition, + Description, + Parameter, + SddsFile, + get_dtype_str, +) # ----- Providing Opener Abstractions for the Reader ----- # -# On Python 3.8, we cannot subscript contextlib.AbstractContextManager or collections.abc.Callable, -# which became possible with PEP 585 in Python 3.9. We will check for the runtime version and simply -# not subscript if running on 3.8. The cost here is degraded typing. 
-# TODO: remove this conditional once Python 3.8 has reached EoL and we drop support for it -if sys.version_info < (3, 9, 0): # we're running on 3.8, which is our lowest supported - OpenerType = Callable -else: - OpenerType = Callable[[os.PathLike], AbstractContextManager[IO]] +OpenerType = Callable[[os.PathLike], AbstractContextManager[IO]] binary_open = partial(open, mode="rb") # default opening mode, simple sdds files gzip_open = partial(gzip.open, mode="rb") # for gzip-compressed sdds files @@ -40,8 +46,8 @@ def read_sdds( - file_path: Union[pathlib.Path, str], - endianness: Optional[str] = None, + file_path: pathlib.Path | str, + endianness: str | None = None, opener: OpenerType = binary_open, ) -> SddsFile: """ @@ -112,15 +118,17 @@ def read_sdds( def _read_header( inbytes: IO[bytes], -) -> Tuple[str, List[Definition], Optional[Description], Data]: +) -> tuple[str, list[Definition], Description | None, Data]: word_gen = _gen_words(inbytes) version = next(word_gen) # First token is the SDDS version - assert version == "SDDS1", "This module is compatible with SDDS v1 only... are there really other versions?" - definitions: List[Definition] = [] - description: Optional[Description] = None - data: Optional[Data] = None + assert version == "SDDS1", ( + "This module is compatible with SDDS v1 only... are there really other versions?" 
+ ) + definitions: list[Definition] = [] + description: Description | None = None + data: Data | None = None for word in word_gen: - def_dict: Dict[str, str] = _get_def_as_dict(word_gen) + def_dict: dict[str, str] = _get_def_as_dict(word_gen) if word in (Column.TAG, Parameter.TAG, Array.TAG): definitions.append( {Column.TAG: Column, Parameter.TAG: Parameter, Array.TAG: Array}[word]( @@ -146,22 +154,26 @@ def _read_header( return version, definitions, description, data -def _sort_definitions(orig_defs: List[Definition]) -> List[Definition]: +def _sort_definitions(orig_defs: list[Definition]) -> list[Definition]: """ Sorts the definitions in the parameter, array, column order. According to the specification, parameters appear first in data pages then arrays and then columns. Inside each group they follow the order of appearance in the header. """ - definitions: List[Definition] = [definition for definition in orig_defs if isinstance(definition, Parameter)] + definitions: list[Definition] = [ + definition for definition in orig_defs if isinstance(definition, Parameter) + ] definitions.extend([definition for definition in orig_defs if isinstance(definition, Array)]) definitions.extend([definition for definition in orig_defs if isinstance(definition, Column)]) return definitions -def _read_data(data: Data, definitions: List[Definition], inbytes: IO[bytes], endianness: str) -> List[Any]: +def _read_data( + data: Data, definitions: list[Definition], inbytes: IO[bytes], endianness: str +) -> list[Any]: if data.mode == "binary": return _read_data_binary(definitions, inbytes, endianness) - elif data.mode == "ascii": + if data.mode == "ascii": return _read_data_ascii(definitions, inbytes) raise ValueError(f"Unsupported data mode {data.mode}.") @@ -172,17 +184,24 @@ def _read_data(data: Data, definitions: List[Definition], inbytes: IO[bytes], en ############################################################################## -def _read_data_binary(definitions: List[Definition], 
inbytes: IO[bytes], endianness: str) -> List[Any]: +def _read_data_binary( + definitions: list[Definition], inbytes: IO[bytes], endianness: str +) -> list[Any]: row_count: int = _read_bin_int(inbytes, endianness) # First int in bin data - functs_dict: Dict[Type[Definition], Callable] = { + functs_dict: dict[type[Definition], Callable] = { Parameter: _read_bin_param, Column: lambda x, y, z: _read_bin_column(x, y, z, row_count), Array: _read_bin_array, } - return [functs_dict[definition.__class__](inbytes, definition, endianness) for definition in definitions] + return [ + functs_dict[definition.__class__](inbytes, definition, endianness) + for definition in definitions + ] -def _read_bin_param(inbytes: IO[bytes], definition: Parameter, endianness: str) -> Union[int, float, str]: +def _read_bin_param( + inbytes: IO[bytes], definition: Parameter, endianness: str +) -> int | float | str: try: if definition.fixed_value is not None: if definition.type == "string": @@ -193,7 +212,9 @@ def _read_bin_param(inbytes: IO[bytes], definition: Parameter, endianness: str) if definition.type == "string": str_len: int = _read_bin_int(inbytes, endianness) return _read_string(inbytes, str_len, endianness) - return NUMTYPES_CAST[definition.type](_read_bin_numeric(inbytes, definition.type, 1, endianness)[0]) + return NUMTYPES_CAST[definition.type]( + _read_bin_numeric(inbytes, definition.type, 1, endianness)[0] + ) def _read_bin_column(inbytes: IO[bytes], definition: Column, endianness: str, row_count: int): @@ -216,7 +237,9 @@ def _read_bin_array(inbytes: IO[bytes], definition: Array, endianness: str) -> A return data.reshape(dims) -def _read_bin_array_len(inbytes: IO[bytes], num_dims: Optional[int], endianness: str) -> Tuple[List[int], int]: +def _read_bin_array_len( + inbytes: IO[bytes], num_dims: int | None, endianness: str +) -> tuple[list[int], int]: if num_dims is None: num_dims = 1 @@ -246,10 +269,9 @@ def _read_string(inbytes: IO[bytes], str_len: int, endianness: str) -> str: 
############################################################################## -def _read_data_ascii(definitions: List[Definition], inbytes: IO[bytes]) -> List[Any]: +def _read_data_ascii(definitions: list[Definition], inbytes: IO[bytes]) -> list[Any]: def _ascii_generator(ascii_text): - for line in ascii_text: - yield line + yield from ascii_text # Convert bytes to ASCII, separate by lines and remove comments ascii_text = [chr(r) for r in inbytes.read()] @@ -260,7 +282,7 @@ def _ascii_generator(ascii_text): ascii_gen = _ascii_generator(ascii_text) # Iterate through every parameters and arrays in the file - data: List[Any] = [] + data: list[Any] = [] for definition in definitions: # Call the function handling the tag we're on # Change the current line according to the tag and dimensions @@ -272,7 +294,9 @@ def _ascii_generator(ascii_text): return data -def _read_ascii_parameter(ascii_gen: Generator[str, None, None], definition: Parameter) -> Union[str, int, float]: +def _read_ascii_parameter( + ascii_gen: Generator[str, None, None], definition: Parameter +) -> str | int | float: # Check if we got fixed values, no need to read a line if that's the case if definition.fixed_value is not None: if definition.type == "string": @@ -297,7 +321,7 @@ def _read_ascii_array(ascii_gen: Generator[str, None, None], definition: Array) dimensions = np.array(next(ascii_gen).split(), dtype="int") # Get all the data given by the dimensions - data: List[str] = [] + data: list[str] = [] while len(data) != np.prod(dimensions): # The values on each line are split by a space data += next(ascii_gen).strip().split(" ") @@ -308,9 +332,7 @@ def _read_ascii_array(ascii_gen: Generator[str, None, None], definition: Array) # Convert to np.array so that it can be reshaped to reflect the dimensions npdata = np.array(data) - npdata = npdata.reshape(dimensions) - - return npdata + return npdata.reshape(dimensions) ############################################################################## @@ 
-348,17 +370,19 @@ def _gen_real_lines(inbytes: IO[bytes]) -> Generator[str, None, None]: def _gen_words(inbytes: IO[bytes]) -> Generator[str, None, None]: for line in _gen_real_lines(inbytes): - for word in line.split(): - yield word + yield from line.split() return -def _get_def_as_dict(word_gen: Generator[str, None, None]) -> Dict[str, str]: - raw_str: List[str] = [] +def _get_def_as_dict(word_gen: Generator[str, None, None]) -> dict[str, str]: + raw_str: list[str] = [] for word in word_gen: if word.strip() == "&end": recomposed: str = " ".join(raw_str) parts = [assign for assign in recomposed.split(",") if assign] - return {key.strip(): value.strip() for (key, value) in [assign.split("=") for assign in parts]} + return { + key.strip(): value.strip() + for (key, value) in [assign.split("=") for assign in parts] + } raw_str.append(word.strip()) raise ValueError("EOF found while looking for &end tag.") diff --git a/sdds/writer.py b/sdds/writer.py index 17d1f55..d1039b0 100644 --- a/sdds/writer.py +++ b/sdds/writer.py @@ -5,16 +5,32 @@ This module contains the writing functionality of ``sdds``. It provides a high-level function to write SDDS files in different formats, and a series of helpers. """ + +from __future__ import annotations # For type hints in Python < 3.10 + import pathlib import struct -from typing import IO, Any, Iterable, List, Tuple, Union +from typing import IO, TYPE_CHECKING, Any import numpy as np -from sdds.classes import ENCODING, Array, Column, Data, Definition, Description, Parameter, SddsFile, get_dtype_str +from sdds.classes import ( + ENCODING, + Array, + Column, + Data, + Definition, + Description, + Parameter, + SddsFile, + get_dtype_str, +) + +if TYPE_CHECKING: + from collections.abc import Iterable -def write_sdds(sdds_file: SddsFile, output_path: Union[pathlib.Path, str]) -> None: +def write_sdds(sdds_file: SddsFile, output_path: pathlib.Path | str) -> None: """ Writes SddsFile object into ``output_path``. 
The byteorder will be big-endian, independent of the byteorder of the current machine. @@ -30,7 +46,7 @@ def write_sdds(sdds_file: SddsFile, output_path: Union[pathlib.Path, str]) -> No _write_data(names, sdds_file, outbytes) -def _write_header(sdds_file: SddsFile, outbytes: IO[bytes]) -> List[str]: +def _write_header(sdds_file: SddsFile, outbytes: IO[bytes]) -> list[str]: outbytes.writelines(("SDDS1\n".encode(ENCODING), "!# big-endian\n".encode(ENCODING))) names = [] if sdds_file.description is not None: @@ -44,17 +60,17 @@ def _write_header(sdds_file: SddsFile, outbytes: IO[bytes]) -> List[str]: return names -def _sdds_def_as_str(definition: Union[Description, Definition, Data]) -> str: +def _sdds_def_as_str(definition: Description | Definition | Data) -> str: return f"{definition.TAG} {definition.get_key_value_string()} &end\n" -def _write_data(names: List[str], sdds_file: SddsFile, outbytes: IO[bytes]) -> None: +def _write_data(names: list[str], sdds_file: SddsFile, outbytes: IO[bytes]) -> None: # row_count: outbytes.write(np.array(0, dtype=get_dtype_str("long")).tobytes()) - parameters: List[Tuple[Parameter, Any]] = [] - arrays: List[Tuple[Array, Any]] = [] - columns: List[Tuple[Column, Any]] = [] + parameters: list[tuple[Parameter, Any]] = [] + arrays: list[tuple[Array, Any]] = [] + columns: list[tuple[Column, Any]] = [] for name in names: if isinstance(sdds_file[name][0], Parameter): parameters.append(sdds_file[name]) # type: ignore @@ -67,7 +83,7 @@ def _write_data(names: List[str], sdds_file: SddsFile, outbytes: IO[bytes]) -> N _write_columns(columns, outbytes) -def _write_parameters(param_gen: Iterable[Tuple[Parameter, Any]], outbytes: IO[bytes]): +def _write_parameters(param_gen: Iterable[tuple[Parameter, Any]], outbytes: IO[bytes]): for param_def, value in param_gen: if param_def.type == "string": _write_string(value, outbytes) @@ -75,15 +91,14 @@ def _write_parameters(param_gen: Iterable[Tuple[Parameter, Any]], outbytes: IO[b 
outbytes.write(np.array(value, dtype=get_dtype_str(param_def.type)).tobytes()) -def _write_arrays(array_gen: Iterable[Tuple[Array, Any]], outbytes: IO[bytes]): +def _write_arrays(array_gen: Iterable[tuple[Array, Any]], outbytes: IO[bytes]): def get_dimensions_from_array(value): # Return the number of items per dimension # For an array a[n][m], returns [n, m] - if isinstance(value, np.ndarray) or isinstance(value, list): + if isinstance(value, np.ndarray | list): if len(value) == 0: return [0] - else: - return [len(value)] + get_dimensions_from_array(value[0]) + return [len(value)] + get_dimensions_from_array(value[0]) return [] for array_def, value in array_gen: @@ -99,7 +114,7 @@ def get_dimensions_from_array(value): outbytes.write(np.array(value, dtype=get_dtype_str(array_def.type)).tobytes()) -def _write_columns(col_gen: Iterable[Tuple[Column, Any]], outbytes: IO[bytes]): +def _write_columns(col_gen: Iterable[tuple[Column, Any]], outbytes: IO[bytes]): # TODO: Implement the columns thing. 
pass diff --git a/tests/test_sdds.py b/tests/test_sdds.py index 8d6fd35..71aa03a 100644 --- a/tests/test_sdds.py +++ b/tests/test_sdds.py @@ -1,9 +1,7 @@ import io -import os import pathlib import struct import sys -from typing import Dict import numpy as np import pytest @@ -20,7 +18,15 @@ SddsFile, get_dtype_str, ) -from sdds.reader import _gen_words, _get_def_as_dict, _read_data, _read_header, _sort_definitions, gzip_open, read_sdds +from sdds.reader import ( + _gen_words, + _get_def_as_dict, + _read_data, + _read_header, + _sort_definitions, + gzip_open, + read_sdds, +) from sdds.writer import _sdds_def_as_str, write_sdds CURRENT_DIR = pathlib.Path(__file__).parent @@ -187,7 +193,7 @@ def test_read_header_optionals(self): def test_def_as_dict(): - test_str = b"test1=value1, test2= value2, \n" b"test3=value3, &end" + test_str = b"test1=value1, test2= value2, \ntest3=value3, &end" word_gen = _gen_words(io.BytesIO(test_str)) def_dict = _get_def_as_dict(word_gen) assert def_dict["test1"] == "value1" @@ -221,7 +227,7 @@ def template_ascii_read_write_read(self, filepath, output): if not isinstance(value, np.ndarray): values_equal = np.isclose(value, new_val, atol=0.0001) elif isinstance(value[0], np.str_): - values_equal = all([a == b for a, b in zip(value, new_val)]) + values_equal = all(a == b for a, b in zip(value, new_val)) else: values_equal = np.isclose(value, new_val, atol=0.0001).all() @@ -331,7 +337,7 @@ def _write_read_header(): assert def_dict["type"] == original.type -def _header_from_dict(d: Dict[str, Dict[str, str]]) -> str: +def _header_from_dict(d: dict[str, dict[str, str]]) -> str: """Build a quick header from given dict.""" d = {k: v.copy() for k, v in d.items()} return ( @@ -354,7 +360,7 @@ def _sdds_file_pathlib() -> pathlib.Path: @pytest.fixture() def _sdds_file_str() -> str: - return os.path.join(os.path.dirname(__file__), "inputs", "test_file.sdds") + return str(CURRENT_DIR / "inputs" / "test_file.sdds") @pytest.fixture() @@ -364,7 +370,7 @@ 
def _sdds_gzipped_file_pathlib() -> pathlib.Path: @pytest.fixture() def _sdds_gzipped_file_str() -> str: - return os.path.join(os.path.dirname(__file__), "inputs", "test_file.sdds.gz") + return str(CURRENT_DIR / "inputs" / "test_file.sdds.gz") @pytest.fixture()