Skip to content

Commit 4fecd83

Browse files
committed
[settings] first draft of validation and loading
1 parent 50377e2 commit 4fecd83

File tree

9 files changed

+449
-2
lines changed

9 files changed

+449
-2
lines changed

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ dependencies = [
1717
"rapidfuzz",
1818
"jsonschema",
1919
"jmespath",
20+
"toml",
21+
"PyYAML",
2022
"click-extra"
2123
]
2224
classifiers = [
@@ -62,6 +64,8 @@ linting = [
6264
"types-pyserial",
6365
"types-jsonschema",
6466
"types-jmespath",
67+
"types-toml",
68+
"types-PyYAML",
6569
"mypy",
6670
"flake8"
6771
]

src/instrumentman/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from . import inclination
1414
from . import filetransfer
1515
from . import jobs
16+
from . import settings
1617

1718

1819
@extra_group("iman", params=None) # type: ignore[misc]
@@ -65,6 +66,8 @@ def cli_download() -> None:
6566

6667
cli.add_command(morse.cli)
6768
cli.add_command(terminal.cli)
69+
cli.add_command(settings.cli_load)
70+
cli.add_command(settings.cli_save)
6871
cli_measure.add_command(setmeasurement.cli_measure)
6972
cli_measure.add_command(setup.cli_measure)
7073
cli_measure.add_command(inclination.cli_measure)
@@ -75,6 +78,7 @@ def cli_download() -> None:
7578
cli_merge.add_command(setmeasurement.cli_merge)
7679
cli_merge.add_command(inclination.cli_merge)
7780
cli_validate.add_command(setmeasurement.cli_validate)
81+
cli_validate.add_command(settings.cli_validate)
7882
cli_import.add_command(setup.cli_import)
7983
cli_list.add_command(filetransfer.cli_list)
8084
cli_list.add_command(jobs.cli_list)

src/instrumentman/schema_targets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"title": "GeoComPy set measurement targets schema",
2+
"title": "Instrumentman set measurement targets schema",
33
"description": "Target point records for set measurements",
44
"tpye": "object",
55
"additionalProperties": false,

src/instrumentman/setmeasurement/schema_session.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"title": "GeoComPy set measurement session schema",
2+
"title": "Instrumentman set measurement session schema",
33
"description": "Data recorded during a set measurement session",
44
"type": "object",
55
"additionalProperties": false,
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from typing import Any
2+
3+
from click_extra import (
4+
extra_command,
5+
argument,
6+
option,
7+
file_path,
8+
Choice
9+
)
10+
from cloup import constraint
11+
from cloup.constraints import If, IsSet, require_all
12+
13+
from ..utils import (
14+
com_port_argument,
15+
com_option_group
16+
)
17+
18+
19+
@extra_command(
20+
"save",
21+
params=None,
22+
context_settings={"auto_envvar_prefix": None}
23+
) # type: ignore[misc]
24+
@com_port_argument()
25+
@argument(
26+
"file",
27+
help="file to save settings to",
28+
type=file_path(readable=False)
29+
)
30+
@com_option_group()
31+
@option(
32+
"-f",
33+
"--format",
34+
help="settings file format",
35+
type=Choice(["auto", "json", "yaml", "toml"], case_sensitive=False),
36+
default="auto"
37+
)
38+
@option(
39+
"--save-all",
40+
help="save every setting, even if not applicable to the instrument",
41+
is_flag=True
42+
)
43+
@option(
44+
"--add-defaults",
45+
help="add defaults for settings that cannot be saved",
46+
is_flag=True
47+
)
48+
@constraint(
49+
If(IsSet("add_defaults"), require_all),
50+
["add_defaults", "save_all"]
51+
)
52+
def cli_save(**kwargs: Any) -> None:
53+
"""Save instrument settings to file."""
54+
...
55+
56+
57+
@extra_command(
58+
"load",
59+
params=None,
60+
context_settings={"auto_envvar_prefix": None}
61+
) # type: ignore[misc]
62+
@com_port_argument()
63+
@argument(
64+
"settings",
65+
help="file containing instrument settings",
66+
type=file_path(exists=True, readable=True)
67+
)
68+
@com_option_group()
69+
@option(
70+
"-f",
71+
"--format",
72+
help="settings file format",
73+
type=Choice(["auto", "json", "yaml", "toml"], case_sensitive=False),
74+
default="auto"
75+
)
76+
def cli_load(**kwargs: Any) -> None:
77+
"""Load instrument settings from file."""
78+
from .load import main
79+
80+
main(**kwargs)
81+
82+
83+
@extra_command(
84+
"settings",
85+
params=None,
86+
context_settings={"auto_envvar_prefix": None}
87+
) # type: ignore[misc]
88+
@argument(
89+
"file",
90+
help="settings file to validate",
91+
type=file_path(exists=True, readable=True)
92+
)
93+
@option(
94+
"-f",
95+
"--format",
96+
help="settings file format",
97+
type=Choice(["auto", "json", "yaml", "toml"], case_sensitive=False),
98+
default="auto"
99+
)
100+
def cli_validate(**kwargs: Any) -> None:
101+
"""Validate instrument settings config."""
102+
from .validate import main
103+
104+
main(**kwargs)

src/instrumentman/settings/io.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from typing import Any, cast
2+
from pathlib import Path
3+
4+
import json
5+
import yaml
6+
import toml
7+
8+
from ..utils import echo_red
9+
10+
11+
_SettingsDict = dict[str, Any]
12+
13+
14+
def read_settings(
15+
file: Path,
16+
format: str = "auto"
17+
) -> _SettingsDict:
18+
if format == "auto":
19+
format = file.suffix[1:].lower() if file.suffix else ""
20+
21+
match format:
22+
case "json":
23+
with file.open("rt", encoding="utf8") as settings:
24+
data = cast(_SettingsDict, json.load(settings))
25+
case "yaml" | "yml":
26+
with file.open("rt", encoding="utf8") as settings:
27+
data = cast(_SettingsDict, yaml.load(settings, yaml.Loader))
28+
case "toml":
29+
# The TOML package doesn't support heterogenous arrays, even tho it
30+
# was added to the language spec in 2019. Therefore the standard
31+
# lib tomllib/tomli has to be used for parsing.
32+
import tomllib as toml
33+
with file.open("rb") as settings:
34+
data = cast(_SettingsDict, toml.load(settings))
35+
case _:
36+
echo_red(f"Unknown file format: {format}")
37+
exit(1)
38+
39+
return data
40+
41+
42+
def write_settings(
43+
data: _SettingsDict,
44+
file: Path,
45+
format: str = "auto"
46+
) -> None:
47+
if format == "auto":
48+
format = file.suffix[1:].lower() if file.suffix else ""
49+
50+
with file.open("wt", encoding="utf8") as settings:
51+
match format:
52+
case "json":
53+
json.dump(data, settings)
54+
case "yaml" | "yml":
55+
yaml.dump(data, settings, yaml.Dumper)
56+
case "toml":
57+
toml.dump(data, settings)
58+
case _:
59+
echo_red(f"Unknown file format: {format}")
60+
exit(1)

src/instrumentman/settings/load.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
from typing import Any, Callable, TypedDict
5+
6+
from geocompy.communication import open_serial
7+
from geocompy.geo import GeoCom
8+
from geocompy.geo.gctypes import GeoComSubsystem, GeoComResponse, GeoComCode
9+
from geocompy.gsi.dna import GsiOnlineDNA
10+
from geocompy.gsi.gsitypes import GsiOnlineResponse
11+
from geocompy.gsi.dna.settings import GsiOnlineDNASettings
12+
13+
from ..utils import echo_red, echo_yellow
14+
from .io import read_settings
15+
from .validate import validate_settings
16+
17+
18+
class _SubsystemSettingsDict(TypedDict):
19+
subsystem: str
20+
options: dict[str, Any]
21+
22+
23+
class _SettingsDict(TypedDict):
24+
protocol: str
25+
settings: list[_SubsystemSettingsDict]
26+
27+
28+
def set_setting_geocom(
29+
system: GeoComSubsystem,
30+
setting: str,
31+
value: int | float | bool | str | list[int | float | bool | str]
32+
) -> None:
33+
if isinstance(value, bool):
34+
name = f"switch_{setting}"
35+
else:
36+
name = f"set_{setting}"
37+
38+
method: Callable[
39+
...,
40+
GeoComResponse[Any]
41+
] | None = getattr(system, name)
42+
if method is None:
43+
echo_yellow(f"Could not find '{name}' to set '{setting}'")
44+
return
45+
46+
if isinstance(value, list):
47+
response = method(*value)
48+
else:
49+
response = method(value)
50+
51+
if response.error != GeoComCode.OK:
52+
echo_yellow(f"Could not set '{setting}' ({response.error.name})")
53+
return
54+
55+
56+
def set_setting_gsidna(
57+
system: GsiOnlineDNASettings,
58+
setting: str,
59+
value: int | float | bool | str
60+
) -> None:
61+
name = f"set_{setting}"
62+
method: Callable[
63+
...,
64+
GsiOnlineResponse[bool]
65+
] | None = getattr(system, name)
66+
if method is None:
67+
echo_yellow(f"Could not find '{name}' to set '{setting}'")
68+
return
69+
70+
response = method(value)
71+
72+
if response.value is None or not response.value:
73+
echo_yellow(f"Could not set '{setting}' ({response.response})")
74+
return
75+
76+
77+
def upload_settings_geocom(
78+
protocol: GeoCom,
79+
settings: _SettingsDict
80+
) -> None:
81+
for item in settings["settings"]:
82+
sysname = item["subsystem"]
83+
subsystem: Any = getattr(protocol, sysname)
84+
if subsystem is None:
85+
echo_red(f"Could not find '{sysname}' subsystem")
86+
exit(1)
87+
88+
for option, value in item["options"].items():
89+
if value is None:
90+
continue
91+
92+
set_setting_geocom(subsystem, option, value)
93+
94+
95+
def upload_settings_gsidna(
96+
protocol: GsiOnlineDNA,
97+
settings: _SettingsDict
98+
) -> None:
99+
for item in settings["settings"]:
100+
sysname = item["subsystem"]
101+
subsystem: Any = getattr(protocol, sysname)
102+
if subsystem is None:
103+
echo_red(f"Could not find '{sysname}' subsystem")
104+
exit(1)
105+
106+
for option, value in item["options"].items():
107+
if value is None:
108+
continue
109+
110+
set_setting_gsidna(subsystem, option, value)
111+
112+
113+
def main(
114+
port: str,
115+
settings: Path,
116+
baud: int = 9600,
117+
timeout: int = 15,
118+
retry: int = 1,
119+
sync_after_timeout: bool = False,
120+
format: str = "auto"
121+
) -> None:
122+
data = read_settings(settings, format)
123+
if not validate_settings(data):
124+
echo_red("Settings file does not follow schema")
125+
exit(1)
126+
127+
with open_serial(
128+
port,
129+
retry=retry,
130+
sync_after_timeout=sync_after_timeout,
131+
speed=baud,
132+
timeout=timeout
133+
) as com:
134+
if "geocom" in data:
135+
tps = GeoCom(com)
136+
upload_settings_geocom(tps, data["geocom"])
137+
elif "gsidna" in data:
138+
dna = GsiOnlineDNA(com)
139+
upload_settings_gsidna(dna, data["gsidna"])

0 commit comments

Comments
 (0)