diff --git a/.github/workflows/ruff.yml b/.github/workflows/ruff.yml
new file mode 100644
index 0000000..c73bc9d
--- /dev/null
+++ b/.github/workflows/ruff.yml
@@ -0,0 +1,10 @@
+name: Ruff
+on: [ pull_request ]
+jobs:
+  ruff:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: astral-sh/ruff-action@v3
+      - name: Ruff format
+        run: ruff format --diff
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b8a83d3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,10 @@
+# directories
+__pycache__
+.idea
+.vscode
+
+# extensions
+*.nc
+
+# files
+poetry.lock
diff --git a/README.md b/README.md
index 937cbf6..7f14fa0 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,279 @@
-# mds-toolbox
+# Marine Data Store ToolBox
+
+This Python package provides a command-line interface (CLI) for downloading datasets using the
+[copernicusmarine toolbox](https://help.marine.copernicus.eu/en/collections/4060068-copernicus-marine-toolbox)
+or [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)
+
+[![boto3](https://img.shields.io/badge/boto3->1.34-blue.svg)](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)
+[![copernicusmarine](https://img.shields.io/badge/copernicusmarine->1.06-blue.svg)](https://help.marine.copernicus.eu/en/collections/4060068-copernicus-marine-toolbox)
+[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
+
+
+* [Marine Data Store ToolBox](#marine-data-store-toolbox)
+* [How to Install it](#how-to-install-it)
+  * [Uninstall](#uninstall)
+* [Usage](#usage)
+  * [S3 direct access](#s3-direct-access)
+    * [s3-get](#s3-get)
+    * [s3-list](#s3-list)
+  * [Wrapper for copernicusmarine](#wrapper-for-copernicusmarine)
+    * [Subset](#subset)
+    * [Get](#get)
+    * [File List](#file-list)
+    * [Etag](#etag)
+* [Authors](#authors)
+
+
+---
+# How to Install it
+
+Create the conda environment:
+
+```shell
+mamba env create -f environment.yml
+mamba activate mdsenv
+
+pip install .
+```
+
+## Uninstall
+
+To uninstall it:
+
+```shell
+mamba activate mdsenv
+
+pip uninstall mds-toolbox
+```
+
+---
+
+# Usage
+
+The tool provides several commands for different download operations:
+
+```shell
+Usage: mds [OPTIONS] COMMAND [ARGS]...
+
+Options:
+  -h, --help  Show this message and exit.
+
+Commands:
+  etag       Get the etag of a given S3 file
+  file-list  Wrapper to copernicus marine toolbox file list
+  get        Wrapper to copernicusmarine get
+  s3-get     Download files with direct access to MDS using S3
+  s3-list    List files on MDS using S3
+  subset     Wrapper to copernicusmarine subset
+```
+
+---
+
+## S3 direct access
+
+Since the copernicusmarine tool adds heavy overhead to S3 requests, two commands have been developed to:
+
+* make very fast S3 requests
+* provide thread-safe access to the S3 client
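+
+Under the hood this is plain anonymous `boto3` access to the MDS endpoint. A minimal sketch of
+what these commands do (the bucket and prefix are illustrative values taken from the examples
+below):
+
+```python
+import boto3
+from botocore import UNSIGNED
+from botocore.config import Config
+
+# Unsigned (anonymous) client against the MDS endpoint used by the toolbox
+s3 = boto3.client(
+    "s3",
+    endpoint_url="https://s3.waw3-1.cloudferro.com",
+    config=Config(signature_version=UNSIGNED),
+)
+
+# Paginate over a prefix without the copernicusmarine overhead
+paginator = s3.get_paginator("list_objects_v2")
+for page in paginator.paginate(Bucket="mdl-native-03", Prefix="native/"):
+    for obj in page.get("Contents", []):
+        print(obj["Key"], obj["ETag"], obj["LastModified"])
+```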
+
+### s3-get
+
+```shell
+Usage: mds s3-get [OPTIONS]
+
+Options:
+  -b, --bucket TEXT            Bucket name  [required]
+  -f, --filter TEXT            Filter on the online files  [required]
+  -o, --output-directory TEXT  Output directory  [required]
+  -p, --product TEXT           The product name  [required]
+  -i, --dataset-id TEXT        Dataset Id  [required]
+  -g, --dataset-version TEXT   Dataset version or tag
+  -r, --recursive              Recursively list all s3 files
+  --threads INTEGER            Download files using threads
+  -s, --subdir TEXT            Dataset directory on mds (i.e. {year}/{month})
+                               - If present, speeds up the connection
+  --overwrite                  Force overwrite of the file
+  --keep-timestamps            After the download, set the correct timestamp
+                               to the file
+  --sync-time                  Update the file if it changed on the server,
+                               using last update information
+  --sync-etag                  Update the file if it changed on the server,
+                               using etag information
+  --help                       Show this message and exit.
+```
+
+**Example**
+
+```shell
+mds s3-get -i cmems_obs-ins_med_phybgcwav_mynrt_na_irr -b mdl-native-03 -g 202311 -p INSITU_MED_PHYBGCWAV_DISCRETE_MYNRT_013_035 -o "/work/antonio/20240320" -s latest/$(date -du +"%Y%m%d") --keep-timestamps --sync-etag -f $(date -du +"%Y%m%d")
+```
+
+**Example using threads**
+
+```shell
+mds s3-get --threads 10 -i cmems_obs-ins_med_phybgcwav_mynrt_na_irr -b mdl-native-03 -g 202311 -p INSITU_MED_PHYBGCWAV_DISCRETE_MYNRT_013_035 -o "." -s latest/$(date -du +"%Y%m%d") --keep-timestamps --sync-etag -f $(date -du +"%Y%m%d")
+```
+
+### s3-list
+
+```shell
+Usage: mds s3-list [OPTIONS]
+
+Options:
+  -b, --bucket TEXT           Bucket name  [required]
+  -f, --filter TEXT           Filter on the online files  [required]
+  -p, --product TEXT          The product name  [required]
+  -i, --dataset-id TEXT       Dataset Id
+  -g, --dataset-version TEXT  Dataset version or tag
+  -s, --subdir TEXT           Dataset directory on mds (i.e. {year}/{month}) -
+                              If present, speeds up the connection
+  -r, --recursive             Recursively list all s3 files
+  --help                      Show this message and exit.
+```
+
+**Example**
+
+```shell
+mds s3-list -b mdl-native-01 -p INSITU_GLO_PHYBGCWAV_DISCRETE_MYNRT_013_030 -i cmems_obs-ins_glo_phybgcwav_mynrt_na_irr -g 202311 -s "monthly/BO/202401" -f "*" | tr " " "\n"
+```
+
+**Example recursive**
+
+```shell
+mds s3-list -b mdl-native-12 -p MEDSEA_ANALYSISFORECAST_PHY_006_013 -f '*' -r | tr " " "\n"
+```
+
+---
+
+## Wrapper for copernicusmarine
+
+**The following commands rely on the copernicusmarine implementation; the final result depends
+strictly on the installed version.**
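+
+The wrapper can also be driven programmatically. A minimal sketch (the dataset id and filename
+are illustrative; any valid pair works):
+
+```python
+from mds.core import wrapper
+
+# Same code path as `mds get`: download one file into the current directory
+wrapper.mds_download(
+    "get",
+    filter="20231231_d-CMCC--TEMP-MFSeas8-MEDATL-b20240109_an-sv09.00.nc",
+    output_directory=".",
+    dataset_id="cmems_mod_med_phy-tem_anfc_4.2km_P1D-m",
+)
+```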
+
+### Subset
+
+```shell
+Usage: mds subset [OPTIONS]
+
+Options:
+  -o, --output-directory TEXT    Output directory  [required]
+  -f, --output-filename TEXT     Output filename  [required]
+  -i, --dataset-id TEXT          Dataset Id  [required]
+  -v, --variables TEXT           Variables to download. Can be used multiple times
+  -x, --minimum-longitude FLOAT  Minimum longitude for the subset.
+  -X, --maximum-longitude FLOAT  Maximum longitude for the subset.
+  -y, --minimum-latitude FLOAT   Minimum latitude for the subset. Requires a
+                                 float within this range: [-90<=x<=90]
+  -Y, --maximum-latitude FLOAT   Maximum latitude for the subset. Requires a
+                                 float within this range: [-90<=x<=90]
+  -z, --minimum-depth FLOAT      Minimum depth for the subset. Requires a
+                                 float within this range: [x>=0]
+  -Z, --maximum-depth FLOAT      Maximum depth for the subset. Requires a
+                                 float within this range: [x>=0]
+  -t, --start-datetime TEXT      Start datetime as:
+                                 %Y|%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d
+                                 %H:%M:%S|%Y-%m-%dT%H:%M:%S.%fZ
+  -T, --end-datetime TEXT        End datetime as:
+                                 %Y|%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d
+                                 %H:%M:%S|%Y-%m-%dT%H:%M:%S.%fZ
+  -r, --dry-run                  Dry run
+  -g, --dataset-version TEXT     Dataset version or tag
+  -n, --username TEXT            Username
+  -w, --password TEXT            Password
+  --help                         Show this message and exit.
+```
+
+**Example**
+
+```shell
+mds subset -f output.nc -o . -i cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m -x -18.16667 -X 1.0 -y 30.16 -Y 46.0 -z 0.493 -Z 5727.918000000001 -t 2025-01-01 -T 2025-01-01 -v thetao
+```
+
+### Get
+
+**Command**:
+
+```shell
+Usage: mds get [OPTIONS]
+
+Options:
+  -f, --filter TEXT            Filter on the online files
+  -o, --output-directory TEXT  Output directory  [required]
+  -i, --dataset-id TEXT        Dataset Id  [required]
+  -g, --dataset-version TEXT   Dataset version or tag
+  -s, --service TEXT           Force download through one of the available
+                               services using the service name among
+                               ['original-files', 'ftp'] or its short name
+                               among ['files', 'ftp'].
+  -d, --dry-run                Dry run
+  -u, --update                 If the file does not exist, download it;
+                               otherwise update it if it changed on mds
+  -nd, --no-directories TEXT   Option to not recreate folder hierarchy in
+                               output directory
+  --force-download TEXT        Flag to skip confirmation before download
+  --disable-progress-bar TEXT  Flag to hide progress bar
+  -n, --username TEXT          Username
+  -w, --password TEXT          Password
+  --help                       Show this message and exit.
+```
+
+**Example**
+
+```shell
+mds get -f '20250210*_d-CMCC--TEMP-MFSeas9-MEDATL-b20250225_an-sv10.00.nc' -o . -i cmems_mod_med_phy-tem_anfc_4.2km_P1D-m
+```
+
+### File List
+
+To retrieve a list of files, use:
+
+```shell
+Usage: mds file-list [OPTIONS] DATASET_ID MDS_FILTER
+
+Options:
+  -g, --dataset-version TEXT  Dataset version or tag
+  --help                      Show this message and exit.
+```
+
+**Example**
+
+```shell
+mds file-list cmems_mod_med_phy-cur_anfc_4.2km_PT15M-i *b20250225* -g 202411
+```
+
+### Etag
+
+```shell
+Usage: mds etag [OPTIONS]
+
+Options:
+  -e, --s3_file TEXT     Path to a specific s3 file - if present, other
+                         parameters are ignored.
+  -p, --product TEXT     The product name
+  -i, --dataset_id TEXT  The datasetID
+  -g, --version TEXT     Force the selection of a specific dataset version
+  -s, --subdir TEXT      Subdir structure on mds (i.e. {year}/{month})
+  -f, --mds_filter TEXT  Pattern to filter data (no regex)
+  --help                 Show this message and exit.
+```
+
+**Example**
+
+With a specific file:
+
+```shell
+mds etag -e s3://mdl-native-12/native/MEDSEA_ANALYSISFORECAST_PHY_006_013/cmems_mod_med_phy-tem_anfc_4.2km_P1D-m_202411/2023/08/20230820_d-CMCC--TEMP-MFSeas9-MEDATL-b20240607_an-sv10.00.nc
+```
+
+Or:
+
+```shell
+mds etag -p MEDSEA_ANALYSISFORECAST_PHY_006_013 -i cmems_mod_med_phy-cur_anfc_4.2km_PT15M-i -g 202411 -f '*b20241212*' -s 2024/12
+```
+
+---
+
+## Authors
+
+* Antonio Mariani - antonio.mariani@cmcc.it
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..7ba43d9
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,9 @@
+name: mdsenv
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.10
+  - copernicusmarine=1.2.3
+  - boto3 >=1.37.4
+  - click >=8.1.8
diff --git a/mds/__init__.py b/mds/__init__.py
new file mode 100644
index 0000000..7154bda
--- /dev/null
+++ b/mds/__init__.py
@@ -0,0 +1,21 @@
+from mds.conf import settings
+from mds.core import mds_s3
+from mds.core import wrapper
+from mds.utils.log import configure_logging
+
+
+def setup(**kwargs) -> None:
+    """
+    General mds-toolbox setup
+
+    Args:
+        **kwargs: extra arguments to apply as app settings
+    """
+    settings.configure(**kwargs)
+    configure_logging(settings.LOGGING_CONFIG, settings.LOGGING, settings.LOG_LEVEL)
+
+
+__all__ = [
+    "mds_s3",
+    "wrapper",
+]
diff --git a/mds/conf/__init__.py b/mds/conf/__init__.py
new file mode 100644
index 0000000..6d377b4
--- /dev/null
+++ b/mds/conf/__init__.py
@@ -0,0 +1,34 @@
+from mds.conf import global_settings
+
+# List of modules to load settings from
+TO_LOAD = [global_settings]
+
+
+class Settings:
+    def __init__(self, *modules):
+        """
+        Initialize the Settings instance with the provided modules.
+
+        Args:
+            *modules: Variable length argument list of modules to load settings from.
+        """
+        for module in modules:
+            for setting in dir(module):
+                if setting.isupper():
+                    setattr(self, setting, getattr(module, setting))
+
+    def configure(self, **ext_settings):
+        """
+        Configure the settings instance by setting new values or overriding existing ones.
+
+        Args:
+            **ext_settings: Arbitrary keyword arguments representing settings to be configured.
+                Only upper-case keywords are considered.
+        """
+        for key, value in ext_settings.items():
+            if key.isupper():
+                setattr(self, key, value)
+
+
+# Create a Settings instance as the unique entry point to the app settings
+settings = Settings(*TO_LOAD)
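+
+# A small usage sketch (values are illustrative): override defaults at runtime
+#   from mds.conf import settings
+#   settings.configure(LOG_LEVEL="DEBUG", MY_FLAG=True)  # upper-case keys only
+#   settings.LOG_LEVEL  # -> "DEBUG"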
diff --git a/mds/conf/global_settings.py b/mds/conf/global_settings.py
new file mode 100644
index 0000000..1845bb2
--- /dev/null
+++ b/mds/conf/global_settings.py
@@ -0,0 +1,11 @@
+########################
+# LOG
+########################
+
+# The callable to use to configure logging
+LOGGING_CONFIG = "logging.config.dictConfig"
+
+# Custom logging configuration.
+LOGGING = {}
+
+LOG_LEVEL = "INFO"
diff --git a/mds/core/__init__.py b/mds/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/mds/core/copernicus.py b/mds/core/copernicus.py
new file mode 100644
index 0000000..0e005d2
--- /dev/null
+++ b/mds/core/copernicus.py
@@ -0,0 +1,52 @@
+import pathlib
+import copernicusmarine
+import requests
+
+from typing import List
+from mds.core import utils
+
+
+GET_MANDATORY_ATTRS = ["filter", "output_directory", "dataset_id"]
+SUBSET_MANDATORY_ATTRS = [
+    "output_filename",
+    "output_directory",
+    "dataset_id",
+    "start_datetime",
+    "end_datetime",
+]
+
+
+def subset(**subset_kwargs) -> pathlib.Path:
+    subset_kwargs.update({"force_download": True, "disable_progress_bar": True})
+    utils.check_dict_validity(subset_kwargs, SUBSET_MANDATORY_ATTRS)
+
+    # download
+    utils.pprint_dict(subset_kwargs)
+    result = copernicusmarine.subset(
+        **subset_kwargs,
+    )
+    return result
+
+
+def get(**get_kwargs) -> List[pathlib.Path]:
+    utils.check_dict_validity(get_kwargs, GET_MANDATORY_ATTRS)
+
+    # download
+    utils.pprint_dict(get_kwargs)
+    result = copernicusmarine.get(**get_kwargs, no_metadata_cache=True)
+    return result
+
+
+def get_s3_native(product, dataset, version) -> str:
+    stac_url = f"https://stac.marine.copernicus.eu/metadata/{product}/{dataset}_{version}/dataset.stac.json"
+    response = requests.get(stac_url)
+
+    # Check if the request was successful (status code 200)
+    if response.status_code != 200:
+        raise ValueError(
+            f"Unable to get native from: {stac_url} - {response.status_code}: {response.text}"
+        )
+
+    dataset_stac = response.json()
+
+    return dataset_stac["assets"]["native"]["href"]
diff --git a/mds/core/mds_s3.py b/mds/core/mds_s3.py
new file mode 100644
index 0000000..cda9ee7
--- /dev/null
+++ b/mds/core/mds_s3.py
@@ -0,0 +1,158 @@
+import logging
+import os
+# multiprocessing.TimeoutError is what AsyncResult.get() raises; the builtin
+# TimeoutError would not catch it
+from multiprocessing import Pool, TimeoutError
+from typing import List
+
+from mds.core import s3_singleton
+from mds.core import utils
+from mds.core.s3file import S3File
+
+# conf
+logger = logging.getLogger("mds")
+THREADS_TIMEOUT = 10 * 60
+
+
+@utils.elapsed_time
+def get_file_list(
+    s3_bucket: str,
+    product: str,
+    dataset_id: str,
+    dataset_version: str,
+    file_filter: str,
+    subdir: str,
+    recursive: bool,
+):
+    my_s3 = s3_singleton.S3()
+    return my_s3.file_list(
+        s3_bucket, product, dataset_id, dataset_version, file_filter, subdir, recursive
+    )
+
+
+@utils.elapsed_time
+def download_files(
+    s3_bucket: str,
+    product: str,
+    dataset_id: str,
+    dataset_version: str,
+    file_filter: str,
+    output_directory: str,
+    overwrite: bool = False,
+    keep_timestamps: bool = False,
+    subdir: str = None,
+    sync_etag: bool = False,
+    sync_time: bool = False,
+    n_threads=None,
+    recursive: bool = False,
+):
+    files_list = get_file_list(
+        s3_bucket, product, dataset_id, dataset_version, file_filter, subdir, recursive
+    )
+
+    if len(files_list) == 0:
+        logger.error(
+            f"No match found for {file_filter} in {product}/{dataset_id}_{dataset_version}"
+        )
+        raise FileNotFoundError(
+            f"No match found for {file_filter} in {product}/{dataset_id}_{dataset_version}"
+        )
+
+    if not n_threads:
+        _download(
+            files_list,
+            keep_timestamps,
+            output_directory,
+            overwrite,
+            sync_etag,
+            sync_time,
+        )
+    else:
+        # if the user selects more threads than there are files to download
+        if n_threads > len(files_list):
+            logger.warning(f"Resizing number of threads to {len(files_list)}")
+            n_threads = len(files_list)
+
+        files_list_split = utils.split_list_into_parts(files_list, n_threads)
+        if n_threads != len(files_list_split):
+            raise ValueError(
+                f"Mismatch between number of threads ({n_threads}) "
+                f"and number of file partitions ({len(files_list_split)})"
+            )
+
+        # build the input args for each worker
+        threads_args = [
+            [
+                files_list_split[i],
+                keep_timestamps,
+                output_directory,
+                overwrite,
+                sync_etag,
+                sync_time,
+            ]
+            for i in range(n_threads)
+        ]
+
+        with Pool(processes=n_threads) as pool:
+            # start the workers
+            results: List = [
+                pool.apply_async(_download, thread_args) for thread_args in threads_args
+            ]
+            try:
+                for r in results:
+                    r.get(timeout=THREADS_TIMEOUT)
+            except TimeoutError:
+                logger.critical("Expired timeout")
+                raise
+            except Exception as e:
+                logger.critical(f"Pool error: '{e}'")
+                raise
+
+
+def _download(
+    files_list, keep_timestamps, output_directory, overwrite, sync_etag, sync_time
+):
+    """Download files from S3 to the local filesystem"""
+    try:
+        my_s3 = s3_singleton.S3()
+        for s3_file in files_list:
+            filename = os.path.basename(s3_file.file)
+            dest_file = str(os.path.join(output_directory, filename))
+
+            # check whether the file needs to be downloaded
+            if not file_can_be_downloaded(
+                s3_file, dest_file, overwrite, sync_time, sync_etag
+            ):
+                logger.warning(f"Skipping {s3_file} - local: {dest_file}")
+                continue
+
+            os.makedirs(os.path.dirname(dest_file), exist_ok=True)
+
+            logger.info(f"Downloading: {s3_file.file} as {dest_file}")
+            my_s3.download(s3_file.bucket, s3_file.file, dest_file)
+
+            if keep_timestamps:
+                logger.info(
+                    f"Set original last modified: {s3_file.last_modified} to {dest_file}"
+                )
+                last_modified_timestamp = s3_file.last_modified.timestamp()
+                os.utime(dest_file, (last_modified_timestamp, last_modified_timestamp))
+
+    except BaseException as e:
+        logger.critical(f"Error: {e}")
+        return
+
+
+def file_can_be_downloaded(
+    s3_file: S3File, dest_file: str, overwrite: bool, sync_time: bool, sync_etag: bool
+) -> bool:
+    """Check if the destination file can be downloaded/overwritten"""
+    if not os.path.exists(dest_file) or overwrite:
+        return True
+
+    if sync_time and not utils.timestamp_match(dest_file, s3_file.last_modified):
+        logger.info(f"{dest_file} is outdated")
+        return True
+
+    if sync_etag and not utils.etag_match(dest_file, s3_file.etag):
+        logger.info(f"{dest_file} is outdated")
+        return True
+
+    return False
f"Mismatch between number of threads ({n_threads}) " + f"and threads parameters ({len(files_list_split)})" + ) + + # build input args for each threads + threads_args = [ + [ + files_list_split[i], + keep_timestamps, + output_directory, + overwrite, + sync_etag, + sync_time, + ] + for i in range(n_threads) + ] + + with Pool(processes=10) as pool: + # start threads + results: List = [ + pool.apply_async(_download, thread_args) for thread_args in threads_args + ] + try: + for r in results: + r.get(timeout=THREADS_TIMEOUT) + except TimeoutError: + logger.critical("Expired timeout") + raise + except Exception as e: + logger.critical(f"Pool error: '{e}'") + raise + + +def _download( + files_list, keep_timestamps, output_directory, overwrite, sync_etag, sync_time +): + """Download files from S3 to local filesystem""" + try: + my_s3 = s3_singleton.S3() + for s3_file in files_list: + filename = os.path.basename(s3_file.file) + dest_file = str(os.path.join(output_directory, filename)) + + # check if file must me download + if not file_can_be_downloaded( + s3_file, dest_file, overwrite, sync_time, sync_etag + ): + logger.warning(f"Skipping {s3_file} - local: {dest_file}") + continue + + os.makedirs(os.path.dirname(dest_file), exist_ok=True) + + logger.info(f"Downloading: {s3_file.file} as {dest_file}") + my_s3.download(s3_file.bucket, s3_file.file, dest_file) + + if keep_timestamps: + logger.info( + f"Set original last modified: {s3_file.last_modified} to {dest_file}" + ) + last_modified_timestamp = s3_file.last_modified.timestamp() + os.utime(dest_file, (last_modified_timestamp, last_modified_timestamp)) + + except BaseException as e: + logger.critical(f"Error: {e}") + return + + +def file_can_be_downloaded( + s3_file: S3File, dest_file: str, overwrite: bool, sync_time: bool, sync_etag: bool +) -> bool: + """Check if the destination file can be downloaded/overwritten""" + if not os.path.exists(dest_file) or overwrite: + return True + + if sync_time and not utils.timestamp_match(dest_file, s3_file.last_modified): + logger.info(f"{dest_file} is outdated") + return True + + if sync_etag and not utils.etag_match(dest_file, s3_file.etag): + logger.info(f"{dest_file} is outdated") + return True + + return False diff --git a/mds/core/s3_singleton.py b/mds/core/s3_singleton.py new file mode 100644 index 0000000..3903936 --- /dev/null +++ b/mds/core/s3_singleton.py @@ -0,0 +1,144 @@ +import fnmatch +import glob +import logging +import multiprocessing +import os + +import boto3 +from botocore import UNSIGNED +from botocore.config import Config + +from mds.core.s3file import S3File + +# conf +logger = logging.getLogger("mds") +lock = multiprocessing.Lock() +S3_ENDPOINT = "https://s3.waw3-1.cloudferro.com" + + +class Singleton(type): + """Multiprocessing safe implementation of a singleton class""" + + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + with lock: + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__( + *args, **kwargs + ) + return cls._instances[cls] + + +def clean_corrupted_file(dest_file: str): + """Removes corrupted file if it exists""" + # generally s3_client download the file as dest_file.STRING + files_to_clean = glob.glob(f"{dest_file}.*") + for file_to_clean in files_to_clean: + logger.info(f"Cleaning {file_to_clean}") + os.remove(file_to_clean) + + +def build_s3_path(dataset, products, subdir, version): + """Build the s3 path with the provided information""" + s3_path = f"native/{products}" + if dataset: + s3_path 
+= f"/{dataset}" + if version: + s3_path += f"_{version}" + if subdir: + s3_path += f"/{subdir}" + return s3_path + + +class S3(metaclass=Singleton): + """ + Multiprocessing safe implementation of a singleton class to provide a unique client connection to a s3 endpoint. + """ + + _instance = None + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super().__new__(cls, *args, **kwargs) + return cls._instance + + def __init__(self, s3_endpoint=S3_ENDPOINT): + s3_endpoint = s3_endpoint + self.__s3 = boto3.client( + "s3", endpoint_url=s3_endpoint, config=Config(signature_version=UNSIGNED) + ) + self.__paginator = self.__s3.get_paginator("list_objects_v2") + + @property + def paginator(self): + return self.__paginator + + @property + def s3(self): + return self.__s3 + + def file_list( + self, + s3_bucket: str, + products: str, + dataset: str, + version: str, + file_filter: str, + subdir: str = None, + recursive: bool = False, + ) -> list[S3File]: + """ + Listing file on s3 bucket and return the list of file that match the provided filter + :param s3_bucket: Name of s3 bucket + :param products: Product ID + :param dataset: Dataset ID + :param version: Dataset version + :param file_filter: A pattern that must match the absolute paths of the files to download. + :param subdir: Dataset subdirectory + :param recursive: If True, recursively list the files + :return: A list of S3Files found in the s3 paths that match the file_filter + :raise: A FileNotFoundError if the path on s3 doesn't exist or if no file are found + """ + files_found = [] + paginator = self.__paginator + delimiter = "*" if recursive else "/" + + s3_path = build_s3_path(dataset, products, subdir, version) + logger.info(f"Listing files in {s3_bucket}/{s3_path}") + + for s3_result in paginator.paginate( + Bucket=s3_bucket, Prefix=f"{s3_path}/", Delimiter=delimiter + ): + if "Contents" not in s3_result: + raise FileNotFoundError(f"No result found for {s3_bucket}/{s3_path}") + + for content in s3_result["Contents"]: + etag = content["ETag"].replace('"', "") + s3_file = content["Key"] + last_modified = content["LastModified"] + if fnmatch.fnmatch(s3_file, f"*{file_filter}*"): + files_found.append(S3File(s3_bucket, s3_file, etag, last_modified)) + + return files_found + + def download(self, s3_bucket: str, s3_file: str, dest_file: str) -> None: + """ + Download a file from s3 bucket + :param s3_bucket: Name of s3 bucket + :param s3_file: Absolute path to s3 file to download (i.e. 
+
+    def download(self, s3_bucket: str, s3_file: str, dest_file: str) -> None:
+        """
+        Download a file from the s3 bucket
+        :param s3_bucket: Name of the s3 bucket
+        :param s3_file: Absolute path to the s3 file to download (i.e. s3://bucket/product/dataset_version/yyyy/mm/file.nc)
+        :param dest_file: Destination file for the download
+        """
+        s3 = self.s3
+        try:
+            clean_corrupted_file(dest_file)
+            s3.download_file(s3_bucket, s3_file, dest_file)
+            if not os.path.isfile(dest_file):
+                raise RuntimeError(
+                    f"Unable to download {s3_file} as {dest_file}, unknown error"
+                )
+        except BaseException as e:
+            logger.critical(f"An error occurred during the download: {e}")
+            clean_corrupted_file(dest_file)
+            raise e
diff --git a/mds/core/s3file.py b/mds/core/s3file.py
new file mode 100644
index 0000000..d461852
--- /dev/null
+++ b/mds/core/s3file.py
@@ -0,0 +1,15 @@
+import datetime
+from typing import NamedTuple
+
+
+class S3File(NamedTuple):
+    bucket: str
+    file: str
+    etag: str
+    last_modified: datetime.datetime
+
+    def __repr__(self):
+        return f"<S3File {self.bucket}/{self.file}>"
diff --git a/mds/core/utils.py b/mds/core/utils.py
new file mode 100644
--- /dev/null
+++ b/mds/core/utils.py
+import datetime
+import functools
+import hashlib
+import json
+import logging
+import os
+import shutil
+import time
+from collections.abc import Sequence
+from typing import Callable
+
+logger = logging.getLogger("mds")
+
+
+def cwd() -> str:
+    """returns the current working directory"""
+    return os.getcwd()
+
+
+def another_instance_in_execution(pid_file: str) -> bool:
+    """Try to read the pid from a pid file to check if another instance is in execution"""
+    if os.path.exists(pid_file):
+        with open(pid_file) as f:
+            pid = int(f.read().strip())
+        return pid_is_running(pid)
+    else:
+        return False
+
+
+def pid_is_running(pid: int) -> bool:
+    """Check for the existence of a unix pid. Signal 0 has no effect on the process"""
+    try:
+        # signal 0 doesn't have any effect
+        os.kill(pid, 0)
+    except OSError:
+        return False
+    else:
+        return True
+
+
+def pprint_dict(my_dict: dict) -> None:
+    """use json format to pretty print a dictionary"""
+    pretty_dict = json.dumps(my_dict, indent=4)
+    logger.info(pretty_dict)
+
+
+def check_dict_validity(my_dict: dict, mandatory_attrs: list) -> None:
+    # check if all values are correct
+    for k, v in my_dict.items():
+        if v is None and k in mandatory_attrs:
+            raise ValueError(f"Missing value for '{k}'")
+
+        # in case of list arguments, check if they contain values
+        if isinstance(v, Sequence) and len(v) == 0:
+            raise ValueError(f"Empty value for '{k}'")
+
+    # check if the mandatory attrs are present
+    my_dict_attrs = set(my_dict.keys())
+    missing_attrs = set(mandatory_attrs) - my_dict_attrs
+    if missing_attrs:
+        raise ValueError(f"Missing attributes: {', '.join(missing_attrs)}")
+
+
+def get_temporary_directory(output_filename: str, base_directory: str) -> str:
+    """
+    Given a filename, a uniquely identifiable temporary directory is generated within the desired directory
+
+    :param output_filename: The output filename used to generate the temporary directory name
+    :param base_directory: The directory in which the temporary directory is created
+    """
+    md5_filename = hashlib.md5(output_filename.encode()).hexdigest()
+
+    return os.path.join(base_directory, f".{md5_filename}")
+
+
+def mv_overwrite(file: str, output_directory: str):
+    filename = os.path.basename(file)
+    dest_file = str(os.path.join(output_directory, filename))
+    shutil.move(file, dest_file)
+
+
+def compute_md5(file_path: str):
+    md5_hash = hashlib.md5()
+    with open(file_path, "rb") as f:
+        # Read and update the hash string value in blocks of 4K
+        for byte_block in iter(lambda: f.read(4096), b""):
+            md5_hash.update(byte_block)
+    return md5_hash.hexdigest()
+
+
+def factor_of_1_mb(filesize, num_parts):
+    x = filesize / int(num_parts)
+    y = x % 1048576
+    return int(x + 1048576 - y)
+
+
+def calc_etag(input_file, part_size):
+    md5_digests = []
+    with open(input_file, "rb") as f:
+        for chunk in iter(lambda: f.read(part_size), b""):
+            md5_digests.append(hashlib.md5(chunk).digest())
+    return hashlib.md5(b"".join(md5_digests)).hexdigest() + "-" + str(len(md5_digests))
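+
+
+# For reference: an S3 multipart ETag is the md5 of the concatenated binary md5
+# digests of the parts, suffixed with "-<number of parts>" (e.g. "9b2cf5...-2"
+# for a two-part upload), while a single-part upload simply gets the plain file
+# md5. compute_etag() below tries the common client part sizes to decide
+# whether a local file matches a remote multipart ETag.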
+
+
+def possible_partsizes(filesize, num_parts):
+    return (
+        lambda part_size: part_size < filesize
+        and (float(filesize) / float(part_size)) <= num_parts
+    )
+
+
+def compute_etag(input_file, etag):
+    filesize = os.path.getsize(input_file)
+    num_parts = int(etag.split("-")[1])
+
+    partsizes = [  # Default Partsizes Map
+        8388608,  # aws_cli/boto3
+        15728640,  # s3cmd
+        factor_of_1_mb(
+            filesize, num_parts
+        ),  # Used by many clients to upload large files
+    ]
+
+    for part_size in filter(possible_partsizes(filesize, num_parts), partsizes):
+        if etag == calc_etag(input_file, part_size):
+            return True
+
+    return False
+
+
+def elapsed_time(func: Callable):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        start_time = time.perf_counter()
+        func_args = f"args={args}" if args else "args=None"
+        func_kwargs = f"kwargs={kwargs}" if kwargs else "kwargs=None"
+        result = func(*args, **kwargs)
+        elaps_time = time.perf_counter() - start_time
+        logger.info(
+            f"Elapsed time: {elaps_time} (s) - {func.__name__}({func_args}, {func_kwargs})"
+        )
+        return result
+
+    return wrapper
+
+
+def etag_match(dest_file, digest: str):
+    if "-" in digest:
+        # multipart upload: compare against the reconstructed multipart ETag
+        logger.debug("Comparing Etag")
+        return compute_etag(dest_file, digest)
+
+    logger.debug("Comparing md5")
+    return compute_md5(dest_file) == digest
+    # use a local file to store local etag info
+    # output_directory = os.path.dirname(dest_file)
+    # filename = os.path.basename(dest_file)
+    #
+    # sync_file = str(os.path.join(output_directory, SYNC_FILE))
+    # db_file = str(os.path.join(output_directory, DB_FILE))
+    #
+    # if not os.path.exists(dest_file):
+    #     return False
+    #
+    # with open(sync_file, 'a') as s:
+    #     with file_lock(s):
+    #         try:
+    #             with open(db_file, 'r') as f_read:
+    #                 data = json.load(f_read)
+    #         except FileNotFoundError:
+    #             data = {}
+    #
+    #         if filename not in data:
+    #             return False
+    #
+    #         return data[filename] == etag
+
+
+def timestamp_match(dest_file, remote_last_modified: datetime.datetime):
+    local_last_modified = os.path.getmtime(dest_file)
+    return local_last_modified == remote_last_modified.timestamp()
+
+
+def split_list_into_parts(input_list: list, m):
+    # Calculate the length of each part
+    part_length = len(input_list) // m
+    # Initialize an empty list to store the parts
+    parts = []
+    # Split the list into parts
+    for i in range(m):
+        start = i * part_length
+        # The last part includes any remaining elements
+        end = (i + 1) * part_length if i < m - 1 else None
+        parts.append(input_list[start:end])
+    return parts
diff --git a/mds/core/wrapper.py b/mds/core/wrapper.py
new file mode 100644
index 0000000..a7ad8e1
--- /dev/null
+++ b/mds/core/wrapper.py
@@ -0,0 +1,325 @@
+import contextlib
+import fcntl
+import fnmatch
+import glob
+import json
+import logging
+import os
+import shutil
+import tempfile
+from collections import namedtuple
+from typing import List
+
+import boto3
+from botocore import UNSIGNED
+from botocore.config import Config
+
+from mds.core import utils, copernicus
+from mds.core.utils import etag_match
+
+logger = logging.getLogger("mds")
+
+# conf
+DOWNLOAD_MODES = ["subset", "get"]
+S3_ENDPOINT = "https://s3.waw3-1.cloudferro.com"
+S3_FILE = namedtuple("S3_FILE", ["name", "etag", "last_modified"])
+SYNC_FILE = ".sync"
+DB_FILE = ".etag.json"
+
+
+def extract_s3_info_from_path(s3_file: str):
+    s3_file = s3_file.removeprefix("s3://")
+    s3_bucket = s3_file.split("/")[0]
+    s3_path = s3_file.removeprefix(f"{s3_bucket}/")
+    return s3_bucket, s3_path
+
+
+@utils.elapsed_time
+def mds_etag(
+    s3_file, product, dataset_id, version, subdir, mds_filter
+) -> List[S3_FILE]:
+    s3_endpoint = S3_ENDPOINT
+    s3 = boto3.client(
+        "s3", endpoint_url=s3_endpoint, config=Config(signature_version=UNSIGNED)
+    )
+    paginator = s3.get_paginator("list_objects_v2")
+
+    if s3_file:
+        s3_bucket, s3_path = extract_s3_info_from_path(s3_file)
+    else:
+        s3_bucket, s3_path = get_s3_info(
+            s3_endpoint, product, dataset_id, version, subdir
+        )
+
+    logger.debug(f"Listing files in {s3_bucket}/{s3_path}")
+
+    files_found = []
+    for s3_result in paginator.paginate(Bucket=s3_bucket, Prefix=s3_path):
+        if "Contents" not in s3_result:
+            raise ValueError(f"No result found for {s3_bucket}/{s3_path}")
+
+        contents = s3_result["Contents"]
+        for content in contents:
+            etag = content["ETag"].replace('"', "")
+            s3_file = content["Key"]
+            last_modified = content["LastModified"]
+            if not mds_filter or fnmatch.fnmatch(s3_file, mds_filter):
+                files_found.append(S3_FILE(s3_file, etag, last_modified))
+
+    return files_found
+
+
+def get_s3_info(s3_endpoint, product, dataset, version, subdir):
+    """
+    Query the Copernicus STAC web site to retrieve the s3 bucket and s3 path
+    """
+    href_native = copernicus.get_s3_native(product, dataset, version)
+    native_complete_no_endpoint = href_native.replace(f"{s3_endpoint}/", "")
+    s3_bucket = native_complete_no_endpoint.split("/")[0]
+    s3_path = native_complete_no_endpoint.removeprefix(f"{s3_bucket}/")
+    s3_path = f"{s3_path}/{subdir}"
+    return s3_bucket, s3_path
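+
+
+# For reference, the STAC document consulted by get_s3_info() lives at
+#   https://stac.marine.copernicus.eu/metadata/{product}/{dataset}_{version}/dataset.stac.json
+# and its assets.native.href points at the native S3 prefix, e.g. (illustrative)
+#   https://s3.waw3-1.cloudferro.com/mdl-native-12/native/MEDSEA_ANALYSISFORECAST_PHY_006_013/cmems_mod_med_phy-tem_anfc_4.2km_P1D-m_202411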
+
+
+@utils.elapsed_time
+def mds_list(dataset_id, mds_filter: str, quiet=True, dataset_version=None) -> List:
+    # mds writes this file by default - the name cannot be chosen via python
+    mds_output_filename = "files_to_download.txt"
+    tempdir = tempfile.mkdtemp()
+    output_file = f"{tempdir}/{mds_output_filename}"
+
+    # mds_output_filename is ignored
+    mds_get_list_attrs = {
+        "dataset_id": dataset_id,
+        "filter": mds_filter,
+        "output_directory": tempdir,
+        "create_file_list": mds_output_filename,
+        "force_download": True,
+        "disable_progress_bar": True,
+        "dataset_version": dataset_version,
+    }
+
+    try:
+        if quiet:
+            # silence the copernicusmarine logger
+            logging.getLogger("copernicus_marine_root_logger").setLevel("ERROR")
+            # logging.getLogger("mds").setLevel("ERROR")
+        copernicus.get(**mds_get_list_attrs)
+    except SystemExit:
+        pass
+
+    if not os.path.exists(output_file):
+        raise ValueError("An error occurred while retrieving the file list")
+
+    with open(output_file, "r") as f:
+        data = f.readlines()
+
+    shutil.rmtree(tempdir, ignore_errors=True)
+
+    return [f.strip() for f in data]
+
+
+@utils.elapsed_time
+def mds_download(
+    download_mode: str,
+    dry_run: bool = False,
+    overwrite: bool = False,
+    **kwargs,
+):
+    """
+    Wrapper around the copernicusmarine tool to add missing features:
+    - don't download a file that already exists locally
+    - download only missing files
+    - synchronize concurrent attempts to download the same file
+    - re-download the file in case of a previously failed download
+
+    A temporary directory is used for each download. The temporary directory is unique for each output_filename,
+    and it is derived from the md5 of the output_filename. Only when the download succeeds is the temporary
+    directory deleted and the file moved to the output directory.
+
+    :param download_mode: copernicus download mode: subset|get
+    :param dry_run: Print only the download info without actually downloading the file
+    :param overwrite: Force the download of the file even if it already exists locally
+    """
+    # check if the download mode is supported
+    if download_mode not in DOWNLOAD_MODES:
+        raise ValueError(f"Download mode not supported: '{download_mode}'")
+    logger.info(f"Download mode: {download_mode}")
+
+    # get mandatory args
+    output_filename = (
+        kwargs["output_filename"] if download_mode == "subset" else kwargs["filter"]
+    )
+    output_directory = kwargs["output_directory"]
+
+    # set output directory
+    if output_directory is None:
+        output_directory = utils.cwd()
+    logger.info(f"Output directory: {output_directory}")
+
+    # check if the file is already present
+    logger.info(f"Output filename: {output_filename}")
+    destination_file = os.path.join(output_directory, output_filename)
+    files_found = glob.glob(destination_file)
+    if not overwrite and len(files_found) > 0:
+        logger.info(f"File already exists: {', '.join(files_found)}")
+        return
+
+    # get the temporary directory where the file will be downloaded
+    temporary_dl_directory = utils.get_temporary_directory(
+        output_filename=output_filename,
+        base_directory=output_directory,
+    )
+    logger.debug(f"Temporary directory: {temporary_dl_directory}")
+
+    # pid
+    pid_file = os.path.join(temporary_dl_directory, ".pid")
+
+    # check if another download is ongoing or a zombie directory is present
+    if os.path.exists(temporary_dl_directory):
+        logger.debug(f"Found temporary directory: {temporary_dl_directory}")
+        if os.path.exists(pid_file) and utils.another_instance_in_execution(pid_file):
+            logger.info(
+                f"Another download process already exists: {pid_file}, nothing to do..."
+            )
+            return
+
+        # an error must have occurred during a previous download; restart it
+        logger.info("Zombie download dir found, cleaning it")
+        shutil.rmtree(temporary_dl_directory)
+
+    # safe mkdir and write pid
+    try:
+        os.makedirs(temporary_dl_directory, exist_ok=False)
+        with open(pid_file, "w") as f:
+            f.write(f"{os.getpid()}\n")
+    except OSError as e:
+        # if two processes start at the same moment, the previous pid check can fail
+        logger.error(
+            f"Cannot create temporary directory: {temporary_dl_directory}, possible conflict ongoing"
+        )
+        raise e
+
+    if dry_run:
+        return
+
+    kwargs["output_directory"] = temporary_dl_directory
+    download_func = get_download_func(download_mode)
+    try:
+        download_func(**kwargs)
+    except SystemExit as e:
+        logger.error(f"An error occurred during download: {e}")
+
+        # check if the file is not on mds
+        dataset_id = kwargs["dataset_id"]
+        mds_filter = kwargs["filter"]
+        file_list = mds_list(dataset_id, mds_filter, quiet=False)
+        if len(file_list) == 0:
+            shutil.rmtree(temporary_dl_directory)
+            raise FileNotFoundError(f"No match found for {mds_filter} in {dataset_id}")
+        shutil.rmtree(temporary_dl_directory)
+        raise e
+    except Exception as e:
+        logger.error(f"An error occurred while downloading {kwargs}: {e}")
+        shutil.rmtree(temporary_dl_directory)
+        raise e
+
+    # move into output_directory
+    for file in glob.glob(os.path.join(temporary_dl_directory, "*")):
+        logger.info(f"mv {file} to {output_directory}")
+        utils.mv_overwrite(file, output_directory)
+
+    logger.info(f"Removing temporary directory: {temporary_dl_directory}")
+    shutil.rmtree(temporary_dl_directory)
+
+
+def get_download_func(download_mode):
+    if download_mode == "subset":
+        return copernicus.subset
+    if download_mode == "get":
+        return copernicus.get
+
+    raise ValueError(f"Unknown download mode: {download_mode}")
+
+
+@utils.elapsed_time
+def mds_update_download(**kwargs):
+    mds_filter = kwargs["filter"]
+    output_directory = kwargs["output_directory"]
+
+    # get the list of files
+    dataset_id = kwargs["dataset_id"]
+    s3_files_list = mds_list(dataset_id, mds_filter, quiet=False)
+
+    if len(s3_files_list) == 0:
+        raise FileNotFoundError(
+            f"No matching files found for {dataset_id}/{mds_filter} on mds"
+        )
+
+    # for each file get the etag
+    s3_files = []
+    for s3_file in s3_files_list:
+        logger.info(f"Try to obtain Etag for: {s3_file}")
+        # only s3_file is set: product, dataset_id, version, subdir and filter are None
+        s3_files.extend(mds_etag(s3_file, *[None for _ in range(5)]))
+
+    # download
+    for s3_file in s3_files:
+        filename = os.path.basename(s3_file.name)
+        dest_file = str(os.path.join(output_directory, filename))
+
+        if os.path.exists(dest_file) and etag_match(dest_file, s3_file.etag):
+            logger.info(f"{s3_file} already updated, nothing to do...")
+            continue
+
+        bk_file = f"{dest_file}.bk"
+        if os.path.exists(dest_file):
+            logger.info(f"New version available for {s3_file.name}")
+            logger.info(f"Creating backup file: {bk_file}")
+            shutil.move(dest_file, bk_file)
+
+        kwargs["filter"] = filename
+        mds_download("get", **kwargs)
+
+        # update json file
+        # update_etag(filename, output_directory, s3_file.etag)
+
+        if os.path.exists(bk_file):
+            logger.info(f"Removing backup file: {bk_file}")
+            os.remove(bk_file)
+
+
+@utils.elapsed_time
+def update_etag(filename, output_directory, etag):
+    sync_file = str(os.path.join(output_directory, SYNC_FILE))
+    with open(sync_file, "a") as s:
+        with file_lock(s):
+            db_file = str(os.path.join(output_directory, DB_FILE))
+            try:
+                with open(db_file, "r") as f_read:
+                    data = json.load(f_read)
+            except FileNotFoundError:
+                data = {}
+
+            data[filename] = etag
+
with open(db_file, "w") as f_write: + json.dump(data, f_write, indent=4) + + +@contextlib.contextmanager +def file_lock(file): + try: + fcntl.lockf(file, fcntl.LOCK_EX) + yield + finally: + fcntl.lockf(file, fcntl.LOCK_UN) + + +@utils.elapsed_time +def download_file(*args, **kwargs): + print("Downloading") + + +def log(): + logger.info("I'm wrapper") diff --git a/mds/mng/__init__.py b/mds/mng/__init__.py new file mode 100644 index 0000000..6a94282 --- /dev/null +++ b/mds/mng/__init__.py @@ -0,0 +1,16 @@ +import logging +import traceback + +from mds.mng.cli import cli + +logger = logging.getLogger("mds") + + +def start_from_command_line_interface(): + """Access point to CLI API""" + try: + cli() + except Exception as e: + logger.debug(traceback.format_exc()) + logger.error(e) + exit(1) diff --git a/mds/mng/cli.py b/mds/mng/cli.py new file mode 100644 index 0000000..987421c --- /dev/null +++ b/mds/mng/cli.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +""" +CLI Interface for mds-toolbox. +""" + +__author__ = "Antonio Mariani" +__email__ = "antonio.mariani@cmcc.it" + +import click + +from mds.mng.s3_group import s3_cli +from mds.mng.wrapper_group import wrapper_cli + + +@click.command( + cls=click.CommandCollection, + sources=[s3_cli, wrapper_cli], + context_settings=dict(help_option_names=["-h", "--help"]), +) +def cli(): + pass + + +if __name__ == "__main__": + cli() diff --git a/mds/mng/initializer.py b/mds/mng/initializer.py new file mode 100644 index 0000000..8915980 --- /dev/null +++ b/mds/mng/initializer.py @@ -0,0 +1,39 @@ +"""Entry point to mds-toolbox API""" + +from functools import update_wrapper + +import click + +import mds + + +def init_app(f): + """Initialize the application functions before the execution""" + + @click.pass_context + def setup(ctx, **kwargs): + """ + Extract app settings and configure application setup, then start mds functions. + + Args: + **kwargs: Arbitrary keyword arguments passed to the function. 
+ """ + user_settings = dict() + # Assume that upper settings are app settings related + upper_keys = [k for k in kwargs.keys() if k.isupper()] + + # Remove upper settings from plot arguments to avoid crash in PlotSettings class + for k in upper_keys: + user_settings[k] = kwargs.pop(k) + + # Perform general app initialization + mds.setup(**user_settings) + + # Start application + return ctx.invoke(f, **kwargs) + + # Return the setup function which wraps the original function + return update_wrapper(setup, f) + + +# Return the decorator function diff --git a/mds/mng/s3_group.py b/mds/mng/s3_group.py new file mode 100644 index 0000000..7232e81 --- /dev/null +++ b/mds/mng/s3_group.py @@ -0,0 +1,170 @@ +import click + +from mds import mds_s3 +from mds.core import wrapper +from mds.mng import initializer + +verbose = click.option( + "--log-level", + "LOG_LEVEL", + type=click.Choice(["DEBUG", "INFO", "WARN", "ERROR", "CRITICAL", "QUIET"]), + default="INFO", + help="Verbosity level based on standard logging library", +) + + +@click.group(help="Manage S3 groups") +def s3_cli() -> None: + pass + + +@s3_cli.command() +@click.option( + "-e", + "--s3_file", + type=str, + default=None, + help="Path to a specific s3 file - if present, other parameters are ignored.", +) +@click.option("-p", "--product", type=str, default=None, help="The product name") +@click.option("-i", "--dataset_id", type=str, default=None, help="The datasetID") +@click.option( + "-g", + "--version", + type=str, + default=None, + help="Force the selection of a specific dataset version", +) +@click.option( + "-s", + "--subdir", + type=str, + default=None, + help="Subdir structure on mds (i.e. {year}/{month})", +) +@click.option( + "-f", + "--mds_filter", + type=str, + default=None, + help="Pattern to filter data (no regex)", +) +@verbose +@initializer.init_app +def etag(**kwargs): + """Get the etag of a give S3 file""" + s3_files = wrapper.mds_etag(**kwargs) + for s3_file in s3_files: + print(f"{s3_file.name} {s3_file.etag}") + + +@s3_cli.command() +@click.option( + "-b", "--bucket", "s3_bucket", required=True, type=str, help="Bucket name" +) +@click.option( + "-f", + "--filter", + "file_filter", + required=True, + type=str, + help="Filter on the online files", +) +@click.option( + "-o", "--output-directory", required=True, type=str, help="Output directory" +) +@click.option("-p", "--product", required=True, type=str, help="The product name") +@click.option("-i", "--dataset-id", required=True, type=str, help="Dataset Id") +@click.option( + "-g", "--dataset-version", type=str, default=None, help="Dataset version or tag" +) +@click.option( + "-r", "--recursive", is_flag=True, default=False, help="List recursive all s3 files" +) +@click.option( + "--threads", + "n_threads", + type=int, + default=None, + help="Downloading file using threads", +) +@click.option( + "-s", + "--subdir", + type=str, + default=None, + help="Dataset directory on mds (i.e. 
+@click.option(
+    "--overwrite",
+    required=False,
+    is_flag=True,
+    default=False,
+    help="Force overwrite of the file",
+)
+@click.option(
+    "--keep-timestamps",
+    required=False,
+    is_flag=True,
+    default=False,
+    help="After the download, set the correct timestamp to the file",
+)
+@click.option(
+    "--sync-time",
+    required=False,
+    is_flag=True,
+    default=False,
+    help="Update the file if it changed on the server, using last update information",
+)
+@click.option(
+    "--sync-etag",
+    required=False,
+    is_flag=True,
+    default=False,
+    help="Update the file if it changed on the server, using etag information",
+)
+@verbose
+@initializer.init_app
+def s3_get(**kwargs):
+    """Download files with direct access to MDS using S3"""
+    mds_s3.download_files(**kwargs)
+
+
+@s3_cli.command()
+@click.option(
+    "-b",
+    "--bucket",
+    "s3_bucket",
+    required=True,
+    type=str,
+    help="Bucket name",
+)
+@click.option(
+    "-f",
+    "--filter",
+    "file_filter",
+    required=True,
+    type=str,
+    help="Filter on the online files",
+)
+@click.option("-p", "--product", required=True, type=str, help="The product name")
+@click.option("-i", "--dataset-id", required=False, type=str, help="Dataset Id")
+@click.option(
+    "-g", "--dataset-version", type=str, default=None, help="Dataset version or tag"
+)
+@click.option(
+    "-s",
+    "--subdir",
+    type=str,
+    default=None,
+    help="Dataset directory on mds (i.e. {year}/{month}) - If present, speeds up the connection",
+)
+@click.option(
+    "-r",
+    "--recursive",
+    is_flag=True,
+    default=False,
+    help="Recursively list all s3 files",
+)
+@verbose
+@initializer.init_app
+def s3_list(**kwargs):
+    """List files on MDS using S3"""
+    s3_files = mds_s3.get_file_list(**kwargs)
+    print(f"{' '.join([f.file for f in s3_files])}")
diff --git a/mds/mng/wrapper_group.py b/mds/mng/wrapper_group.py
new file mode 100644
index 0000000..d41479e
--- /dev/null
+++ b/mds/mng/wrapper_group.py
@@ -0,0 +1,151 @@
+import click
+
+from mds import wrapper
+from mds.mng import initializer
+
+verbose = click.option(
+    "--log-level",
+    "LOG_LEVEL",
+    type=click.Choice(["DEBUG", "INFO", "WARN", "ERROR", "CRITICAL", "QUIET"]),
+    default="INFO",
+    help="Verbosity level based on standard logging library",
+)
+
+
+@click.group()
+def wrapper_cli() -> None:
+    pass
+
+
+@wrapper_cli.command()
+@click.argument("dataset_id", type=str)
+@click.argument("mds_filter", type=str)
+@click.option(
+    "-g", "--dataset-version", type=str, default=None, help="Dataset version or tag"
+)
+@verbose
+@initializer.init_app
+def file_list(*args, **kwargs):
+    """Wrapper to copernicus marine toolbox file list"""
+    mds_file_list = wrapper.mds_list(*args, **kwargs)
+    print(f"{' '.join(mds_file_list)}")
+
+
+@wrapper_cli.command()
+@click.option(
+    "-o", "--output-directory", required=True, type=str, help="Output directory"
+)
+@click.option(
+    "-f", "--output-filename", required=True, type=str, help="Output filename"
+)
+@click.option("-i", "--dataset-id", required=True, type=str, help="Dataset Id")
+@click.option(
+    "-v", "--variables", multiple=True, type=str, help="Variables to download"
+)
+@click.option(
+    "-x", "--minimum-longitude", type=float, help="Minimum longitude for the subset."
+)
+@click.option(
+    "-X", "--maximum-longitude", type=float, help="Maximum longitude for the subset."
+)
+@click.option(
+    "-y",
+    "--minimum-latitude",
+    type=float,
+    help="Minimum latitude for the subset. Requires a float within this range: [-90<=x<=90]",
+)
+@click.option(
+    "-Y",
+    "--maximum-latitude",
+    type=float,
+    help="Maximum latitude for the subset. Requires a float within this range: [-90<=x<=90]",
+)
+@click.option(
+    "-z",
+    "--minimum-depth",
+    type=float,
+    help="Minimum depth for the subset. Requires a float within this range: [x>=0]",
+)
+@click.option(
+    "-Z",
+    "--maximum-depth",
+    type=float,
+    help="Maximum depth for the subset. Requires a float within this range: [x>=0]",
+)
+@click.option(
+    "-t",
+    "--start-datetime",
+    type=str,
+    default=None,
+    help="Start datetime as: %Y|%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S|%Y-%m-%dT%H:%M:%S.%fZ",
+)
+@click.option(
+    "-T",
+    "--end-datetime",
+    type=str,
+    default=None,
+    help="End datetime as: %Y|%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S|%Y-%m-%dT%H:%M:%S.%fZ",
+)
+@click.option("-r", "--dry-run", is_flag=True, default=False, help="Dry run")
+@click.option(
+    "-g", "--dataset-version", type=str, default=None, help="Dataset version or tag"
+)
+@click.option("-n", "--username", type=str, default=None, help="Username")
+@click.option("-w", "--password", type=str, default=None, help="Password")
+@verbose
+@initializer.init_app
+def subset(**kwargs):
+    """Wrapper to copernicusmarine subset"""
+    wrapper.mds_download("subset", **kwargs)
+
+
+@wrapper_cli.command()
+@click.option(
+    "-f", "--filter", required=False, type=str, help="Filter on the online files"
+)
+@click.option(
+    "-o", "--output-directory", required=True, type=str, help="Output directory"
+)
+@click.option("-i", "--dataset-id", required=True, type=str, help="Dataset Id")
+@click.option(
+    "-g", "--dataset-version", type=str, default=None, help="Dataset version or tag"
+)
+# @click.option('-s', '--service', type=str, default='files',
+#               help="Force download through one of the available services using the service name among "
+#                    "['original-files', 'ftp'] or its short name among ['files', 'ftp'].")
+@click.option("-d", "--dry-run", is_flag=True, default=False, help="Dry run")
+@click.option(
+    "-u",
+    "--update",
+    is_flag=True,
+    default=False,
+    help="If the file does not exist, download it; otherwise update it if it changed on mds",
+)
+@click.option(
+    "-nd",
+    "--no-directories",
+    type=str,
+    default=True,
+    help="Option to not recreate folder hierarchy in output directory",
+)
+@click.option(
+    "--force-download",
+    type=str,
+    default=True,
+    help="Flag to skip confirmation before download",
+)
+@click.option(
+    "--disable-progress-bar", type=str, default=True, help="Flag to hide progress bar"
+)
+@click.option("-n", "--username", type=str, default=None, help="Username")
+@click.option("-w", "--password", type=str, default=None, help="Password")
+@verbose
+@initializer.init_app
+def get(**kwargs):
+    """Wrapper to copernicusmarine get"""
+    update = kwargs.pop("update")
+    if update:
+        wrapper.mds_update_download(**kwargs)
+    else:
+        wrapper.mds_download("get", **kwargs)
diff --git a/mds/utils/__init__.py b/mds/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/mds/utils/log.py b/mds/utils/log.py
new file mode 100644
index 0000000..a7260ff
--- /dev/null
+++ b/mds/utils/log.py
@@ -0,0 +1,61 @@
+import logging.config
+
+from mds.conf import settings
+from mds.utils.module_loading import import_string
+
+LOGGER_NAME = "mds"
+
+DEFAULT_LOGGING = {
+    "disable_existing_loggers": False,
+    "formatters": {
+        "blank": {"format": "%(message)s"},
+        "simple": {
"datefmt": "%Y-%m-%dT%H:%M:%SZ", + "format": "[%(asctime)s] - %(levelname)s - %(message)s", + }, + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "formatter": "simple", + "level": "DEBUG", + "stream": "ext://sys.stderr", + }, + }, + "loggers": { + LOGGER_NAME: { + "handlers": ["console"], + "level": settings.LOG_LEVEL, + }, + }, + "version": 1, +} + + +def configure_logging( + logging_config: str, logging_settings: dict, log_level: str +) -> None: + """ + + Args: + logging_config: Callable to use to configure logging + logging_settings: Dictionary of logging settings + log_level: Logging level + """ + if logging_config: + logging_config_func = import_string(logging_config) + + # Call custom logging config function first to avoid overrides + if logging_settings: + logging_config_func(logging_settings) + + if log_level: + # Update logging level before applying default config + DEFAULT_LOGGING["loggers"][LOGGER_NAME]["level"] = log_level + + # Apply the updated logging config + logging.config.dictConfig(DEFAULT_LOGGING) + + # Ensure logger picks up the new level + logger = logging.getLogger(LOGGER_NAME) + logger.setLevel(log_level) diff --git a/mds/utils/module_loading.py b/mds/utils/module_loading.py new file mode 100644 index 0000000..ac485e0 --- /dev/null +++ b/mds/utils/module_loading.py @@ -0,0 +1,32 @@ +import sys +from importlib import import_module + + +def cached_import(module_path, class_name): + # Check whether module is loaded and fully initialized. + if not ( + (module := sys.modules.get(module_path)) + and (spec := getattr(module, "__spec__", None)) + and getattr(spec, "_initializing", False) is False + ): + module = import_module(module_path) + return getattr(module, class_name) + + +def import_string(dotted_path): + """ + Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import failed. + """ + try: + module_path, class_name = dotted_path.rsplit(".", 1) + except ValueError as err: + raise ImportError("%s doesn't look like a module path" % dotted_path) from err + + try: + return cached_import(module_path, class_name) + except AttributeError as err: + raise ImportError( + 'Module "%s" does not define a "%s" attribute/class' + % (module_path, class_name) + ) from err diff --git a/mds/utils/str_parser.py b/mds/utils/str_parser.py new file mode 100644 index 0000000..3f83bab --- /dev/null +++ b/mds/utils/str_parser.py @@ -0,0 +1,64 @@ +from collections import namedtuple + +FN_PLACEHOLDERS = namedtuple( + "FN_PLACEHOLDERS", + [ + "base_dir", + "prodDate", + "refDate", + "DR", + "DRyyyy", + "DRmm", + "DRdd", + "DPyyyy", + "DPmm", + "DPdd", + ], +) + + +def build_fn_placeholders(base_dir: str, ref_date: str, prod_date: str): + ref_yyyy, ref_mm, ref_dd = ref_date[0:4], ref_date[4:6], ref_date[6:8] + prod_yyyy, prod_mm, prod_dd = prod_date[0:4], prod_date[4:6], prod_date[6:8] + + return FN_PLACEHOLDERS( + base_dir=base_dir, + prodDate=prod_date, + refDate=ref_date, + DR=ref_date, + DRyyyy=ref_yyyy, + DRmm=ref_mm, + DRdd=ref_dd, + DPyyyy=prod_yyyy, + DPmm=prod_mm, + DPdd=prod_dd, + ) + + +def replace_placeholders( + str_template: str, placeholders: FN_PLACEHOLDERS, other_placeholders: dict = None +): + """ + Replace placeholders in a string template with values from named tuple placeholders and + optionally from a dictionary of other placeholders. The other placeholders has higher priority. + + Args: + - str_template (str): The string template where placeholders will be replaced. 
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9403ec3
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,25 @@
+[tool.poetry]
+name = "mds-toolbox"
+version = "1.0.0"
+description = "A custom CMCC library to list and download data from the Marine Data Store"
+authors = ["Antonio Mariani <antonio.mariani@cmcc.it>"]
+readme = ["README.md"]
+packages = [
+    { include = "mds" },
+    { include = "mds/**/*.py" },
+]
+keywords = ["copernicus", "copernicusmarine", "mds", "marine data store"]
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.13"
+boto3 = "^1.37.4"
+copernicusmarine = "1.3.5"
+click = "^8.1.8"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
+
+
+[tool.poetry.scripts]
+mds = "mds.mng:start_from_command_line_interface"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..085d99d
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,61 @@
+aiohappyeyeballs==2.4.6 ; python_version >= "3.10" and python_version < "3.13"
+aiohttp==3.11.13 ; python_version >= "3.10" and python_version < "3.13"
+aiosignal==1.3.2 ; python_version >= "3.10" and python_version < "3.13"
+asciitree==0.3.3 ; python_version >= "3.10" and python_version < "3.13"
+async-timeout==5.0.1 ; python_version >= "3.10" and python_version < "3.11"
+attrs==25.1.0 ; python_version >= "3.10" and python_version < "3.13"
+beautifulsoup4==4.13.3 ; python_version >= "3.10" and python_version < "3.13"
+boto3==1.37.4 ; python_version >= "3.10" and python_version < "3.13"
+botocore==1.37.4 ; python_version >= "3.10" and python_version < "3.13"
+cachier==3.1.2 ; python_version >= "3.10" and python_version < "3.13"
+certifi==2025.1.31 ; python_version >= "3.10" and python_version < "3.13"
+cftime==1.6.4.post1 ; python_version >= "3.10" and python_version < "3.13"
+charset-normalizer==3.4.1 ; python_version >= "3.10" and python_version < "3.13"
+click==8.1.8 ; python_version >= "3.10" and python_version < "3.13"
+cloudpickle==3.1.1 ; python_version >= "3.10" and python_version < "3.13"
+colorama==0.4.6 ; python_version >= "3.10" and python_version < "3.13" and platform_system == "Windows"
+copernicusmarine==1.3.5 ; python_version >= "3.10" and python_version < "3.13"
+dask==2025.2.0 ; python_version >= "3.10" and python_version < "3.13"
+fasteners==0.19 ; python_version >= "3.10" and python_version < "3.13" and sys_platform != "emscripten"
+frozenlist==1.5.0 ; python_version >= "3.10" and python_version < "3.13"
+fsspec==2025.2.0 ; python_version >= "3.10" and python_version < "3.13"
+idna==3.10 ; python_version >= "3.10" and python_version < "3.13"
+importlib-metadata==8.6.1 ; python_version >= "3.10" and python_version < "3.12"
+jmespath==1.0.1 ; python_version >= "3.10" and
python_version < "3.13" +locket==1.0.0 ; python_version >= "3.10" and python_version < "3.13" +lxml==5.3.1 ; python_version >= "3.10" and python_version < "3.13" +motuclient==3.0.0 ; python_version >= "3.10" and python_version < "3.13" +multidict==6.1.0 ; python_version >= "3.10" and python_version < "3.13" +nest-asyncio==1.6.0 ; python_version >= "3.10" and python_version < "3.13" +netcdf4==1.7.2 ; python_version >= "3.10" and python_version < "3.13" +numcodecs==0.13.1 ; python_version >= "3.10" and python_version < "3.13" +numpy==2.2.3 ; python_version >= "3.10" and python_version < "3.13" +packaging==24.2 ; python_version >= "3.10" and python_version < "3.13" +pandas==2.2.3 ; python_version >= "3.10" and python_version < "3.13" +partd==1.4.2 ; python_version >= "3.10" and python_version < "3.13" +portalocker==3.1.1 ; python_version >= "3.10" and python_version < "3.13" +propcache==0.3.0 ; python_version >= "3.10" and python_version < "3.13" +pydap==3.5.3 ; python_version >= "3.10" and python_version < "3.13" +pystac==1.12.2 ; python_version >= "3.10" and python_version < "3.13" +python-dateutil==2.9.0.post0 ; python_version >= "3.10" and python_version < "3.13" +pytz==2025.1 ; python_version >= "3.10" and python_version < "3.13" +pywin32==308 ; python_version >= "3.10" and python_version < "3.13" and platform_system == "Windows" +pyyaml==6.0.2 ; python_version >= "3.10" and python_version < "3.13" +requests==2.32.3 ; python_version >= "3.10" and python_version < "3.13" +s3transfer==0.11.3 ; python_version >= "3.10" and python_version < "3.13" +scipy==1.15.2 ; python_version >= "3.10" and python_version < "3.13" +semver==3.0.4 ; python_version >= "3.10" and python_version < "3.13" +setuptools==75.8.2 ; python_version >= "3.10" and python_version < "3.13" +six==1.17.0 ; python_version >= "3.10" and python_version < "3.13" +soupsieve==2.6 ; python_version >= "3.10" and python_version < "3.13" +toolz==1.0.0 ; python_version >= "3.10" and python_version < "3.13" +tqdm==4.67.1 ; python_version >= "3.10" and python_version < "3.13" +typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "3.13" +tzdata==2025.1 ; python_version >= "3.10" and python_version < "3.13" +urllib3==2.3.0 ; python_version >= "3.10" and python_version < "3.13" +watchdog==6.0.0 ; python_version >= "3.10" and python_version < "3.13" +webob==1.8.9 ; python_version >= "3.10" and python_version < "3.13" +xarray==2025.1.2 ; python_version >= "3.10" and python_version < "3.13" +yarl==1.18.3 ; python_version >= "3.10" and python_version < "3.13" +zarr==2.18.3 ; python_version >= "3.10" and python_version < "3.13" +zipp==3.21.0 ; python_version >= "3.10" and python_version < "3.12" diff --git a/run_cli.py b/run_cli.py new file mode 100755 index 0000000..f9f5a1e --- /dev/null +++ b/run_cli.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +"""Temporary script to start mds-toolbox from CLI""" + +from mds.mng import start_from_command_line_interface + + +def main(): + start_from_command_line_interface() + + +if __name__ == "__main__": + main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_mds.py b/tests/test_mds.py new file mode 100644 index 0000000..e52e9c6 --- /dev/null +++ b/tests/test_mds.py @@ -0,0 +1,78 @@ +import fnmatch +import os +import pytest + +import mds.core.wrapper +from mds.core import utils + + +def test_get_temporary_directory(): + output_filename = "test.nc" + destination_dir = "/my_dir" + result = 
utils.get_temporary_directory(output_filename, destination_dir)
+    result2 = utils.get_temporary_directory(output_filename, destination_dir)
+
+    assert result == f"{destination_dir}/.c69843c60260734a065df6f9bcaca942"
+    assert result == result2
+
+
+def test_cwd():
+    home = os.environ["HOME"]
+    os.chdir(home)
+
+    cwd = utils.cwd()
+
+    assert cwd == home
+
+
+def test_mds_download_wrong_mode(tmp_path):
+    with pytest.raises(ValueError):
+        mds.core.wrapper.mds_download("wrong")
+
+
+def test_mds_get_not_found(tmp_path):
+    with pytest.raises(FileNotFoundError):
+        dataset = "cmems_mod_med_phy-tem_anfc_4.2km_P1D-m"
+        output_filename = (
+            "20231231_d-CMCC--TEMP-MFSeas8-MEDATL-b2024gsagaga109_an-sv09.00.nc"
+        )
+        output_path = f"{tmp_path}"
+
+        mds.core.wrapper.mds_download(
+            "get",
+            filter=output_filename,
+            output_directory=output_path,
+            dataset_id=dataset,
+        )
+
+
+def test_mds_get_download(tmp_path):
+    dataset = "cmems_mod_med_phy-tem_anfc_4.2km_P1D-m"
+    output_filename = "20231231_d-CMCC--TEMP-MFSeas8-MEDATL-b20240109_an-sv09.00.nc"
+    output_path = f"{tmp_path}"
+
+    mds.core.wrapper.mds_download(
+        "get", filter=output_filename, output_directory=output_path, dataset_id=dataset
+    )
+
+    assert os.path.exists(os.path.join(output_path, output_filename))
+
+
+def test_mds_get_list():
+    dataset = "cmems_mod_med_phy-tem_anfc_4.2km_P1D-m"
+    mds_filter = "*-CMCC--TEMP-MFSeas8-MEDATL-b20240109_an-sv09.00.nc"
+
+    result = mds.core.wrapper.mds_list(dataset, mds_filter)
+
+    assert isinstance(result, list)
+    # number of best-analysis files kept on mds
+    assert len(result) == 7
+
+
+def test_fnmatch():
+    s3_file = "_2.5km_PT1H-m_202311/2023/12/20231201_h-CMCC--TEMP-BSeas6-BS-b20231212_an-sv12.00.nc"
+
+    assert fnmatch.fnmatch(s3_file, "*20231201_h-CMCC*")
+    assert not fnmatch.fnmatch(s3_file, "*20231201_h-cmcc*")
+    assert not fnmatch.fnmatch(s3_file, "*2023201_h*")
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..f1e55cf
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,21 @@
+from mds.core import utils
+
+
+def test_split_list_into_parts():
+    # Test case 1: Splitting a list into 3 parts (the last takes the remainder)
+    lst1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+    num_threads1 = 3
+    expected_parts1 = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
+    assert utils.split_list_into_parts(lst1, num_threads1) == expected_parts1
+
+    # Test case 2: Splitting a list into 2 equal parts
+    lst2 = [1, 2, 3, 4, 5, 6]
+    num_threads2 = 2
+    expected_parts2 = [[1, 2, 3], [4, 5, 6]]
+    assert utils.split_list_into_parts(lst2, num_threads2) == expected_parts2
+
+    # Test case 3: Splitting an empty list
+    lst3 = []
+    num_threads3 = 3
+    expected_parts3 = [[], [], []]
+    assert utils.split_list_into_parts(lst3, num_threads3) == expected_parts3
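+
+
+def test_compute_md5(tmp_path):
+    # Suggested extra check (not in the original suite): the md5 digest of
+    # b"hello" is the well-known value asserted below
+    f = tmp_path / "hello.txt"
+    f.write_bytes(b"hello")
+    assert utils.compute_md5(str(f)) == "5d41402abc4b2a76b9719d911017c592"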