From 393b940218a0b79214f54b0913ab62220a7c6394 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 09:22:17 +0000 Subject: [PATCH 01/22] :alien: Remove workaround now that upstream change has been merged --- icenet_mp/data_processors/data_downloader.py | 2 +- pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index dc35507e4..16c6a8f53 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -96,7 +96,7 @@ def inspect(self) -> None: AnemoiInspectArgs( path=str(self.path_dataset), detailed=True, - progress=False, # must be disabled until https://github.com/ecmwf/anemoi-datasets/pull/372 is merged + progress=True, statistics=False, size=True, ) diff --git a/pyproject.toml b/pyproject.toml index 7017de9ea..f80d00171 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ authors = [ ] requires-python = ">=3.11,<3.13" # Note "3.13" is currently incompatible with the tensorflow requirement from icenet dependencies = [ - "anemoi-datasets>=0.5.25", + "anemoi-datasets>=0.5.27", # to get the fix from https://github.com/ecmwf/anemoi-datasets/pull/372 "anemoi-transform>=0.1.16", "anemoi-utils>=0.4.42", "cachetools>=6.1.0", diff --git a/uv.lock b/uv.lock index edc3972f5..a9ae9c3f7 100644 --- a/uv.lock +++ b/uv.lock @@ -1717,7 +1717,7 @@ notebooks = [ [package.metadata] requires-dist = [ - { name = "anemoi-datasets", specifier = ">=0.5.25" }, + { name = "anemoi-datasets", specifier = ">=0.5.27" }, { name = "anemoi-transform", specifier = ">=0.1.16" }, { name = "anemoi-utils", specifier = ">=0.4.42" }, { name = "cachetools", specifier = ">=6.1.0" }, From a7529f102959d9ae1c74ddf697a9b4c1d4d94889 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 09:28:00 +0000 Subject: [PATCH 02/22] :sparkles: Check dataset status before deciding what to do with it --- icenet_mp/data_processors/data_downloader.py | 46 +++++++++++++++----- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 16c6a8f53..e78a21a16 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -59,20 +59,37 @@ def create(self, *, overwrite: bool) -> None: # Otherwise we check whether a valid dataset exists elif self.path_dataset.exists(): - try: - self.inspect() - logger.info( - "Dataset %s already exists at %s, no need to download.", + download_in_progress, download_complete = self.status() + # This dataset is being downloaded + if download_in_progress: + logger.warning( + "Dataset %s at %s is currently being downloaded by another process. Please wait until it is complete.", self.name, self.path_dataset, ) - except (AttributeError, FileNotFoundError, PathNotFoundError): - # If the dataset is invalid we delete it - logger.info("Dataset %s not found at %s.", self.name, self.path_dataset) - shutil.rmtree(self.path_dataset, ignore_errors=True) - else: - # If the dataset is valid we return here return + # The download is complete, but we check the dataset is valid before returning + elif download_complete: + try: + self.inspect() + logger.info( + "Dataset %s already exists at %s, no need to download.", + self.name, + self.path_dataset, + ) + except (AttributeError, FileNotFoundError, PathNotFoundError): + # If the dataset is invalid we delete it + logger.info("Dataset %s not found at %s.", self.name, self.path_dataset) + shutil.rmtree(self.path_dataset, ignore_errors=True) + else: + # If the dataset is valid we return here + return + else: + logger.warning( + "Dataset %s at %s is incomplete.", + self.name, + self.path_dataset, + ) # Download the dataset self.download() @@ -143,6 +160,15 @@ def init(self, *, overwrite: bool) -> None: ) ) + def status(self) -> tuple[bool, bool]: + """Return a tuple indicating whether the dataset exists and whether it is complete.""" + inspector = InspectZarr() + version = inspector._info(str(self.path_dataset)) + download_in_progress = version.copy_in_progress + download_complete = all(version.build_flags or []) + return (download_in_progress, download_complete) + + def _part_tracker_path(self) -> Path: """Path for part_trackerdata file that tracks completed parts.""" return ( From fb6a7cd9de627906d60a0d03236a1014fcbcac33 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:22:24 +0000 Subject: [PATCH 03/22] :sparkles: Add simple function to load in chunks using native Anemoi functionality --- icenet_mp/data_processors/data_downloader.py | 19 +++++++++++++++---- icenet_mp/types/__init__.py | 2 ++ icenet_mp/types/simple_datatypes.py | 11 ++++++++++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index e78a21a16..225a0c3d1 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -23,6 +23,7 @@ AnemoiInitArgs, AnemoiInspectArgs, AnemoiLoadArgs, + AnemoiLoadArgsOld, ) from .preprocessors import IPreprocessor @@ -69,7 +70,7 @@ def create(self, *, overwrite: bool) -> None: ) return # The download is complete, but we check the dataset is valid before returning - elif download_complete: + if download_complete: try: self.inspect() logger.info( @@ -79,7 +80,9 @@ def create(self, *, overwrite: bool) -> None: ) except (AttributeError, FileNotFoundError, PathNotFoundError): # If the dataset is invalid we delete it - logger.info("Dataset %s not found at %s.", self.name, self.path_dataset) + logger.info( + "Dataset %s not found at %s.", self.name, self.path_dataset + ) shutil.rmtree(self.path_dataset, ignore_errors=True) else: # If the dataset is valid we return here @@ -160,6 +163,15 @@ def init(self, *, overwrite: bool) -> None: ) ) + def load_in_chunks(self) -> None: + """Download a single Anemoi dataset in chunks, skipping those already present.""" + Load().run( + AnemoiLoadArgs( + path=str(self.path_dataset), + config=self.config, + ) + ) + def status(self) -> tuple[bool, bool]: """Return a tuple indicating whether the dataset exists and whether it is complete.""" inspector = InspectZarr() @@ -168,7 +180,6 @@ def status(self) -> tuple[bool, bool]: download_complete = all(version.build_flags or []) return (download_in_progress, download_complete) - def _part_tracker_path(self) -> Path: """Path for part_trackerdata file that tracks completed parts.""" return ( @@ -239,7 +250,7 @@ def load(self, parts: str) -> None: self.path_dataset, ) Load().run( - AnemoiLoadArgs( + AnemoiLoadArgsOld( path=str(self.path_dataset), config=self.config, parts=parts, diff --git a/icenet_mp/types/__init__.py b/icenet_mp/types/__init__.py index 1c33b3056..86e28b964 100644 --- a/icenet_mp/types/__init__.py +++ b/icenet_mp/types/__init__.py @@ -7,6 +7,7 @@ AnemoiInitArgs, AnemoiInspectArgs, AnemoiLoadArgs, + AnemoiLoadArgsOld, DataloaderArgs, DiffColourmapSpec, Metadata, @@ -29,6 +30,7 @@ "AnemoiInitArgs", "AnemoiInspectArgs", "AnemoiLoadArgs", + "AnemoiLoadArgsOld", "ArrayCHW", "ArrayHW", "ArrayTCHW", diff --git a/icenet_mp/types/simple_datatypes.py b/icenet_mp/types/simple_datatypes.py index 22755fdd5..3d250275c 100644 --- a/icenet_mp/types/simple_datatypes.py +++ b/icenet_mp/types/simple_datatypes.py @@ -41,7 +41,7 @@ class AnemoiInitArgs: @dataclass -class AnemoiLoadArgs: +class AnemoiLoadArgsOld: """Arguments for anemoi load.""" config: DictConfig @@ -59,6 +59,15 @@ class AnemoiFinaliseArgs: command: str = "unused" +@dataclass +class AnemoiLoadArgs: + """Arguments for anemoi load.""" + + config: DictConfig + path: str + command: str = "unused" + + class DataloaderArgs(TypedDict): """Arguments for the data loader.""" From f4543f297ebd274700fd555e97c6e2de73b2f73f Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:26:06 +0000 Subject: [PATCH 04/22] :sparkles: Add an initialise function to wrap Anemoi Init --- icenet_mp/data_processors/data_downloader.py | 23 ++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 225a0c3d1..3d65f5ee0 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -163,6 +163,29 @@ def init(self, *, overwrite: bool) -> None: ) ) + def initialise(self) -> None: + """Initialise an Anemoi dataset.""" + if self.path_dataset.exists(): + logger.info( + "Dataset %s already initialised at %s.", self.name, self.path_dataset + ) + return + try: + Init().run( + AnemoiInitArgs( + path=str(self.path_dataset), + config=self.config, + ) + ) + logger.info("Initialised dataset %s at %s.", self.name, self.path_dataset) + except (AttributeError, FileNotFoundError, PathNotFoundError): + logger.exception( + "Failed to initialise dataset %s at %s.", + self.name, + self.path_dataset, + ) + raise + def load_in_chunks(self) -> None: """Download a single Anemoi dataset in chunks, skipping those already present.""" Load().run( From 71f060e957f1c2f451a8b2058b7ea7108da87e7a Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:28:25 +0000 Subject: [PATCH 05/22] :recycle: Replace download Create with download in parts --- icenet_mp/data_processors/data_downloader.py | 24 ++++++++++++-------- icenet_mp/types/__init__.py | 2 -- icenet_mp/types/simple_datatypes.py | 12 ---------- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 3d65f5ee0..581bf97c9 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -8,7 +8,6 @@ from tempfile import NamedTemporaryFile from time import sleep -from anemoi.datasets.commands.create import Create from anemoi.datasets.commands.finalise import Finalise from anemoi.datasets.commands.init import Init from anemoi.datasets.commands.inspect import InspectZarr @@ -18,7 +17,6 @@ from zarr.errors import PathNotFoundError from icenet_mp.types import ( - AnemoiCreateArgs, AnemoiFinaliseArgs, AnemoiInitArgs, AnemoiInspectArgs, @@ -98,18 +96,26 @@ def create(self, *, overwrite: bool) -> None: self.download() def download(self) -> None: - """Download a single Anemoi dataset.""" + """Download an Anemoi dataset in parts.""" self.preprocessor.download(self.path_preprocessor) logger.info("Creating dataset %s at %s.", self.name, self.path_dataset) - Create().run( - AnemoiCreateArgs( - path=str(self.path_dataset), - config=self.config, + # Initialise + self.initialise() + # Load in parts + self.load_in_chunks() + # Finalise if the status indicates the dataset is complete + download_in_progress, download_complete = self.status() + if download_complete and not download_in_progress: + self.finalise() + else: + logger.warning( + "Dataset %s at %s is incomplete after loading, skipping finalise.", + self.name, + self.path_dataset, ) - ) def inspect(self) -> None: - """Inspect a single Anemoi dataset.""" + """Inspect an Anemoi dataset.""" logger.info("Inspecting dataset %s at %s.", self.name, self.path_dataset) if self.path_dataset.exists(): InspectZarr().run( diff --git a/icenet_mp/types/__init__.py b/icenet_mp/types/__init__.py index 86e28b964..c2a701288 100644 --- a/icenet_mp/types/__init__.py +++ b/icenet_mp/types/__init__.py @@ -2,7 +2,6 @@ from .enums import BetaSchedule, RangeRestriction, TensorDimensions from .protocols import SupportsMetadata from .simple_datatypes import ( - AnemoiCreateArgs, AnemoiFinaliseArgs, AnemoiInitArgs, AnemoiInspectArgs, @@ -25,7 +24,6 @@ ) __all__ = [ - "AnemoiCreateArgs", "AnemoiFinaliseArgs", "AnemoiInitArgs", "AnemoiInspectArgs", diff --git a/icenet_mp/types/simple_datatypes.py b/icenet_mp/types/simple_datatypes.py index 3d250275c..29fbf4a66 100644 --- a/icenet_mp/types/simple_datatypes.py +++ b/icenet_mp/types/simple_datatypes.py @@ -7,18 +7,6 @@ from .typedefs import DiffMode, DiffStrategy -@dataclass -class AnemoiCreateArgs: - """Arguments for anemoi create.""" - - config: DictConfig - path: str - command: str = "unused" - overwrite: bool = False - processes: int = 0 - threads: int = 0 - - @dataclass class AnemoiInspectArgs: """Arguments for anemoi inspect.""" From 7ca0ea4afca8d1c035b5682ee7c2afdf221cdb81 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:31:08 +0000 Subject: [PATCH 06/22] :coffin: Remove load_in_parts and associated tracking functionality as this is handled directly by Anemoi --- icenet_mp/cli/datasets.py | 40 --- icenet_mp/data_processors/data_downloader.py | 286 ------------------- 2 files changed, 326 deletions(-) diff --git a/icenet_mp/cli/datasets.py b/icenet_mp/cli/datasets.py index 086196d86..8ea323cdb 100644 --- a/icenet_mp/cli/datasets.py +++ b/icenet_mp/cli/datasets.py @@ -69,46 +69,6 @@ def load( downloader.load(parts=parts) -@datasets_cli.command("load_in_parts") -@hydra_adaptor -def load_in_parts( - config: DictConfig, - *, - continue_on_error: Annotated[ - bool, typer.Option(help="Continue to next part on error") - ] = True, - force_reset: Annotated[ - bool, - typer.Option( - help="Clear existing progress part_tracker file and start from part 1" - ), - ] = False, - dataset: Annotated[ - str | None, typer.Option(help="Run only a single dataset by name") - ] = None, - total_parts: Annotated[ - int, typer.Option(help="Override default total parts (10)") - ] = 10, - overwrite: Annotated[ - bool, - typer.Option(help="Delete the dataset directory before loading"), - ] = False, -) -> None: - """Load all parts for all datasets in parts, recording progress so runs can be resumed.""" - factory = DataDownloaderFactory(config) - for downloader in factory.downloaders: - if dataset and downloader.name != dataset: - logger.info("Not loading %s.", downloader.name) - continue - logger.info("Working on %s.", downloader.name) - downloader.load_in_parts( - continue_on_error=continue_on_error, - force_reset=force_reset, - total_parts=total_parts, - overwrite=overwrite, - ) - - @datasets_cli.command("finalise") @hydra_adaptor def finalise( diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 581bf97c9..70ff34921 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -1,18 +1,11 @@ -import json import logging -import os import shutil -import socket -from datetime import UTC, datetime from pathlib import Path -from tempfile import NamedTemporaryFile -from time import sleep from anemoi.datasets.commands.finalise import Finalise from anemoi.datasets.commands.init import Init from anemoi.datasets.commands.inspect import InspectZarr from anemoi.datasets.commands.load import Load -from filelock import FileLock, Timeout from omegaconf import DictConfig, OmegaConf from zarr.errors import PathNotFoundError @@ -209,67 +202,6 @@ def status(self) -> tuple[bool, bool]: download_complete = all(version.build_flags or []) return (download_in_progress, download_complete) - def _part_tracker_path(self) -> Path: - """Path for part_trackerdata file that tracks completed parts.""" - return ( - Path(self.path_dataset).with_suffix(".load_parts.json") - if not Path(self.path_dataset).is_dir() - else Path(self.path_dataset) / ".load_parts.json" - ) - - def _part_tracker_lock_path(self) -> Path: - """Path for the lockfile used when updating the part tracker.""" - return self._part_tracker_path().with_suffix(".lock") - - def _read_part_tracker(self) -> dict: - """Read part_tracker data JSON (returns {'completed': {part_spec: timestamp}}).""" - part_tracker_file = self._part_tracker_path() - if not part_tracker_file.exists(): - return {"completed": {}} - try: - with part_tracker_file.open("r", encoding="utf-8") as fh: - return json.load(fh) - except (OSError, json.JSONDecodeError): - logger.warning( - "Failed to read load chunks part_tracker data %s; starting fresh", - part_tracker_file, - ) - return {"completed": {}} - - def _write_part_tracker(self, data: dict) -> None: - """Write part_tracker data JSON to disk. - - Ensure parent dir exists. We write to a temporary file in the same directory - then use replace() to get an atomic move. This is a POSIX-only feature. - This means that if the process is killed, the part_tracker data will not be lost. - """ - part_tracker_file = self._part_tracker_path() - # Ensure parent directory exists (NamedTemporaryFile requires it) - try: - part_tracker_file.parent.mkdir(parents=True, exist_ok=True) - except OSError: - logger.debug( - "Could not ensure parent dir for part_tracker data: %s", - part_tracker_file.parent, - ) - - try: - with NamedTemporaryFile( - "w", delete=False, dir=str(part_tracker_file.parent), encoding="utf-8" - ) as fh: - json.dump(data, fh, indent=2, sort_keys=True) - tmp_name = Path(fh.name) - tmp_name.replace(part_tracker_file) # atomic replace if on same filesystem - except OSError: - # fallback to naive write - try: - with part_tracker_file.open("w", encoding="utf-8") as fh: - json.dump(data, fh, indent=2, sort_keys=True) - except OSError: - logger.exception( - "Failed to write part_tracker data file %s", part_tracker_file - ) - def load(self, parts: str) -> None: """Download a segment of an Anemoi dataset.""" logger.info( @@ -286,224 +218,6 @@ def load(self, parts: str) -> None: ) ) - def load_in_parts( # noqa: C901, PLR0915 - self, - *, - continue_on_error: bool = True, - force_reset: bool = False, - use_lock: bool = True, - lock_timeout: int = 30, - total_parts: int = 10, - overwrite: bool = False, - ) -> None: - """Load all parts automatically and record progress so runs can be resumed. - - Args: - continue_on_error: if True, log errors and continue with later parts. - force_reset: if True, clear part_tracker file and re-run all parts from scratch. - use_lock: if True, use file locking to prevent concurrent updates to part_tracker file. - lock_timeout: timeout in seconds for acquiring the lock. - total_parts: number of parts to load, default = 10. - overwrite: if True, delete the dataset directory before loading. - - """ - - # Nested helper functions that access outer scope variables - def check_and_mark_in_progress(part_spec: str) -> bool: - """Check if part should be skipped and mark it as in progress. - - Returns True if part should be skipped (already completed), False otherwise. - """ - lock_path = self._part_tracker_lock_path() - max_retries = 3 - backoff = 1.0 # seconds between lock retries - - def _claim(part_tracker: dict) -> bool: - completed = set(part_tracker.get("completed", {}).keys()) - # Skip if already done - # if part_spec in completed: - if part_spec in completed: - logger.info( - "Skipping already completed part %s for %s", - part_spec, - self.name, - ) - return True - - # mark in-progress with PID/host - part_tracker.setdefault("in_progress", {})[part_spec] = { - "started_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), - "pid": os.getpid(), - "host": socket.gethostname(), - } - self._write_part_tracker(part_tracker) - return False - - if use_lock: - for attempt in range(1, max_retries + 1): - try: - with FileLock(str(lock_path), timeout=lock_timeout): - part_tracker = self._read_part_tracker() - return _claim(part_tracker) - except Timeout: - logger.warning( - "Timeout acquiring part-tracker lock for %s (attempt %d/%d)", - self.name, - attempt, - max_retries, - ) - if attempt < max_retries: - sleep(backoff) - backoff *= 2 - continue - # final attempt failed — skip (will retry next run) - logger.exception( - "Failed to acquire lock after %d attempts, skipping part %s", - max_retries, - part_spec, - ) - return True - else: - part_tracker = self._read_part_tracker() - return _claim(part_tracker) - return False # Explicit return for type checker - - def clear_part_in_progress(part_spec: str) -> None: - """Clear the in_progress status for a part.""" - lock_path = self._part_tracker_lock_path() - if use_lock: - try: - with FileLock(str(lock_path), timeout=lock_timeout): - part_tracker = self._read_part_tracker() - part_tracker.get("in_progress", {}).pop(part_spec, None) - self._write_part_tracker(part_tracker) - except Timeout: - logger.warning( - "Timeout acquiring lock to clear in_progress for %s", part_spec - ) - else: - part_tracker = self._read_part_tracker() - part_tracker.get("in_progress", {}).pop(part_spec, None) - self._write_part_tracker(part_tracker) - - def mark_part_completed(part_spec: str) -> None: - """Mark a part as completed in the tracker.""" - ts = datetime.now(UTC).isoformat().replace("+00:00", "Z") - lock_path = self._part_tracker_lock_path() - if use_lock: - try: - with FileLock(str(lock_path), timeout=lock_timeout): - part_tracker = self._read_part_tracker() - part_tracker.get("in_progress", {}).pop(part_spec, None) - part_tracker.setdefault("completed", {})[part_spec] = { - "completed_at": ts, - "pid": os.getpid(), - "host": socket.gethostname(), - } - self._write_part_tracker(part_tracker) - except Timeout: - logger.warning( - "Timeout acquiring lock to mark part %s completed; data likely written", - part_spec, - ) - else: - part_tracker = self._read_part_tracker() - part_tracker.get("in_progress", {}).pop(part_spec, None) - part_tracker.setdefault("completed", {})[part_spec] = { - "completed_at": ts, - "pid": os.getpid(), - "host": socket.gethostname(), - } - self._write_part_tracker(part_tracker) - - if "sic" in self.name: - logger.warning( - "Loading dataset %s in parts is not supported for non-anemoi datasets, use create.", - self.name, - ) - return - - if overwrite: - logger.info( - "overwrite set to true, deleting dataset %s at %s", - self.name, - self.path_dataset, - ) - shutil.rmtree(self.path_dataset, ignore_errors=True) - # Also delete the tracker file if it exists (may be outside dataset directory) - tracker_path = self._part_tracker_path() - if tracker_path.exists(): - tracker_path.unlink() - logger.info("Deleted part tracker file at %s", tracker_path) - # Initialize the dataset after deletion so we can load parts into it - self.init(overwrite=False) - else: - # Ensure dataset is initialized before loading parts - try: - self.inspect() - - if not self._part_tracker_path().exists(): - logger.info( - "Dataset %s already exists at %s.", self.name, self.path_dataset - ) - return - except (AttributeError, FileNotFoundError, PathNotFoundError): - logger.info( - "Dataset %s not found at %s, initialising before loading parts.", - self.name, - self.path_dataset, - ) - self.init(overwrite=False) - - logger.info( - "Starting chunked load for dataset %s (parts=%d)", self.name, total_parts - ) - - part_tracker = self._read_part_tracker() - # Skip force_reset if overwrite was used (already cleared everything) - if ( - force_reset - and not overwrite - and (part_tracker.get("completed") or part_tracker.get("in_progress")) - ): - logger.info( - "force_reset requested — clearing previous load part_tracker data (completed + in_progress) at %s", - self._part_tracker_path(), - ) - part_tracker = {"completed": {}} - # ensure no in_progress remains - self._write_part_tracker(part_tracker) - - for i in range(1, total_parts + 1): - part_spec = f"{i}/{total_parts}" - # Check if part should be skipped and mark as in progress - if check_and_mark_in_progress(part_spec): - continue - - # Do the heavy work outside the lock - logger.info("Loading part %s for dataset %s", part_spec, self.name) - try: - self.load(parts=part_spec) - except Exception: - logger.exception( - "Error while loading part %s for dataset %s", part_spec, self.name - ) - # clear in_progress under lock and optionally re-raise - clear_part_in_progress(part_spec) - - if not continue_on_error: - raise - logger.info("Continuing to next part after error in %s", part_spec) - continue - - # Mark completed under lock - mark_part_completed(part_spec) - logger.info( - "Marked part %s as completed for dataset %s", part_spec, self.name - ) - - logger.info("Chunked load finished for dataset %s", self.name) - def finalise(self) -> None: """Finalise the segmented Anemoi dataset.""" Finalise().run( From ba1980b47c998512f47a96e273ea718adee8bf43 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:35:24 +0000 Subject: [PATCH 07/22] :coffin: Remove init command and CLI --- icenet_mp/cli/datasets.py | 16 -------- icenet_mp/data_processors/data_downloader.py | 39 -------------------- 2 files changed, 55 deletions(-) diff --git a/icenet_mp/cli/datasets.py b/icenet_mp/cli/datasets.py index 8ea323cdb..36ccff47d 100644 --- a/icenet_mp/cli/datasets.py +++ b/icenet_mp/cli/datasets.py @@ -40,22 +40,6 @@ def inspect(config: DictConfig) -> None: downloader.inspect() -@datasets_cli.command("init") -@hydra_adaptor -def init( - config: DictConfig, - *, - overwrite: Annotated[ - bool, typer.Option(help="Specify whether to overwrite existing datasets") - ] = False, -) -> None: - """Create all datasets.""" - factory = DataDownloaderFactory(config) - for downloader in factory.downloaders: - logger.info("Working on %s.", downloader.name) - downloader.init(overwrite=overwrite) - - @datasets_cli.command("load") @hydra_adaptor def load( diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 70ff34921..f056b6c80 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -123,45 +123,6 @@ def inspect(self) -> None: else: logger.error("Dataset %s not found at %s.", self.name, self.path_dataset) - def init(self, *, overwrite: bool) -> None: - """Initialise a single Anemoi dataset.""" - logger.info("Initialising dataset %s at %s.", self.name, self.path_dataset) - - if overwrite: - logger.info( - "Overwrite set to true, reinitialising %s to %s", - self.name, - self.path_dataset, - ) - shutil.rmtree(self.path_dataset, ignore_errors=True) - Init().run( - AnemoiInitArgs( - path=str(self.path_dataset), - config=self.config, - ) - ) - else: - try: - self.inspect() - logger.info( - "Dataset %s already exists at %s, no need to download.", - self.name, - self.path_dataset, - ) - except (AttributeError, FileNotFoundError, PathNotFoundError): - logger.info( - "Dataset %s not found at %s, initialising.", - self.name, - self.path_dataset, - ) - shutil.rmtree(self.path_dataset, ignore_errors=True) - Init().run( - AnemoiInitArgs( - path=str(self.path_dataset), - config=self.config, - ) - ) - def initialise(self) -> None: """Initialise an Anemoi dataset.""" if self.path_dataset.exists(): From aa800cc9d8af1b09c0918c3650fb2e9a25deb8e1 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:36:39 +0000 Subject: [PATCH 08/22] :coffin: Remove load command and CLI --- icenet_mp/cli/datasets.py | 13 ------------- icenet_mp/data_processors/data_downloader.py | 17 ----------------- icenet_mp/types/__init__.py | 2 -- icenet_mp/types/simple_datatypes.py | 10 ---------- 4 files changed, 42 deletions(-) diff --git a/icenet_mp/cli/datasets.py b/icenet_mp/cli/datasets.py index 36ccff47d..f96380d6d 100644 --- a/icenet_mp/cli/datasets.py +++ b/icenet_mp/cli/datasets.py @@ -40,19 +40,6 @@ def inspect(config: DictConfig) -> None: downloader.inspect() -@datasets_cli.command("load") -@hydra_adaptor -def load( - config: DictConfig, - parts: Annotated[str, typer.Option(help="The part to process, specified as 'i/n'")], -) -> None: - """Load dataset in parts.""" - factory = DataDownloaderFactory(config) - for downloader in factory.downloaders: - logger.info("Working on %s.", downloader.name) - downloader.load(parts=parts) - - @datasets_cli.command("finalise") @hydra_adaptor def finalise( diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index f056b6c80..60c9f1fdb 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -14,7 +14,6 @@ AnemoiInitArgs, AnemoiInspectArgs, AnemoiLoadArgs, - AnemoiLoadArgsOld, ) from .preprocessors import IPreprocessor @@ -163,22 +162,6 @@ def status(self) -> tuple[bool, bool]: download_complete = all(version.build_flags or []) return (download_in_progress, download_complete) - def load(self, parts: str) -> None: - """Download a segment of an Anemoi dataset.""" - logger.info( - "Downloading %s part of %s to %s.", - parts, - self.name, - self.path_dataset, - ) - Load().run( - AnemoiLoadArgsOld( - path=str(self.path_dataset), - config=self.config, - parts=parts, - ) - ) - def finalise(self) -> None: """Finalise the segmented Anemoi dataset.""" Finalise().run( diff --git a/icenet_mp/types/__init__.py b/icenet_mp/types/__init__.py index c2a701288..2036b94c8 100644 --- a/icenet_mp/types/__init__.py +++ b/icenet_mp/types/__init__.py @@ -6,7 +6,6 @@ AnemoiInitArgs, AnemoiInspectArgs, AnemoiLoadArgs, - AnemoiLoadArgsOld, DataloaderArgs, DiffColourmapSpec, Metadata, @@ -28,7 +27,6 @@ "AnemoiInitArgs", "AnemoiInspectArgs", "AnemoiLoadArgs", - "AnemoiLoadArgsOld", "ArrayCHW", "ArrayHW", "ArrayTCHW", diff --git a/icenet_mp/types/simple_datatypes.py b/icenet_mp/types/simple_datatypes.py index 29fbf4a66..8cbe67237 100644 --- a/icenet_mp/types/simple_datatypes.py +++ b/icenet_mp/types/simple_datatypes.py @@ -28,16 +28,6 @@ class AnemoiInitArgs: overwrite: bool = False -@dataclass -class AnemoiLoadArgsOld: - """Arguments for anemoi load.""" - - config: DictConfig - path: str - parts: str - command: str = "unused" - - @dataclass class AnemoiFinaliseArgs: """Arguments for anemoi finalise.""" From 4d364491ba4975991ee175024c9f8319ed62b44a Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:37:34 +0000 Subject: [PATCH 09/22] :truck: Alphabetise DataDownloader methods --- icenet_mp/data_processors/data_downloader.py | 46 ++++++++++---------- icenet_mp/types/simple_datatypes.py | 20 ++++----- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 60c9f1fdb..a79eaf9d7 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -106,21 +106,14 @@ def download(self) -> None: self.path_dataset, ) - def inspect(self) -> None: - """Inspect an Anemoi dataset.""" - logger.info("Inspecting dataset %s at %s.", self.name, self.path_dataset) - if self.path_dataset.exists(): - InspectZarr().run( - AnemoiInspectArgs( - path=str(self.path_dataset), - detailed=True, - progress=True, - statistics=False, - size=True, - ) + def finalise(self) -> None: + """Finalise the segmented Anemoi dataset.""" + Finalise().run( + AnemoiFinaliseArgs( + path=str(self.path_dataset), + config=self.config, ) - else: - logger.error("Dataset %s not found at %s.", self.name, self.path_dataset) + ) def initialise(self) -> None: """Initialise an Anemoi dataset.""" @@ -145,6 +138,22 @@ def initialise(self) -> None: ) raise + def inspect(self) -> None: + """Inspect an Anemoi dataset.""" + logger.info("Inspecting dataset %s at %s.", self.name, self.path_dataset) + if self.path_dataset.exists(): + InspectZarr().run( + AnemoiInspectArgs( + path=str(self.path_dataset), + detailed=True, + progress=True, + statistics=False, + size=True, + ) + ) + else: + logger.error("Dataset %s not found at %s.", self.name, self.path_dataset) + def load_in_chunks(self) -> None: """Download a single Anemoi dataset in chunks, skipping those already present.""" Load().run( @@ -161,12 +170,3 @@ def status(self) -> tuple[bool, bool]: download_in_progress = version.copy_in_progress download_complete = all(version.build_flags or []) return (download_in_progress, download_complete) - - def finalise(self) -> None: - """Finalise the segmented Anemoi dataset.""" - Finalise().run( - AnemoiFinaliseArgs( - path=str(self.path_dataset), - config=self.config, - ) - ) diff --git a/icenet_mp/types/simple_datatypes.py b/icenet_mp/types/simple_datatypes.py index 8cbe67237..c47632d23 100644 --- a/icenet_mp/types/simple_datatypes.py +++ b/icenet_mp/types/simple_datatypes.py @@ -8,14 +8,12 @@ @dataclass -class AnemoiInspectArgs: - """Arguments for anemoi inspect.""" +class AnemoiFinaliseArgs: + """Arguments for anemoi finalise.""" - detailed: bool + config: DictConfig path: str - progress: bool - size: bool - statistics: bool + command: str = "unused" @dataclass @@ -29,12 +27,14 @@ class AnemoiInitArgs: @dataclass -class AnemoiFinaliseArgs: - """Arguments for anemoi finalise.""" +class AnemoiInspectArgs: + """Arguments for anemoi inspect.""" - config: DictConfig + detailed: bool path: str - command: str = "unused" + progress: bool + size: bool + statistics: bool @dataclass From 25f00cabec7cdaf9d2e93a006f6f033c7992cb57 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 10:53:56 +0000 Subject: [PATCH 10/22] :children_crossing: Also check whether statistics are ready before finalising --- icenet_mp/data_processors/data_downloader.py | 32 ++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index a79eaf9d7..5a7dde092 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -50,8 +50,8 @@ def create(self, *, overwrite: bool) -> None: # Otherwise we check whether a valid dataset exists elif self.path_dataset.exists(): - download_in_progress, download_complete = self.status() - # This dataset is being downloaded + download_in_progress, download_complete, _ = self.status() + # The dataset is being downloaded if download_in_progress: logger.warning( "Dataset %s at %s is currently being downloaded by another process. Please wait until it is complete.", @@ -96,12 +96,12 @@ def download(self) -> None: # Load in parts self.load_in_chunks() # Finalise if the status indicates the dataset is complete - download_in_progress, download_complete = self.status() - if download_complete and not download_in_progress: + download_in_progress, download_complete, statistics_ready = self.status() + if download_complete and (not download_in_progress) and statistics_ready: self.finalise() else: logger.warning( - "Dataset %s at %s is incomplete after loading, skipping finalise.", + "Dataset %s at %s is not fully loaded, skipping finalise.", self.name, self.path_dataset, ) @@ -138,17 +138,24 @@ def initialise(self) -> None: ) raise - def inspect(self) -> None: + def inspect( + self, + *, + detailed: bool = True, + progress: bool = True, + statistics: bool = True, + size: bool = True, + ) -> None: """Inspect an Anemoi dataset.""" logger.info("Inspecting dataset %s at %s.", self.name, self.path_dataset) if self.path_dataset.exists(): InspectZarr().run( AnemoiInspectArgs( path=str(self.path_dataset), - detailed=True, - progress=True, - statistics=False, - size=True, + detailed=detailed, + progress=progress, + statistics=statistics, + size=size, ) ) else: @@ -163,10 +170,11 @@ def load_in_chunks(self) -> None: ) ) - def status(self) -> tuple[bool, bool]: + def status(self) -> tuple[bool, bool, bool]: """Return a tuple indicating whether the dataset exists and whether it is complete.""" inspector = InspectZarr() version = inspector._info(str(self.path_dataset)) download_in_progress = version.copy_in_progress download_complete = all(version.build_flags or []) - return (download_in_progress, download_complete) + statistics_ready = version.statistics_ready + return (download_in_progress, download_complete, statistics_ready) From aa1f278d8e2d511d6dc8b2e20482bcb206df5c20 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 11:03:21 +0000 Subject: [PATCH 11/22] :bug: Remove invalid datasets whether or not they appear to be complete --- icenet_mp/data_processors/data_downloader.py | 52 ++++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 5a7dde092..b153e2e50 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -50,7 +50,7 @@ def create(self, *, overwrite: bool) -> None: # Otherwise we check whether a valid dataset exists elif self.path_dataset.exists(): - download_in_progress, download_complete, _ = self.status() + download_in_progress, _, _ = self.status() # The dataset is being downloaded if download_in_progress: logger.warning( @@ -59,30 +59,25 @@ def create(self, *, overwrite: bool) -> None: self.path_dataset, ) return - # The download is complete, but we check the dataset is valid before returning - if download_complete: - try: - self.inspect() - logger.info( - "Dataset %s already exists at %s, no need to download.", - self.name, - self.path_dataset, - ) - except (AttributeError, FileNotFoundError, PathNotFoundError): - # If the dataset is invalid we delete it - logger.info( - "Dataset %s not found at %s.", self.name, self.path_dataset - ) - shutil.rmtree(self.path_dataset, ignore_errors=True) - else: - # If the dataset is valid we return here - return - else: - logger.warning( - "Dataset %s at %s is incomplete.", + # Check whether the dataset is valid, even if it is incomplete + try: + self.inspect() + logger.info( + "Dataset %s already exists at %s, no need to download.", + self.name, + self.path_dataset, + ) + except (AttributeError, FileNotFoundError, PathNotFoundError): + # If the dataset is invalid we delete it + logger.info( + "Dataset %s at %s is invalid, removing it.", self.name, self.path_dataset, ) + shutil.rmtree(self.path_dataset, ignore_errors=True) + else: + # If the dataset is valid we return here + return # Download the dataset self.download() @@ -173,8 +168,13 @@ def load_in_chunks(self) -> None: def status(self) -> tuple[bool, bool, bool]: """Return a tuple indicating whether the dataset exists and whether it is complete.""" inspector = InspectZarr() - version = inspector._info(str(self.path_dataset)) - download_in_progress = version.copy_in_progress - download_complete = all(version.build_flags or []) - statistics_ready = version.statistics_ready + try: + version = inspector._info(str(self.path_dataset)) + download_in_progress = version.copy_in_progress + download_complete = all(version.build_flags or []) + statistics_ready = version.statistics_ready + except (AttributeError, FileNotFoundError, PathNotFoundError): + download_in_progress = False + download_complete = False + statistics_ready = False return (download_in_progress, download_complete, statistics_ready) From 3e0e0b56074e1ea1ff26da74725e31bfa0a7ca70 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 11:29:58 +0000 Subject: [PATCH 12/22] :coffin: Removed tests for removed code --- pyproject.toml | 1 - tests/conftest.py | 37 +- tests/data_processors/test_data_downloader.py | 500 ------------------ tests/test_cli.py | 3 - uv.lock | 2 - 5 files changed, 15 insertions(+), 528 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f80d00171..859c60e60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,6 @@ dependencies = [ "cachetools>=6.1.0", "cdsapi>=0.7.6", "eccodeslib>=2.40.0", # indirect dependency of icenet that is not automatically installed - "filelock>=3.12.0", "hydra-core>=1.3.2", "icenet==0.2.7", # newer versions require netCDF4<1.6.1 which does not work on macOS 15 "jaxtyping>=0.3.2", diff --git a/tests/conftest.py b/tests/conftest.py index 34af8b253..565472567 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,18 @@ from anemoi.datasets.commands.create import Create from omegaconf import DictConfig -from icenet_mp.types import AnemoiCreateArgs + +class MockAnemoiCreateArgs: + """Arguments for anemoi create.""" + + def __init__(self, config: DictConfig, path: Path) -> None: + """Initialise the arguments.""" + self.command = "unused" + self.config = config + self.overwrite = True + self.path = str(path) + self.processes = 0 + self.threads = 0 @pytest.fixture @@ -278,13 +289,7 @@ def mock_dataset(mock_data_path: Path, mock_data: dict[str, dict[str, Any]]) -> } ) zarr_path = mock_data_path / "anemoi" / "mock_dataset.zarr" - Create().run( - AnemoiCreateArgs( - path=str(zarr_path), - config=config, - overwrite=True, - ) - ) + Create().run(MockAnemoiCreateArgs(config, zarr_path)) return Path(str(zarr_path)) @@ -314,13 +319,7 @@ def mock_dataset_missing_dates( } ) zarr_path = mock_data_path / "anemoi" / "mock_dataset_missing_dates.zarr" - Create().run( - AnemoiCreateArgs( - path=str(zarr_path), - config=config, - overwrite=True, - ) - ) + Create().run(MockAnemoiCreateArgs(config, zarr_path)) return Path(str(zarr_path)) @@ -349,13 +348,7 @@ def mock_dataset_non_normalized_times( } ) zarr_path = mock_data_path / "anemoi" / "mock_dataset_non_normalized_times.zarr" - Create().run( - AnemoiCreateArgs( - path=str(zarr_path), - config=config, - overwrite=True, - ) - ) + Create().run(MockAnemoiCreateArgs(config, zarr_path)) return Path(str(zarr_path)) diff --git a/tests/data_processors/test_data_downloader.py b/tests/data_processors/test_data_downloader.py index 98920b6b0..316a78ca6 100644 --- a/tests/data_processors/test_data_downloader.py +++ b/tests/data_processors/test_data_downloader.py @@ -1,11 +1,6 @@ -import json -import shutil from pathlib import Path -from typing import Any, Literal -from unittest.mock import patch import pytest -from filelock import Timeout from omegaconf import DictConfig, OmegaConf from icenet_mp.data_processors.data_downloader import DataDownloader @@ -69,498 +64,3 @@ def downloader_with_directory_dataset(tmp_path: Path) -> DataDownloader: downloader.path_dataset = tmp_path / "test_dir.zarr" downloader.path_dataset.mkdir(parents=True, exist_ok=True) return downloader - - -def test_part_tracker_path_for_file( - downloader_with_file_dataset: DataDownloader, -) -> None: - """When dataset path is a file, tracker should use a sibling JSON with custom suffix.""" - downloader = downloader_with_file_dataset - tracker_path = downloader._part_tracker_path() - assert tracker_path == downloader.path_dataset.with_suffix(".load_parts.json") - assert downloader.path_dataset.is_file() - - -def test_part_tracker_path_for_directory( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """When dataset path is a directory, tracker should live inside it as .load_parts.json.""" - downloader = downloader_with_directory_dataset - tracker_path = downloader._part_tracker_path() - assert tracker_path == downloader.path_dataset / ".load_parts.json" - assert downloader.path_dataset.is_dir() - - -def test_read_part_tracker_returns_default_when_missing( - downloader_with_file_dataset: DataDownloader, -) -> None: - """_read_part_tracker should return an empty completed map when no file exists.""" - downloader = downloader_with_file_dataset - tracker_path = downloader._part_tracker_path() - assert not tracker_path.exists() - assert downloader._read_part_tracker() == {"completed": {}} - - -def test_write_and_read_part_tracker_roundtrip( - downloader_with_file_dataset: DataDownloader, -) -> None: - """_write_part_tracker should persist JSON that _read_part_tracker can load.""" - downloader = downloader_with_file_dataset - data = {"completed": {"1/3": {"completed_at": "2020-01-05T00:00:00Z"}}} - downloader._write_part_tracker(data) - - tracker_path = downloader._part_tracker_path() - assert tracker_path.exists() - - # File content should be valid JSON with the same structure - with tracker_path.open("r", encoding="utf-8") as fh: - on_disk = json.load(fh) - assert on_disk == data - assert downloader._read_part_tracker() == data - - -def test_write_part_tracker_fallback_on_atomic_write_failure( - downloader_with_file_dataset: DataDownloader, -) -> None: - """_write_part_tracker should fallback to naive write when atomic write fails.""" - downloader = downloader_with_file_dataset - data = {"completed": {"2/3": {"completed_at": "2020-01-10T00:00:00Z"}}} - tracker_path = downloader._part_tracker_path() - - # Mock Path.replace() to raise OSError, simulating atomic write failure - with patch.object( - Path, "replace", side_effect=OSError("Cross-device link not permitted") - ): - downloader._write_part_tracker(data) - - # Verify the file was written via fallback path - assert tracker_path.exists() - with tracker_path.open("r", encoding="utf-8") as fh: - on_disk = json.load(fh) - assert on_disk == data - assert downloader._read_part_tracker() == data - - -def _read_tracker(proc: DataDownloader) -> dict: - p = proc._part_tracker_path() - if not p.exists(): - return {"completed": {}} - return json.loads(p.read_text(encoding="utf-8")) - - -def test_load_in_parts_loads_all_parts_successfully( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """load_in_parts should load all parts and track completion.""" - downloader = downloader_with_directory_dataset - - initial_tracker = _read_tracker(downloader) - downloader._write_part_tracker(initial_tracker) - - with ( - patch.object(downloader, "load") as mock_load, - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts(continue_on_error=False, use_lock=False) - # Should have called load 10 times (for parts 1/10 - 10/10) - assert mock_load.call_count == 10 - mock_load.assert_any_call(parts="1/10") - mock_load.assert_any_call(parts="5/10") - mock_load.assert_any_call(parts="10/10") - - # Verify all parts are tracked as completed - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 10 - assert "1/10" in tracker["completed"] - assert "5/10" in tracker["completed"] - assert "10/10" in tracker["completed"] - # Each should have a completed_at timestamp - for part_spec in ["1/10", "5/10", "10/10"]: - assert "completed_at" in tracker["completed"][part_spec] - - -def test_load_in_parts_resumes_skipping_completed_parts( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """load_in_parts should skip parts that are already completed.""" - downloader = downloader_with_directory_dataset - # Pre-mark part 5/10 as completed - initial_tracker = { - "completed": { - "5/10": {"completed_at": "2020-01-15T00:00:00Z"}, - } - } - downloader._write_part_tracker(initial_tracker) - - with ( - patch.object(downloader, "load") as mock_load, - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts(continue_on_error=False, use_lock=False) - - # Should call load for all 10 parts except 5/10 - assert mock_load.call_count == 9 - mock_load.assert_any_call(parts="1/10") - mock_load.assert_any_call(parts="10/10") - # Should not have called 5/10 - called_parts = [ - call.kwargs.get("parts") or (call.args[0] if call.args else None) - for call in mock_load.call_args_list - ] - assert "5/10" not in called_parts - - # Verify all parts are now completed - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 10 - assert "1/10" in tracker["completed"] - assert "5/10" in tracker["completed"] # Still there from before - assert "10/10" in tracker["completed"] - - -def test_load_in_parts_force_reset_clears_tracker( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """load_in_parts should clear tracker when force_reset=True.""" - downloader = downloader_with_directory_dataset - # Pre-mark all parts as completed - initial_tracker = { - "completed": { - "1/10": {"completed_at": "2020-01-05T00:00:00Z"}, - "5/10": {"completed_at": "2020-01-15T00:00:00Z"}, - "10/10": {"completed_at": "2020-01-25T00:00:00Z"}, - } - } - downloader._write_part_tracker(initial_tracker) - - with ( - patch.object(downloader, "load") as mock_load, - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts( - continue_on_error=False, force_reset=True, use_lock=False - ) - - # Should call load for all 10 parts (not skipping any) - assert mock_load.call_count == 10 - - # Verify tracker was cleared and repopulated - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 10 - # Timestamps should be new (not the original ones) - for part_spec in ["1/10", "5/10", "10/10"]: - assert ( - tracker["completed"][part_spec]["completed_at"] - != initial_tracker["completed"][part_spec]["completed_at"] - ) - - -def test_load_in_parts_continues_on_error_when_enabled( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """load_in_parts should continue to next part when error occurs and continue_on_error=True.""" - downloader = downloader_with_directory_dataset - - initial_tracker = _read_tracker(downloader) - downloader._write_part_tracker(initial_tracker) - - # Make load fail for part 5/10 - call_count = 0 - error_msg = "Simulated load failure" - - def mock_load_side_effect(parts: str) -> None: - nonlocal call_count - call_count += 1 - if parts == "5/10": - raise RuntimeError(error_msg) - - with ( - patch.object(downloader, "load", side_effect=mock_load_side_effect), - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts(continue_on_error=True, use_lock=False) - - # Should have attempted all 10 parts - assert call_count == 10 - - # All 10 parts marked as completed except 5/10 marked as failed - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 9 - assert "1/10" in tracker["completed"] - assert "5/10" not in tracker["completed"] - assert "10/10" in tracker["completed"] - - -def test_load_in_parts_raises_on_error_when_continue_disabled( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """load_in_parts should raise exception when error occurs and continue_on_error=False.""" - downloader = downloader_with_directory_dataset - - initial_tracker = _read_tracker(downloader) - downloader._write_part_tracker(initial_tracker) - - # Make load fail for part 5/10 - error_msg = "Simulated load failure" - - def mock_load_side_effect(parts: str) -> None: - if parts == "5/10": - raise RuntimeError(error_msg) - # First 4 parts should succeed - - with ( - patch.object(downloader, "load", side_effect=mock_load_side_effect), - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - pytest.raises(RuntimeError, match=error_msg), - ): - downloader.load_in_parts(continue_on_error=False, use_lock=False) - - # Only first 4 parts should be marked as completed (5/10 failed and raised) - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 4 - assert "1/10" in tracker["completed"] - assert "4/10" in tracker["completed"] - assert "5/10" not in tracker["completed"] - assert "10/10" not in tracker["completed"] - - -def test_lock_timeout_skips_part( - monkeypatch: pytest.MonkeyPatch, - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Simulate FileLock timeout on acquisition by having FileLock.__enter__ raise Timeout. - - The implementation treats lock-timeout as a skip for that part (it will be retried on next run). - """ - proc = downloader_with_directory_dataset - - # Patch FileLock to raise Timeout on entering context - timeout_msg = "could not acquire" - - class BadLock: - def __init__(self, *args: object, **kwargs: object) -> None: - pass - - def __enter__(self) -> None: - raise Timeout(timeout_msg) - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc: BaseException | None, - tb: object, - ) -> Literal[False]: - return False - - monkeypatch.setattr("icenet_mp.data_processors.data_downloader.FileLock", BadLock) - - calls = [] - - def fake_load(parts: str) -> None: - calls.append(parts) - - # run: since lock acquisition fails, load() should not be called for that part - with ( - patch.object(proc, "load", side_effect=fake_load), - patch.object(proc, "inspect"), # Mock inspect to assume dataset exists - ): - proc.load_in_parts( - continue_on_error=True, - force_reset=False, - use_lock=True, - lock_timeout=0, - ) - - # Because lock acquisition failed for each part claim attempt, no loads should have been attempted - assert calls == [] - - -def test_total_parts_override( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Test that total_parts overrides the computed total parts.""" - downloader = downloader_with_directory_dataset - - initial_tracker = _read_tracker(downloader) - downloader._write_part_tracker(initial_tracker) - - # Override to 5 parts instead of computed 10 - with ( - patch.object(downloader, "load") as mock_load, - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts( - continue_on_error=False, - use_lock=False, - total_parts=5, - ) - - # Should have called load 5 times (for parts 1/5, 2/5, 3/5, 4/5, 5/5) - assert mock_load.call_count == 5 - mock_load.assert_any_call(parts="1/5") - mock_load.assert_any_call(parts="2/5") - mock_load.assert_any_call(parts="3/5") - mock_load.assert_any_call(parts="4/5") - mock_load.assert_any_call(parts="5/5") - - # Verify all 5 parts are tracked as completed - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 5 - for part_spec in ["1/5", "2/5", "3/5", "4/5", "5/5"]: - assert part_spec in tracker["completed"] - - -def test_overwrite_deletes_dataset_and_tracker( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Test that overwrite deletes dataset and tracker, then initializes.""" - downloader = downloader_with_directory_dataset - # Create some initial state - initial_tracker = { - "completed": { - "1/10": {"completed_at": "2020-01-05T00:00:00Z"}, - }, - "in_progress": { - "5/10": {"started_at": "2020-01-10T00:00:00Z"}, - }, - } - downloader._write_part_tracker(initial_tracker) - # Create some dataset content - if downloader.path_dataset.exists(): - if downloader.path_dataset.is_dir(): - shutil.rmtree(downloader.path_dataset) - else: - downloader.path_dataset.unlink() - downloader.path_dataset.mkdir(parents=True, exist_ok=True) - (downloader.path_dataset / "some_file").touch() - - with ( - patch.object(downloader, "init") as mock_init, - patch.object(downloader, "load") as mock_load, - ): - downloader.load_in_parts( - continue_on_error=False, use_lock=False, overwrite=True - ) - - # Should have called init to reinitialize the dataset - mock_init.assert_called_once_with(overwrite=False) - # Dataset should have been deleted (we can't easily verify this, but init was called) - # Tracker should be cleared (all parts should be loaded) - assert mock_load.call_count == 10 - - # Verify tracker was cleared and repopulated - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 10 - # All parts should have new timestamps - for part_spec in ["1/10", "5/10", "10/10"]: - assert part_spec in tracker["completed"] - assert tracker["completed"][part_spec]["completed_at"] != initial_tracker.get( - "completed", {} - ).get(part_spec, {}).get("completed_at", "") - - -def test_overwrite_skips_force_reset( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Test that force_reset is skipped when overwrite is used.""" - downloader = downloader_with_directory_dataset - # Create initial tracker state - initial_tracker = { - "completed": { - "1/10": {"completed_at": "2020-01-05T00:00:00Z"}, - }, - "in_progress": { - "5/10": {"started_at": "2020-01-10T00:00:00Z"}, - }, - } - downloader._write_part_tracker(initial_tracker) - # Ensure dataset directory exists - if downloader.path_dataset.exists(): - if downloader.path_dataset.is_dir(): - shutil.rmtree(downloader.path_dataset) - else: - downloader.path_dataset.unlink() - downloader.path_dataset.mkdir(parents=True, exist_ok=True) - - with ( - patch.object(downloader, "init") as mock_init, - patch.object(downloader, "load") as mock_load, - ): - # Both overwrite and force_reset are True - downloader.load_in_parts( - continue_on_error=False, - use_lock=False, - overwrite=True, - force_reset=True, - ) - - # init should be called (by overwrite) - mock_init.assert_called_once() - # All parts should be loaded (overwrite clears everything) - assert mock_load.call_count == 10 - - -def test_automatic_initialization_when_dataset_missing( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Test that dataset is automatically initialized if it doesn't exist.""" - downloader = downloader_with_directory_dataset - # Remove the dataset file/directory - if downloader.path_dataset.exists(): - if downloader.path_dataset.is_dir(): - shutil.rmtree(downloader.path_dataset) - else: - downloader.path_dataset.unlink() - - with ( - patch.object(downloader, "init") as mock_init, - patch.object(downloader, "inspect", side_effect=FileNotFoundError("not found")), - patch.object(downloader, "load") as mock_load, - ): - downloader.load_in_parts(continue_on_error=False, use_lock=False) - - # Should have called init to initialize the missing dataset - mock_init.assert_called_once_with(overwrite=False) - # Should then proceed to load parts - assert mock_load.call_count == 10 - - -def test_force_reset_clears_in_progress( - downloader_with_directory_dataset: DataDownloader, -) -> None: - """Test that force_reset clears both completed and in_progress entries.""" - downloader = downloader_with_directory_dataset - # Pre-mark some parts as completed and in_progress - initial_tracker: dict[str, Any] = { - "completed": { - "1/10": {"completed_at": "2020-01-05T00:00:00Z"}, - }, - "in_progress": { - "5/10": { - "started_at": "2020-01-10T00:00:00Z", - "pid": 12345, - "host": "test", - }, - }, - } - downloader._write_part_tracker(initial_tracker) - - with ( - patch.object(downloader, "load") as mock_load, - patch.object(downloader, "inspect"), # Mock inspect to assume dataset exists - ): - downloader.load_in_parts( - continue_on_error=False, force_reset=True, use_lock=False - ) - - # Should call load for all 10 parts (not skipping any due to force_reset) - assert mock_load.call_count == 10 - - # Verify tracker was cleared and repopulated (no in_progress entries) - tracker = downloader._read_part_tracker() - assert len(tracker["completed"]) == 10 - assert "in_progress" not in tracker or len(tracker.get("in_progress", {})) == 0 - # Timestamps should be new - assert ( - tracker["completed"]["1/10"]["completed_at"] - != initial_tracker["completed"]["1/10"]["completed_at"] - ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 8d236d0a9..8f1c52e29 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -70,9 +70,6 @@ def test_help(self) -> None: r"--help\s+-h\s+Show this message and exit.", r"create\s+Create all datasets.", r"inspect\s+Inspect all datasets.", - r"init\s+Create all datasets.", - r"load\s+Load dataset in parts.", - r"finalise\s+Finalise loaded dataset.", ], ) diff --git a/uv.lock b/uv.lock index a9ae9c3f7..a261c8031 100644 --- a/uv.lock +++ b/uv.lock @@ -1679,7 +1679,6 @@ dependencies = [ { name = "cachetools" }, { name = "cdsapi" }, { name = "eccodeslib" }, - { name = "filelock" }, { name = "hydra-core" }, { name = "icenet" }, { name = "jaxtyping" }, @@ -1723,7 +1722,6 @@ requires-dist = [ { name = "cachetools", specifier = ">=6.1.0" }, { name = "cdsapi", specifier = ">=0.7.6" }, { name = "eccodeslib", specifier = ">=2.40.0" }, - { name = "filelock", specifier = ">=3.12.0" }, { name = "hydra-core", specifier = ">=1.3.2" }, { name = "icenet", specifier = "==0.2.7" }, { name = "jaxtyping", specifier = ">=0.3.2" }, From 5ac32f91a5dd902b56330f1fdb627e65a04e51c4 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 16:17:06 +0000 Subject: [PATCH 13/22] :alembic: Test whether chunk-checking is sufficient to cause downloads to resume --- icenet_mp/data_processors/data_downloader.py | 45 +++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index b153e2e50..beaad6603 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -50,7 +50,7 @@ def create(self, *, overwrite: bool) -> None: # Otherwise we check whether a valid dataset exists elif self.path_dataset.exists(): - download_in_progress, _, _ = self.status() + download_in_progress, download_complete, _ = self.status() # The dataset is being downloaded if download_in_progress: logger.warning( @@ -59,25 +59,26 @@ def create(self, *, overwrite: bool) -> None: self.path_dataset, ) return - # Check whether the dataset is valid, even if it is incomplete - try: - self.inspect() - logger.info( - "Dataset %s already exists at %s, no need to download.", - self.name, - self.path_dataset, - ) - except (AttributeError, FileNotFoundError, PathNotFoundError): - # If the dataset is invalid we delete it - logger.info( - "Dataset %s at %s is invalid, removing it.", - self.name, - self.path_dataset, - ) - shutil.rmtree(self.path_dataset, ignore_errors=True) - else: - # If the dataset is valid we return here - return + # Check whether a dataset marked as complete is valid + if download_complete: + try: + self.inspect() + logger.info( + "Dataset %s already exists at %s, no need to download.", + self.name, + self.path_dataset, + ) + except (AttributeError, FileNotFoundError, PathNotFoundError): + # If the dataset is invalid we delete it + logger.info( + "Dataset %s at %s is invalid, removing it.", + self.name, + self.path_dataset, + ) + raise + else: + # If the dataset is valid we return here + return # Download the dataset self.download() @@ -171,7 +172,9 @@ def status(self) -> tuple[bool, bool, bool]: try: version = inspector._info(str(self.path_dataset)) download_in_progress = version.copy_in_progress - download_complete = all(version.build_flags or []) + n_dates_expected = len(version.dataset.dates) - len(version.dataset.missing) + n_dates_in_zarr = version.data.nchunks_initialized + download_complete = n_dates_expected == n_dates_in_zarr statistics_ready = version.statistics_ready except (AttributeError, FileNotFoundError, PathNotFoundError): download_in_progress = False From 7adc82274d9b3cc3c1faef70d609519da621af42 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 16:49:46 +0000 Subject: [PATCH 14/22] :wrench: Default to statistics=False when inspecting --- icenet_mp/cli/datasets.py | 10 ++++++++-- icenet_mp/data_processors/data_downloader.py | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/icenet_mp/cli/datasets.py b/icenet_mp/cli/datasets.py index f96380d6d..e2177eeb9 100644 --- a/icenet_mp/cli/datasets.py +++ b/icenet_mp/cli/datasets.py @@ -32,12 +32,18 @@ def create( @datasets_cli.command("inspect") @hydra_adaptor -def inspect(config: DictConfig) -> None: +def inspect( + config: DictConfig, + *, + statistics: Annotated[ + bool, typer.Option(help="Specify whether to show dataset statistics") + ] = False, +) -> None: """Inspect all datasets.""" factory = DataDownloaderFactory(config) for downloader in factory.downloaders: logger.info("Working on %s.", downloader.name) - downloader.inspect() + downloader.inspect(statistics=statistics) @datasets_cli.command("finalise") diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index beaad6603..b05fb13e7 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -139,8 +139,8 @@ def inspect( *, detailed: bool = True, progress: bool = True, - statistics: bool = True, size: bool = True, + statistics: bool = False, ) -> None: """Inspect an Anemoi dataset.""" logger.info("Inspecting dataset %s at %s.", self.name, self.path_dataset) From bdfb08b1eaee31c2d501389b18e74c46fecd76dd Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 16:50:23 +0000 Subject: [PATCH 15/22] :goal_net: Exit gracefully when inspect fails during download --- icenet_mp/data_processors/data_downloader.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index b05fb13e7..b16c209df 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -2,6 +2,7 @@ import shutil from pathlib import Path +import typer from anemoi.datasets.commands.finalise import Finalise from anemoi.datasets.commands.init import Init from anemoi.datasets.commands.inspect import InspectZarr @@ -69,13 +70,13 @@ def create(self, *, overwrite: bool) -> None: self.path_dataset, ) except (AttributeError, FileNotFoundError, PathNotFoundError): - # If the dataset is invalid we delete it - logger.info( - "Dataset %s at %s is invalid, removing it.", + # If the dataset is invalid we flag this to the user and exit + logger.error( # noqa: TRY400 + "Dataset %s at %s seems to be invalid. Please check manually.", self.name, self.path_dataset, ) - raise + typer.Exit(1) else: # If the dataset is valid we return here return From 3b53ac5a4f267650867459e5d5ed80fc2fad52e6 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 16:59:48 +0000 Subject: [PATCH 16/22] :wrench: Do not set progress when detailed is set or the same information is printed twice --- icenet_mp/data_processors/data_downloader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index b16c209df..301d4536c 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -139,7 +139,6 @@ def inspect( self, *, detailed: bool = True, - progress: bool = True, size: bool = True, statistics: bool = False, ) -> None: @@ -150,7 +149,7 @@ def inspect( AnemoiInspectArgs( path=str(self.path_dataset), detailed=detailed, - progress=progress, + progress=(not detailed), statistics=statistics, size=size, ) From 5dd9c3629cdcaa4fc068e3643acb8843f3c05e49 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 17:10:10 +0000 Subject: [PATCH 17/22] :bug: If only the statistics are missing we need to finalise --- icenet_mp/data_processors/data_downloader.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 301d4536c..9b9befc02 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -51,7 +51,7 @@ def create(self, *, overwrite: bool) -> None: # Otherwise we check whether a valid dataset exists elif self.path_dataset.exists(): - download_in_progress, download_complete, _ = self.status() + download_in_progress, download_complete, statistics_ready = self.status() # The dataset is being downloaded if download_in_progress: logger.warning( @@ -60,8 +60,12 @@ def create(self, *, overwrite: bool) -> None: self.path_dataset, ) return - # Check whether a dataset marked as complete is valid + # If the download is complete but the statistics are not ready we should finalise if download_complete: + if not statistics_ready: + self.finalise() + + # Check whether a dataset marked as complete is valid try: self.inspect() logger.info( @@ -111,6 +115,7 @@ def finalise(self) -> None: config=self.config, ) ) + logger.info("Finalised dataset %s at %s.", self.name, self.path_dataset) def initialise(self) -> None: """Initialise an Anemoi dataset.""" From 1c9a4bb6dfedb608b2d0c382b5320957e47852fd Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 26 Feb 2026 17:24:23 +0000 Subject: [PATCH 18/22] :rotating_light: Perform explicit type checking to avoid linting errors --- icenet_mp/data_processors/data_downloader.py | 31 ++++++++++++-------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 9b9befc02..a04baa4b4 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -7,7 +7,9 @@ from anemoi.datasets.commands.init import Init from anemoi.datasets.commands.inspect import InspectZarr from anemoi.datasets.commands.load import Load +from anemoi.datasets.data.dataset import Dataset as AnemoiDataset from omegaconf import DictConfig, OmegaConf +from zarr.core import Array as ZarrArray from zarr.errors import PathNotFoundError from icenet_mp.types import ( @@ -55,21 +57,22 @@ def create(self, *, overwrite: bool) -> None: # The dataset is being downloaded if download_in_progress: logger.warning( - "Dataset %s at %s is currently being downloaded by another process. Please wait until it is complete.", + "Dataset %s at %s is currently being downloaded by another process.", self.name, self.path_dataset, ) return - # If the download is complete but the statistics are not ready we should finalise + # If the download is complete then check whether the dataset is valid if download_complete: + # If the statistics are not ready we should finalise if not statistics_ready: self.finalise() - # Check whether a dataset marked as complete is valid + # Inspect the dataset for validity try: self.inspect() logger.info( - "Dataset %s already exists at %s, no need to download.", + "Dataset %s at %s has been downloaded and seems to be valid.", self.name, self.path_dataset, ) @@ -121,7 +124,7 @@ def initialise(self) -> None: """Initialise an Anemoi dataset.""" if self.path_dataset.exists(): logger.info( - "Dataset %s already initialised at %s.", self.name, self.path_dataset + "Dataset %s at %s is already initialised.", self.name, self.path_dataset ) return try: @@ -173,14 +176,18 @@ def load_in_chunks(self) -> None: def status(self) -> tuple[bool, bool, bool]: """Return a tuple indicating whether the dataset exists and whether it is complete.""" - inspector = InspectZarr() try: - version = inspector._info(str(self.path_dataset)) - download_in_progress = version.copy_in_progress - n_dates_expected = len(version.dataset.dates) - len(version.dataset.missing) - n_dates_in_zarr = version.data.nchunks_initialized - download_complete = n_dates_expected == n_dates_in_zarr - statistics_ready = version.statistics_ready + ds_info = InspectZarr()._info(str(self.path_dataset)) + download_in_progress = ds_info.copy_in_progress + if isinstance(dataset := ds_info.dataset, AnemoiDataset) and isinstance( + array := ds_info.data, ZarrArray + ): + n_dates_expected = len(dataset.dates) - len(dataset.missing) + n_dates_in_zarr = array.nchunks_initialized + download_complete = n_dates_expected == n_dates_in_zarr + else: + download_complete = False + statistics_ready = ds_info.statistics_ready except (AttributeError, FileNotFoundError, PathNotFoundError): download_in_progress = False download_complete = False From 1e2e5001dbaa0f8cef4a6a87137a9c794418e906 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Fri, 27 Feb 2026 10:51:15 +0000 Subject: [PATCH 19/22] :bug: Finalise when statistics are *not* ready --- icenet_mp/data_processors/data_downloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index a04baa4b4..3f10e2c5b 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -101,7 +101,7 @@ def download(self) -> None: self.load_in_chunks() # Finalise if the status indicates the dataset is complete download_in_progress, download_complete, statistics_ready = self.status() - if download_complete and (not download_in_progress) and statistics_ready: + if download_complete and (not download_in_progress) and (not statistics_ready): self.finalise() else: logger.warning( From b95c9658e19729a4cf75cf5e6f0b3ec00e2149d4 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Fri, 27 Feb 2026 11:50:11 +0000 Subject: [PATCH 20/22] :bug: Ensure that typer.Exit is raised --- icenet_mp/data_processors/data_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 3f10e2c5b..39d845426 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -76,14 +76,14 @@ def create(self, *, overwrite: bool) -> None: self.name, self.path_dataset, ) - except (AttributeError, FileNotFoundError, PathNotFoundError): + except (AttributeError, FileNotFoundError, PathNotFoundError) as exc: # If the dataset is invalid we flag this to the user and exit logger.error( # noqa: TRY400 "Dataset %s at %s seems to be invalid. Please check manually.", self.name, self.path_dataset, ) - typer.Exit(1) + raise typer.Exit(1) from exc else: # If the dataset is valid we return here return From 520862173cf06dc27da0f5ec691561e9bf685921 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Fri, 27 Feb 2026 11:52:23 +0000 Subject: [PATCH 21/22] :goal_net: Raise exception if status cannot be retrieved, rather than using default values --- icenet_mp/data_processors/data_downloader.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/icenet_mp/data_processors/data_downloader.py b/icenet_mp/data_processors/data_downloader.py index 39d845426..1f3b40846 100644 --- a/icenet_mp/data_processors/data_downloader.py +++ b/icenet_mp/data_processors/data_downloader.py @@ -188,8 +188,11 @@ def status(self) -> tuple[bool, bool, bool]: else: download_complete = False statistics_ready = ds_info.statistics_ready - except (AttributeError, FileNotFoundError, PathNotFoundError): - download_in_progress = False - download_complete = False - statistics_ready = False + except (AttributeError, FileNotFoundError, PathNotFoundError) as exc: + logger.error( # noqa: TRY400 + "Unable to get status for %s at %s.", + self.name, + self.path_dataset, + ) + raise typer.Exit(1) from exc return (download_in_progress, download_complete, statistics_ready) From 3f49369d58f438778858bd300add60cd65cb301e Mon Sep 17 00:00:00 2001 From: James Robinson Date: Fri, 27 Feb 2026 12:02:08 +0000 Subject: [PATCH 22/22] :coffin: Removed the unused finalise CLI command --- icenet_mp/cli/datasets.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/icenet_mp/cli/datasets.py b/icenet_mp/cli/datasets.py index e2177eeb9..9fb23f006 100644 --- a/icenet_mp/cli/datasets.py +++ b/icenet_mp/cli/datasets.py @@ -46,17 +46,5 @@ def inspect( downloader.inspect(statistics=statistics) -@datasets_cli.command("finalise") -@hydra_adaptor -def finalise( - config: DictConfig, -) -> None: - """Finalise loaded dataset.""" - factory = DataDownloaderFactory(config) - for downloader in factory.downloaders: - logger.info("Working on %s.", downloader.name) - downloader.finalise() - - if __name__ == "__main__": datasets_cli()