Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/instrumentman/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from . import setmeasurement
from . import protocoltest
from . import inclination
from . import filetransfer


@extra_group("iman", params=None) # type: ignore[misc]
Expand Down Expand Up @@ -51,6 +52,16 @@ def cli_test() -> None:
"""Test protocol responsiveness."""


@cli.group("list") # type: ignore[misc]
def cli_list() -> None:
"""List various data stored on the instrument."""


@cli.group("download") # type: ignore[misc]
def cli_download() -> None:
"""Download data from the instrument."""


cli.add_command(morse.cli)
cli.add_command(terminal.cli)
cli_measure.add_command(setmeasurement.cli_measure)
Expand All @@ -64,3 +75,5 @@ def cli_test() -> None:
cli_merge.add_command(inclination.cli_merge)
cli_validate.add_command(setmeasurement.cli_validate)
cli_import.add_command(setup.cli_import)
cli_list.add_command(filetransfer.cli_list)
cli_download.add_command(filetransfer.cli_download)
146 changes: 146 additions & 0 deletions src/instrumentman/filetransfer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from typing import Any

from click_extra import (
extra_command,
option,
argument,
IntRange,
Choice,
File
)

from ..utils import (
com_option_group,
com_port_argument
)


@extra_command(
"files",
params=None,
context_settings={"auto_envvar_prefix": None}
) # type: ignore[misc]
@com_port_argument()
@argument(
"directory",
help="directory to list files in (path should end with '/')",
type=str,
default="/"
)
@com_option_group()
@option(
"-d",
"--device",
help="memory device",
type=Choice(
(
"internal",
"cf",
"sd",
"usb",
"ram"
),
case_sensitive=False
),
default="internal"
)
@option(
"-f",
"--filetype",
help="file type",
type=Choice(
(
"image",
"database",
"overview-jpg",
"overview-bmp",
"telescope-jpg",
"telescope-bmp",
"scan",
"unknown",
"last"
),
case_sensitive=False
),
default="unknown"
)
def cli_list(**kwargs: Any) -> None:
"""List files on an instrument."""
from .app import main_list

main_list(**kwargs)


@extra_command(
"file",
params=None,
context_settings={"auto_envvar_prefix": None}
) # type: ignore[misc]
@com_port_argument()
@argument(
"filename",
help=(
"file to download (including path with '/' separators if filetype "
"option is not specified)"
),
type=str
)
@argument(
"output",
help="file to save downloaded data to",
type=File("wb", lazy=False)
)
@com_option_group()
@option(
"-d",
"--device",
help="memory device",
type=Choice(
(
"internal",
"cf",
"sd",
"usb",
"ram"
),
case_sensitive=False
),
default="internal"
)
@option(
"-f",
"--filetype",
help="file type",
type=Choice(
(
"image",
"database",
"overview-jpg",
"overview-bmp",
"telescope-jpg",
"telescope-bmp",
"scan",
"unknown",
"last"
),
case_sensitive=False
),
default="unknown"
)
@option(
"-c",
"--chunk",
help="chunk size (max 450 for normal and 1800 for VivaTPS large download)",
type=IntRange(1, 1800),
default=450
)
@option(
"--large",
help="use large download commands (only available from VivaTPS)",
is_flag=True
)
def cli_download(**kwargs: Any) -> None:
"""Download a file from the instrument."""
from .app import main_download

main_download(**kwargs)
196 changes: 196 additions & 0 deletions src/instrumentman/filetransfer/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from io import BufferedWriter

from click_extra import echo, progressbar
from geocompy.communication import open_serial
from geocompy.geo import GeoCom
from geocompy.geo.gctypes import GeoComCode
from geocompy.geo.gcdata import File, Device

from ..utils import echo_red, echo_green, echo_yellow


_FILE = {
"image": File.IMAGE,
"database": File.DATABASE,
"overview-jpg": File.IMAGES_OVERVIEW_JPG,
"overview-bmp": File.IMAGES_OVERVIEW_BMP,
"telescope-jpg": File.IMAGES_TELESCOPIC_JPG,
"telescope-bmp": File.IMAGES_TELESCOPIC_BMP,
"scan": File.SCANS,
"unknown": File.UNKNOWN,
"last": File.LAST
}


_DEVICE = {
"internal": Device.INTERNAL,
"cf": Device.CFCARD,
"sd": Device.SDCARD,
"usb": Device.USB,
"ram": Device.RAM
}


def run_listing(
tps: GeoCom,
dev: str,
directory: str,
filetype: str
) -> None:
resp_setup = tps.ftr.setup_listing(
_DEVICE[dev],
_FILE[filetype],
directory
)
if resp_setup.error != GeoComCode.OK:
echo_red(f"Could not set up file listing ({resp_setup.error.name})")
return

resp_list = tps.ftr.list()
if resp_list.error != GeoComCode.OK or resp_list.params is None:
echo_red(f"Could not start listing ({resp_list.error.name})")
return

last, name, size, lastmodified = resp_list.params
if name == "":
echo_yellow("Directory is empty or path does not exist")
return

count = 1
echo(f"{'file name':<55.55s}{'bytes':>10.10s}{'last modified':>25.25s}")
echo(f"{'---------':<55.55s}{'-----':>10.10s}{'-------------':>25.25s}")
fmt = "{name:<55.55s}{size:>10s}{date:>25.25s}"
echo(
fmt.format_map(
{
"name": name,
"size": str(size),
"date": (
lastmodified.isoformat(sep=" ")
if lastmodified is not None
else ""
)
}
)
)
while not last:
resp_list = tps.ftr.list(True)
if resp_list.error != GeoComCode.OK or resp_list.params is None:
echo_red(
f"An error occured during listing ({resp_list.error.name})"
)
return

last, name, size, lastmodified = resp_list.params
echo(
fmt.format_map(
{
"name": name,
"size": str(size),
"date": (
lastmodified.isoformat(sep=" ")
if lastmodified is not None
else ""
)
}
)
)
count += 1

echo("-" * 90)
echo(f"total: {count} files")


def run_download(
tps: GeoCom,
filename: str,
file: BufferedWriter,
device: str = "internal",
filetype: str = "unknown",
chunk: int = 450,
large: bool = False
) -> None:
setup = tps.ftr.setup_download
download = tps.ftr.download
if large:
setup = tps.ftr.setup_large_download
download = tps.ftr.download_large

resp_setup = setup(
filename,
chunk,
_DEVICE[device],
_FILE[filetype]
)
if resp_setup.error != GeoComCode.OK or resp_setup.params is None:
echo_red(f"Could not set up file download ({resp_setup.error.name})")
return

block_count = resp_setup.params
with progressbar(
range(block_count),
label="Downloading"
) as bar:
for i in bar:
resp_pull = download(i + 1)
if resp_pull.error != GeoComCode.OK or resp_pull.params is None:
echo_red(
"An error occured during download "
f"({resp_setup.error.name})"
)
return

echo(bytes.fromhex(resp_pull.params), file, False)

echo_green("Download complete")


def main_download(
port: str,
filename: str,
output: BufferedWriter,
baud: int = 9600,
timeout: int = 15,
retry: int = 1,
sync_after_timeout: bool = False,
device: str = "internal",
filetype: str = "unknown",
chunk: int = 450,
large: bool = False
) -> None:
with open_serial(
port=port,
speed=baud,
timeout=timeout,
retry=retry,
sync_after_timeout=sync_after_timeout
) as com:
tps = GeoCom(com)
try:
run_download(tps, filename, output, device, filetype, chunk, large)
finally:
tps.ftr.abort_download()


def main_list(
port: str,
directory: str = "/",
baud: int = 9600,
timeout: int = 15,
retry: int = 1,
sync_after_timeout: bool = False,
device: str = "internal",
filetype: str = "unknown"
) -> None:
with open_serial(
port=port,
speed=baud,
timeout=timeout,
retry=retry,
sync_after_timeout=sync_after_timeout
) as com:
tps = GeoCom(com)
try:
run_listing(tps, device, directory, filetype)
finally:
tps.ftr.abort_list()
Loading