-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathftp.py
More file actions
71 lines (62 loc) · 2.68 KB
/
ftp.py
File metadata and controls
71 lines (62 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import logging
from datetime import datetime
from ftplib import FTP
from ftplib import Error as FtpError
from pathlib import Path
from tempfile import TemporaryDirectory
from anemoi.datasets.create.input.context import Context
from anemoi.datasets.create.source import Source
from anemoi.datasets.create.sources import source_registry
from anemoi.datasets.create.sources.xarray import load_one
from anemoi.datasets.dates.groups import GroupOfDates
from earthkit.data.core.fieldlist import FieldList, MultiFieldList
from earthkit.data.utils.patterns import Pattern
from icenet_mp.utils import to_list
logger = logging.getLogger(__name__)
@source_registry.register("ftp")
class FTPSource(Source):
    """Source that downloads files from an FTP server and loads them as fields.

    The source is registered under the name ``"ftp"``. For every requested
    date the path pattern taken from the URL is substituted, the resulting
    remote file(s) are downloaded into a temporary directory and each one is
    loaded with the xarray source's ``load_one``.
    """

    def __init__(
        self,
        context: Context,
        *,
        url: str,
        passwd: str = "",
        user: str = "anonymous",
    ) -> None:
        """Initialise the source.

        Args:
            context: Creation context; forwarded to ``load_one`` for every file.
            url: Remote location written as ``[ftp://]<server>/<path pattern>``.
                Everything after the first ``/`` is used as a ``Pattern`` that
                is substituted with each requested date.
            passwd: Password passed to ``FTP.login``.
            user: User name passed to ``FTP.login``.

        Raises:
            ValueError: If ``url`` does not contain both a server and a path.
        """
        self.context = context
        self.ftp_args: dict[str, str] = {"passwd": passwd, "user": user}

        # Strip the scheme only where it is a prefix: a blanket ``replace``
        # would also remove the text "ftp://" from the middle of the path.
        server, _, path_pattern = url.removeprefix("ftp://").partition("/")
        if not server or not path_pattern:
            msg = f"FTP URL '{url}' must have the form 'ftp://<server>/<path>'"
            raise ValueError(msg)
        self.server: str = server
        self.path_pattern: str = path_pattern

    def execute(self, dates: list[datetime] | GroupOfDates) -> FieldList:
        """Download the files matching ``dates`` and load them as fields.

        Args:
            dates: Dates to substitute into the path pattern. Each date is
                keyed by its ISO string, so repeated dates are fetched once.

        Returns:
            A ``MultiFieldList`` combining one field list per file that was
            downloaded successfully. Files whose transfer raises an
            ``ftplib.Error`` are logged at warning level and skipped.
        """
        # Build the pattern once; it does not depend on the date.
        pattern = Pattern(self.path_pattern)
        remote_paths = {
            date.isoformat(): to_list(pattern.substitute(date=date, allow_extra=True))
            for date in dates
        }

        field_lists: list[FieldList] = []
        # NOTE(review): the temporary directory is removed when this block
        # exits; this assumes the field lists produced by ``load_one`` remain
        # usable afterwards - confirm.
        with TemporaryDirectory() as tmpdir, FTP(self.server) as session:  # noqa: S321
            base_path = Path(tmpdir)
            session.login(**self.ftp_args)

            downloads = (
                (iso_date, remote_path)
                for iso_date, remote_path_list in remote_paths.items()
                for remote_path in remote_path_list
            )
            for index, (iso_date, remote_path) in enumerate(downloads):
                # ``rpartition`` also copes with files in the server root
                # (no "/" in the path), where ``rsplit`` + unpacking raised.
                directory, _, filename = remote_path.rpartition("/")
                if not filename:
                    logger.warning(
                        "Skipping '%s': it does not name a file", remote_path
                    )
                    continue

                # One sub-directory per download, so that files sharing a name
                # in different remote directories do not overwrite each other.
                local_dir = base_path / f"{index:06d}"
                local_dir.mkdir()
                local_path = local_dir / filename

                try:
                    # Always change to an absolute directory, since the
                    # working directory persists between downloads.
                    session.cwd(("/" + directory).replace("//", "/"))
                    with local_path.open("wb") as local_file:
                        session.retrbinary(f"RETR {filename}", local_file.write)
                except FtpError as exc:
                    logger.warning(
                        "Failed to download from '%s': %s", remote_path, exc
                    )
                else:
                    field_lists.append(
                        load_one("📂", self.context, [iso_date], str(local_path))
                    )

        # Combine all downloaded files into a MultiFieldList
        return MultiFieldList(field_lists)