-
Notifications
You must be signed in to change notification settings - Fork 20
ft: Instantiate Specfile from strings or file-like objects #458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,27 @@ | ||
| # Copyright Contributors to the Packit project. | ||
| # SPDX-License-Identifier: MIT | ||
|
|
||
| import copy | ||
| import datetime | ||
| import logging | ||
| import re | ||
| import types | ||
| from dataclasses import dataclass | ||
| from io import FileIO, StringIO | ||
| from pathlib import Path | ||
| from typing import Generator, List, Optional, Tuple, Type, Union, cast | ||
| from typing import ( | ||
| IO, | ||
| Any, | ||
| Dict, | ||
| Generator, | ||
| List, | ||
| Optional, | ||
| TextIO, | ||
| Tuple, | ||
| Type, | ||
| Union, | ||
| cast, | ||
| ) | ||
|
|
||
| import rpm | ||
|
|
||
|
|
@@ -31,6 +45,7 @@ | |
| from specfile.sources import Patches, Sources | ||
| from specfile.spec_parser import SpecParser | ||
| from specfile.tags import Tag, Tags | ||
| from specfile.types import EncodingArgs | ||
| from specfile.value_parser import ( | ||
| SUBSTITUTION_GROUP_PREFIX, | ||
| ConditionalMacroExpansion, | ||
|
|
@@ -50,19 +65,27 @@ class Specfile: | |
| autosave: Whether to automatically save any changes made. | ||
| """ | ||
|
|
||
| ENCODING_ARGS: EncodingArgs = {"encoding": "utf8", "errors": "surrogateescape"} | ||
|
|
||
| def __init__( | ||
| self, | ||
| path: Union[Path, str], | ||
| path: Optional[Union[Path, str]] = None, | ||
| content: Optional[str] = None, | ||
| file: Optional[IO] = None, | ||
| sourcedir: Optional[Union[Path, str]] = None, | ||
| autosave: bool = False, | ||
| macros: Optional[List[Tuple[str, Optional[str]]]] = None, | ||
| force_parse: bool = False, | ||
| ) -> None: | ||
| """ | ||
| Initializes a specfile object. | ||
| Initializes a specfile object. You can specify either a path to the spec file, | ||
| its content as a string or a file object representing it. `sourcedir` is optional | ||
| if `path` or a named `file` is provided and will be set to the parent directory. | ||
nforro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Args: | ||
| path: Path to the spec file. | ||
| content: String containing the spec file content. | ||
| file: File object representing the spec file. | ||
| sourcedir: Path to sources and patches. | ||
| autosave: Whether to automatically save any changes made. | ||
| macros: List of extra macro definitions. | ||
|
|
@@ -71,12 +94,29 @@ def __init__( | |
| Such sources include sources referenced from shell expansions | ||
| in tag values and sources included using the _%include_ directive. | ||
| """ | ||
| # count mutually exclusive arguments | ||
| if sum([file is not None, path is not None, content is not None]) > 1: | ||
| raise ValueError( | ||
| "Only one of `file`, `path` or `content` should be provided" | ||
| ) | ||
| if file is not None: | ||
| self._file = file | ||
| elif path is not None: | ||
| self._file = Path(path).open("r+", **self.ENCODING_ARGS) | ||
| elif content is not None: | ||
| self._file = StringIO(content) | ||
| else: | ||
| raise ValueError("Either `file`, `path` or `content` must be provided") | ||
| if sourcedir is None: | ||
| try: | ||
| sourcedir = Path(self._file.name).parent | ||
nforro marked this conversation as resolved.
Show resolved
Hide resolved
nforro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| except AttributeError: | ||
| raise ValueError( | ||
| "`sourcedir` is required when providing `content` or file object without a name" | ||
| ) | ||
| self.autosave = autosave | ||
| self._path = Path(path) | ||
| self._lines, self._trailing_newline = self._read_lines(self._path) | ||
| self._parser = SpecParser( | ||
| Path(sourcedir or self.path.parent), macros, force_parse | ||
| ) | ||
| self._lines, self._trailing_newline = self._read_lines(self._file) | ||
| self._parser = SpecParser(Path(sourcedir), macros, force_parse) | ||
| self._parser.parse(str(self)) | ||
| self._dump_debug_info("After initial parsing") | ||
|
|
||
|
|
@@ -85,7 +125,7 @@ def __eq__(self, other: object) -> bool: | |
| return NotImplemented | ||
| return ( | ||
| self.autosave == other.autosave | ||
| and self._path == other._path | ||
| and self.path == other.path | ||
| and self._lines == other._lines | ||
| and self._parser == other._parser | ||
| ) | ||
|
|
@@ -111,6 +151,39 @@ def __exit__( | |
| ) -> None: | ||
| self.save() | ||
|
|
||
| def __deepcopy__(self, memodict: Dict[int, Any]): | ||
| """ | ||
| Deepcopies the object, handling file-like attributes. | ||
| """ | ||
| specfile = self.__class__.__new__(self.__class__) | ||
| memodict[id(self)] = specfile | ||
|
|
||
| for k, v in self.__dict__.items(): | ||
| if k == "_file": | ||
| continue | ||
| setattr(specfile, k, copy.deepcopy(v, memodict)) | ||
|
|
||
| try: | ||
| path = Path(cast(FileIO, self._file).name) | ||
| except AttributeError: | ||
| # IO doesn't implement getvalue() so tell mypy this is StringIO | ||
| # (could also be BytesIO) | ||
| sio = cast(StringIO, self._file) | ||
| # not a named file, try `getvalue()` | ||
| specfile._file = type(sio)(sio.getvalue()) | ||
|
Comment on lines
+169
to
+173
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not a fan of this, but I don't know how to do this better without actually checking for type.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could also a type ignore, but I think that's worse.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't think of any alternative method so far. |
||
| else: | ||
| try: | ||
| # encoding and errors are only available on TextIO objects | ||
| file = cast(TextIO, self._file) | ||
| specfile._file = path.open( | ||
| mode=file.mode, encoding=file.encoding, errors=file.errors | ||
| ) | ||
| except AttributeError: | ||
| # files open in binary mode have no `encoding`/`errors` | ||
| specfile._file = path.open(self._file.mode) | ||
|
|
||
| return specfile | ||
|
|
||
| def _dump_debug_info(self, message) -> None: | ||
| logger.debug( | ||
| f"DBG: {message}:\n" | ||
|
|
@@ -119,19 +192,30 @@ def _dump_debug_info(self, message) -> None: | |
| f" {self._parser.spec!r} @ 0x{id(self._parser.spec):012x}" | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _read_lines(path: Path) -> Tuple[List[str], bool]: | ||
| content = path.read_text(encoding="utf8", errors="surrogateescape") | ||
| return content.splitlines(), content[-1] == "\n" | ||
| @classmethod | ||
| def _read_lines(cls, file: IO) -> Tuple[List[str], bool]: | ||
| file.seek(0) | ||
| raw_content = file.read() | ||
| if isinstance(raw_content, str): | ||
| content = raw_content | ||
| else: | ||
| content = raw_content.decode(**cls.ENCODING_ARGS) | ||
| return content.splitlines(), content.endswith("\n") | ||
|
|
||
| @property | ||
| def path(self) -> Path: | ||
| def path(self) -> Optional[Path]: | ||
| """Path to the spec file.""" | ||
| return self._path | ||
| try: | ||
| return Path(cast(FileIO, self._file).name) | ||
| except AttributeError: | ||
| return None | ||
|
|
||
| @path.setter | ||
| def path(self, value: Union[Path, str]) -> None: | ||
| self._path = Path(value) | ||
| path = Path(value) | ||
| if path == self.path: | ||
| return | ||
| self._file = path.open("r+", **self.ENCODING_ARGS) | ||
|
|
||
| @property | ||
| def sourcedir(self) -> Path: | ||
|
|
@@ -179,11 +263,26 @@ def rpm_spec(self) -> rpm.spec: | |
|
|
||
| def reload(self) -> None: | ||
| """Reloads the spec file content.""" | ||
| self._lines, self._trailing_newline = self._read_lines(self.path) | ||
| try: | ||
| path = Path(cast(FileIO, self._file).name) | ||
| except AttributeError: | ||
| pass | ||
| else: | ||
mynk8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # reopen the path in case the original file has been deleted/replaced | ||
| self._file.close() | ||
| self._file = path.open("r+", **self.ENCODING_ARGS) | ||
| self._lines, self._trailing_newline = self._read_lines(self._file) | ||
|
|
||
| def save(self) -> None: | ||
| """Saves the spec file content.""" | ||
| self.path.write_text(str(self), encoding="utf8", errors="surrogateescape") | ||
| self._file.seek(0) | ||
| self._file.truncate(0) | ||
| content = str(self) | ||
| try: | ||
| self._file.write(content) | ||
| except TypeError: | ||
| self._file.write(content.encode(**self.ENCODING_ARGS)) | ||
| self._file.flush() | ||
|
|
||
| def expand( | ||
| self, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.