Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
samp-floatnorth-argo-pt-2017-2019-24h-v1:
name: samp-floatnorth-argo-pt-2017-2019-24h-v1
samp-floatsouth-argo-pt-2017-2019-24h-v1:
name: samp-floatsouth-argo-pt-2017-2019-24h-v1
group_as: float_argo
description: ARGO float data for northern hemisphere
description: ARGO float data for southern hemisphere
attribution: ARGO
licence: CC-BY-4.0

dates:
start: '2017-01-01T12:00:00'
Comment thread
jemrobinson marked this conversation as resolved.
end: '2017-01-20T12:00:00'
end: '2019-01-31T12:00:00'
frequency: 24h

input:
pipe:
- argo:
area: "90/-180/20/180" # northern hemisphere
area: "-20/-180/-90/180" # southern hemisphere
param:
- TEMP # temperature
- PSAL # salinity
Expand Down
3 changes: 2 additions & 1 deletion icenet_mp/data_processors/filters/nan_to_num.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ def forward_select(self) -> dict[str, str | list[str] | tuple[str]]:
def forward_transform(self, field: ekd.Field) -> ekd.Field:
    """Return a new field in which NaNs are replaced by ``self.replace_with``.

    The values of ``field`` are read with ``field.to_numpy()`` and reshaped to
    ``field.shape`` before the substitution, so the array handed to
    ``new_field_from_numpy`` always has the shape the field itself reports.
    NOTE(review): presumably ``to_numpy()`` can return a differently shaped
    (e.g. flattened) array for some fields - confirm against earthkit-data.

    Args:
        field: The input field; it is also used as the template for the result.

    Returns:
        The field built by ``self.new_field_from_numpy`` from the NaN-free
        array, using ``field`` as template.
    """
    # Align the raw array with the field's declared shape first.
    values = field.to_numpy().reshape(field.shape)
    return self.new_field_from_numpy(
        np.nan_to_num(values, nan=self.replace_with),
        template=field,
    )
60 changes: 27 additions & 33 deletions icenet_mp/data_processors/sources/argo.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
import logging
import time
from datetime import datetime, timedelta
from typing import Any

import earthkit.data as ekd
import numpy as np
import xarray as xr
from anemoi.datasets.create.input.context import Context
from anemoi.datasets.create.source import Source
from anemoi.datasets.create.sources import source_registry
from anemoi.datasets.create.sources.xarray import load_one
from anemoi.datasets.dates.groups import GroupOfDates
from argopy import DataFetcher
from earthkit.data import FieldList
from haversine import Unit, haversine_vector
from pandas import DataFrame

logger = logging.getLogger(__name__)


@source_registry.register("argo")
# class ArgoSource(LegacySource):
class ArgoSource(Source):
def __init__( # noqa: PLR0913
self,
context: dict[str, Any],
context: Context,
*,
area: str,
param: list[str],
*,
skip_interpolation: bool = False,
grid_resolution_degrees: float = 1,
distance_scale_km: float = 2000,
min_weight: float = 1e-10,
time_half_window_hrs: int = 2,
) -> None:
"""Initialise the source."""
"""Set parameters for fetching and interpolating Argo float data."""
self.context = context
self.param = param
self.skip_interpolation = skip_interpolation
Expand All @@ -45,11 +44,8 @@ def __init__( # noqa: PLR0913
msg = f"Invalid area: {area}. Expected format: 'N/W/S/E'"
raise ValueError(msg)

def execute(
self,
date_group: GroupOfDates,
) -> ekd.FieldList:
"""Download Argo float data within given parameters."""
def execute(self, dates: list[datetime] | GroupOfDates) -> FieldList:
"""Download Argo float data within given date range."""
# Set constants
# Construct the grid that we want to project onto
lats = np.arange(
Expand All @@ -72,7 +68,7 @@ def execute(
len(lats) * len(lons),
)

requested_dates = sorted(date_group.dates)
requested_dates = sorted(date for date in dates)
weighted_data: dict[str, np.ndarray] = {
variable: np.full(
(len(requested_dates), len(lats), len(lons)), np.nan, dtype=float
Expand Down Expand Up @@ -184,16 +180,15 @@ def execute(
},
)

field_lists = load_one(
multi_field_list = load_one(
"📂", self.context, [date.isoformat() for date in requested_dates], ds_out
)

n_dates = len(field_lists) // len(self.param)
n_dates = len(multi_field_list) // len(self.param)
if n_dates != len(requested_dates):
msg = f"Expected {len(requested_dates)} dates, got {n_dates} dates"
raise ValueError(msg)

return field_lists
return multi_field_list


def _fetch_argo_dataframe_with_retry(
Expand Down Expand Up @@ -224,6 +219,7 @@ def _fetch_argo_dataframe_with_retry(
is_500 = "500" in error_str
is_503 = "503" in error_str

# Retry on 503 or 500 errors, with exponential backoff
if (is_503 or is_500) and attempt < max_retries:
backoff = initial_backoff_s * (2 ** (attempt - 1))

Expand All @@ -234,26 +230,24 @@ def _fetch_argo_dataframe_with_retry(
logger.warning(msg)
time.sleep(backoff)
continue
if is_503:
msg = f"ERDDAP data server failed with 503 after {max_retries} retries. Error: {error_str}"
raise RuntimeError(msg) from exc

# Otherwise raise an exception
if is_500:
msg = f"ERDDAP data server failed with 500 after {max_retries} retries. Error: {error_str}"
raise RuntimeError(msg) from exc

# Otherwise don't retry
if is_503:
msg = f"ERDDAP data server failed with 503 after {max_retries} retries. Error: {error_str}"
raise RuntimeError(msg) from exc
raise

else:
break
# If we successfully fetched the data, attempt to return it as a DataFrame
try:
msg = f"Successfully fetched data from erddap on attempt {attempt}"
logger.debug(msg)
return fetcher.to_dataframe()
except FileNotFoundError:
msg = f"Failed to load data for {region} and {time_window}. Check whether the data exists at https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.html"
logger.warning(msg)

try:
df = fetcher.to_dataframe()
except FileNotFoundError:
msg = f"Failed to load file for {region + time_window}, it may be empty. Check whether the data exists at https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.html"
logger.warning(msg)
return DataFrame() # Return empty DataFrame if file not found (e.g., no data for that region/time)
else:
msg = f"Successfully fetched data from erddap on attempt {attempt}"
logger.debug(msg)
return df
# Return empty DataFrame if file not found (e.g., no data for that region/time)
return DataFrame()
40 changes: 22 additions & 18 deletions icenet_mp/data_processors/sources/ftp.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
from datetime import datetime
from ftplib import FTP
from ftplib import Error as FtpError
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import earthkit.data as ekd
from anemoi.datasets.create.input.context import Context
from anemoi.datasets.create.source import Source
from anemoi.datasets.create.sources import source_registry
from anemoi.datasets.create.sources.legacy import LegacySource
from anemoi.datasets.create.sources.xarray import load_one
from anemoi.datasets.dates.groups import GroupOfDates
from earthkit.data.core.fieldlist import FieldList, MultiFieldList
Expand All @@ -19,32 +19,36 @@


@source_registry.register("ftp")
class FTPSource(LegacySource):
@staticmethod
def _execute(
context: dict[str, Any],
dates: GroupOfDates,
class FTPSource(Source):
def __init__(
self,
context: Context,
*,
url: str,
passwd: str = "",
user: str = "anonymous",
) -> ekd.FieldList:
"""Execute the data loading process from an FTP source."""
) -> None:
"""Initialise the source."""
self.context = context
# Parse the FTP URL
server, path_pattern = url.replace("ftp://", "").split("/", 1)
self.ftp_args = {"passwd": passwd, "user": user}
self.server, self.path_pattern = url.replace("ftp://", "").split("/", 1)

def execute(self, dates: list[datetime] | GroupOfDates) -> FieldList:
"""Execute the data loading process from an FTP source."""
# Get list of remote file paths
remote_paths = {
date.isoformat(): to_list(
Pattern(path_pattern).substitute(date=date, allow_extra=True)
Pattern(self.path_pattern).substitute(date=date, allow_extra=True)
)
for date in dates
}

# Connect to the FTP server
downloaded_files: list[FieldList] = []
with TemporaryDirectory() as tmpdir, FTP(server) as session: # noqa: S321
field_lists: list[FieldList] = []
with TemporaryDirectory() as tmpdir, FTP(self.server) as session: # noqa: S321
base_path = Path(tmpdir)
session.login(user=user, passwd=passwd)
session.login(**self.ftp_args)

# Iterate over remote paths
for iso_date, remote_path_list in remote_paths.items():
Expand All @@ -56,12 +60,12 @@ def _execute(
session.cwd(("/" + directory).replace("//", "/"))
with local_path.open("wb") as local_file:
session.retrbinary(f"RETR {filename}", local_file.write)
downloaded_files.append(
load_one("📂", context, [iso_date], str(local_path))
field_lists.append(
load_one("📂", self.context, [iso_date], str(local_path))
)
except FtpError as exc:
msg = f"Failed to download from '{remote_path}': {exc}"
logger.warning(msg)

# Combine all downloaded files into a MultiFieldList
return MultiFieldList(downloaded_files)
return MultiFieldList(field_lists)
14 changes: 11 additions & 3 deletions tests/data_processors/sources/test_argo_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pandas as pd
import pytest
from anemoi.datasets.create.input import FieldContext
from anemoi.datasets.dates import DatesProvider
from anemoi.datasets.dates.groups import GroupOfDates
from anemoi.utils.registry import Registry
Expand All @@ -16,6 +17,13 @@
class TestArgoSource:
"""Test suite for ArgoSource class."""

context = FieldContext(
argument=None,
order_by="none",
flatten_grid=False,
remapping={},
use_grib_paramid=False,
)
dates = GroupOfDates(
[datetime(2020, 1, day) for day in range(1, 4)],
provider=DatesProvider.from_config(
Expand Down Expand Up @@ -70,12 +78,12 @@ def test_argo_source_execute_basic(self) -> None:
)
mp.setattr("icenet_mp.data_processors.sources.argo.load_one", mock_load_one)

tmp_source = ArgoSource(
context={},
source = ArgoSource(
context=self.context,
area="20/30/0/40",
param=["TEMP"],
)
result = tmp_source.execute(date_group=self.dates)
result = source.execute(dates=self.dates)

# DataFetcher instantiated once per requested date (inside retry helper)
assert mock_datafetcher_cls.call_count == n_dates
Expand Down
32 changes: 20 additions & 12 deletions tests/data_processors/sources/test_ftp_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest.mock import MagicMock

import pytest
from anemoi.datasets.create.input import FieldContext
from anemoi.datasets.dates import DatesProvider
from anemoi.datasets.dates.groups import GroupOfDates
from anemoi.utils.registry import Registry
Expand All @@ -14,6 +15,13 @@
class TestFTPSource:
"""Test suite for FTPSource class."""

context = FieldContext(
argument=None,
order_by="none",
flatten_grid=False,
remapping={},
use_grib_paramid=False,
)
dates = GroupOfDates(
[datetime(2020, 1, day) for day in range(1, 4)],
provider=DatesProvider.from_config(
Expand Down Expand Up @@ -53,13 +61,13 @@ def test_ftp_source_execute_basic(self) -> None:
mp.setattr("icenet_mp.data_processors.sources.ftp.load_one", mock_load_one)

# Execute
FTPSource._execute(
context={},
dates=self.dates,
source = FTPSource(
context=self.context,
url=r"ftp://example.com/data/file.nc",
user="testuser",
passwd="testpass", # noqa: S106
)
source.execute(dates=self.dates)

# Verify FTP session was created with correct credentials
mock_ftp_class.assert_called_once_with("example.com")
Expand All @@ -85,11 +93,11 @@ def test_ftp_source_execute_anonymous_login(self) -> None:
mp.setattr("icenet_mp.data_processors.sources.ftp.load_one", mock_load_one)

# Execute without providing user/passwd
FTPSource._execute(
context={},
dates=self.dates,
source = FTPSource(
context=self.context,
url=r"ftp://example.com/data/file.nc",
)
source.execute(dates=self.dates)

# Verify FTP session was created with correct credentials
mock_ftp_class.assert_called_once_with("example.com")
Expand Down Expand Up @@ -122,11 +130,11 @@ def test_ftp_source_execute_file_download(self) -> None:
)

# Execute with a complex URL
FTPSource._execute(
context={},
dates=self.dates,
source = FTPSource(
context=self.context,
url=r"ftp://data.server.com/archive/datasets/file.nc",
)
source.execute(dates=self.dates)

# Verify correct server was used
mock_ftp_class.assert_called_once_with("data.server.com")
Expand Down Expand Up @@ -158,11 +166,11 @@ def test_ftp_source_execute_pattern_substitution(self) -> None:
mp.setattr("icenet_mp.data_processors.sources.ftp.FTP", mock_ftp_class)
mp.setattr("icenet_mp.data_processors.sources.ftp.load_one", mock_load_one)

FTPSource._execute(
context={},
dates=self.dates,
source = FTPSource(
context=self.context,
url=r"ftp://example.com/data/{date:strftime(%Y%m%d)}.nc",
)
source.execute(dates=self.dates)

# Verify load_one was called with correct iso dates
calls = mock_load_one.call_args_list
Expand Down
Loading