7 changes: 4 additions & 3 deletions .github/workflows/ci.yml
@@ -12,20 +12,19 @@ jobs:
strategy:
matrix:
python_version:
- '3.7'
- '3.8'
- '3.9'
- '3.10'
- '3.11'
env:
BASE_IMAGE: "${{ vars.DOCKER_ORG }}/geospaas:2.5.2-python${{ matrix.python_version }}"
BASE_IMAGE: "${{ vars.DOCKER_ORG }}/geospaas:3.0.0.dev1-python${{ matrix.python_version }}"
IMAGE_NAME_WORKER: "${{ vars.DOCKER_ORG }}/geospaas_processing_worker"
IMAGE_NAME_CLI: "${{ vars.DOCKER_ORG }}/geospaas_processing_cli"
IDF_CONVERTER_VERSION: '0.1.426'
GEOSPAAS_DB_HOST: 'db'
GEOSPAAS_DB_USER: 'test'
GEOSPAAS_DB_PASSWORD: ${{ secrets.GEOSPAAS_DB_PASSWORD }}
latest: ${{ matrix.python_version == '3.11' && 'true' || '' }}
HARVESTING_VERSION: "4.0.0.dev1"
steps:
- name: 'Checkout repository'
uses: actions/checkout@v4
@@ -67,6 +66,7 @@ jobs:
target: base
build-args: |
BASE_IMAGE=${{ env.BASE_IMAGE }}
HARVESTING_VERSION=${{ env.HARVESTING_VERSION }}
push: false
load: true
tags: ${{ env.IMAGE_NAME_WORKER }}
@@ -108,6 +108,7 @@ jobs:
build-args: |
BASE_IMAGE=${{ env.BASE_IMAGE }}
IDF_CONVERTER_VERSION=${{ env.IDF_CONVERTER_VERSION }}
HARVESTING_VERSION=${{ env.HARVESTING_VERSION }}
push: ${{ github.event_name == 'release' }}
tags: |
${{ env.IMAGE_NAME_WORKER }}:${{ github.ref_name }}-python${{ matrix.python_version }}
4 changes: 1 addition & 3 deletions Dockerfile_worker
@@ -20,10 +20,8 @@ RUN pip install --upgrade --no-cache-dir \
'nco'

ARG HARVESTING_VERSION=3.7.0.dev3
ARG METANORM_VERSION=4.1.1
RUN pip install --upgrade --no-cache-dir \
https://github.com/nansencenter/django-geo-spaas-harvesting/archive/refs/tags/${HARVESTING_VERSION}.tar.gz \
https://github.com/nansencenter/metanorm/releases/download/${METANORM_VERSION}/metanorm-${METANORM_VERSION}-py3-none-any.whl
"git+https://github.com/nansencenter/django-geo-spaas-harvesting@${HARVESTING_VERSION}"

FROM base as full

2 changes: 1 addition & 1 deletion geospaas_processing/cli/util.py
@@ -24,7 +24,7 @@ def create_cumulative_query(arg):
"""find the requested datasets based on the time, geometry, and query of the arguments of cli"""
cumulative_query = json.loads(arg.query) if arg.query else {}
if arg.geometry:
cumulative_query['geographic_location__geometry__intersects'] = GEOSGeometry(arg.geometry)
cumulative_query['location__intersects'] = GEOSGeometry(arg.geometry)
designated_begin, designated_end = find_designated_time(
arg.rel_time_flag, arg.begin, arg.end)
cumulative_query['time_coverage_start__gte'] = designated_begin
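Note: the new `location__intersects` lookup queries the dataset's geometry field directly instead of traversing the `geographic_location__geometry` relation. A minimal sketch of how the resulting query dict is consumed, assuming the Dataset model exposes a geometry field named `location` as the diff implies (polygon and date are made up for illustration):

from django.contrib.gis.geos import GEOSGeometry
from geospaas.catalog.models import Dataset

# Keys follow the diff above; the values are hypothetical.
cumulative_query = {
    'location__intersects': GEOSGeometry('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
    'time_coverage_start__gte': '2023-01-01T00:00:00Z',
}
datasets = Dataset.objects.filter(**cumulative_query)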
4 changes: 1 addition & 3 deletions geospaas_processing/converters/syntool/converter.py
@@ -10,8 +10,6 @@
from pathlib import Path
from typing import Tuple, Dict, Union

from geospaas.catalog.managers import LOCAL_FILE_SERVICE

from ..base import ConversionError, ConversionManager, Converter, ParameterSelector


@@ -120,7 +118,7 @@ def post_ingest(self, results, out_dir, kwargs):
@staticmethod
def _extract_url(dataset):
"""Get the first URL which is not a local path"""
dataset_uri = dataset.dataseturi_set.exclude(service=LOCAL_FILE_SERVICE).first()
dataset_uri = dataset.dataseturi_set.exclude(uri__startswith='file').first()
return '' if dataset_uri is None else dataset_uri.uri

def run(self, in_file, out_dir, **kwargs):
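Note: `_extract_url()` now identifies local URIs by their prefix instead of the removed `service` attribute. A standalone sketch of the equivalent filtering logic, written in plain Python (no ORM) purely for illustration:

def first_remote_uri(uris):
    """Return the first URI that does not point to a local file."""
    for uri in uris:
        if not uri.startswith('file'):  # mirrors the uri__startswith='file' exclusion
            return uri
    return ''

assert first_remote_uri(['file:///data/a.nc', 'https://example.com/a.nc']) == 'https://example.com/a.nc'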
34 changes: 18 additions & 16 deletions geospaas_processing/copiers.py
@@ -1,12 +1,14 @@
import logging
import os
import shutil
import logging
import time
from os.path import exists
from urllib.parse import urlparse

import django

from geospaas.catalog.models import Dataset
from geospaas.catalog.managers import LOCAL_FILE_SERVICE


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'geospaas_processing.settings')
django.setup()
@@ -34,9 +36,7 @@ def write_flag_file(type_in_flag_file, source_path, dataset, destination_filenam
string_to_write = f"type: {type_in_flag_file}{os.linesep}"
string_to_write += f"entry_id: {dataset.entry_id}{os.linesep}"
string_to_write += f"entry_title: {dataset.entry_title}{os.linesep}"
string_to_write += f"source: {dataset.source}{os.linesep}"
string_to_write += f"data_center: {dataset.data_center}{os.linesep}"
for urlname in dataset.dataseturi_set.exclude(service=LOCAL_FILE_SERVICE):
for urlname in dataset.dataseturi_set.exclude(uri__startswith='file'):
string_to_write += f"- url: {urlname.uri}{os.linesep}"
string_to_write += f"summary: {dataset.summary}{os.linesep}"
flag_file.write(string_to_write)
@@ -45,9 +45,10 @@ def file_or_symlink_copy(self, source_paths, dataset):
"""copy the file or a symlink of the file/folder of dataset based on its stored local
address in the database."""
for source_path in source_paths:
source_path_uri = urlparse(source_path.uri).path
destination_file_or_folder_name = os.path.join(
self._destination_path, os.path.basename(source_path.uri))
if exists(source_path.uri):
self._destination_path, os.path.basename(source_path_uri))
if exists(source_path_uri):
# below if condition prevents "shutil.copy" or "os.symlink" from replacing the file
# or folder in the destination in a repetitive manner.
if not exists(destination_file_or_folder_name):
@@ -60,28 +61,29 @@ def file_or_symlink_copy(self, source_paths, dataset):
LOGGER.warning(
"For stored address of dataset with id = %s,"
" there is no file or no folder in the stored address: %s.", dataset.id,
source_path.uri)
source_path_uri)

def copy_item(self, source_path, destination_file_or_folder_name, dataset):
""" Copy the 'source_path.uri' (regardless of being folder or file) or a symlink of it
into destination. Moreover, write a flag file if requested. """
source_path_uri = urlparse(source_path.uri).path
if self._link_request:
os.symlink(src=source_path.uri, dst=destination_file_or_folder_name)
os.symlink(src=source_path_uri, dst=destination_file_or_folder_name)
else:
if os.path.isfile(source_path.uri):
shutil.copy(src=source_path.uri, dst=self._destination_path)
elif os.path.isdir(source_path.uri):
shutil.copytree(src=source_path.uri, dst=os.path.join(
self._destination_path, os.path.basename(source_path.uri)))
if os.path.isfile(source_path_uri):
shutil.copy(src=source_path_uri, dst=self._destination_path)
elif os.path.isdir(source_path_uri):
shutil.copytree(src=source_path_uri, dst=os.path.join(
self._destination_path, os.path.basename(source_path_uri)))
if self._flag_file_request:
self.write_flag_file(self._type_in_flag_file,
source_path, dataset, destination_file_or_folder_name)

def copy(self):
""" Tries to copy all datasets based on their stored local addresses in the database."""
for dataset in self._datasets:
if dataset.dataseturi_set.filter(service=LOCAL_FILE_SERVICE).exists():
source_paths = dataset.dataseturi_set.filter(service=LOCAL_FILE_SERVICE)
if dataset.dataseturi_set.filter(uri__startswith='file').exists():
source_paths = dataset.dataseturi_set.filter(uri__startswith='file')
self.file_or_symlink_copy(source_paths=source_paths, dataset=dataset)
else:
LOGGER.warning("For dataset with id = %s, there is no local file/folder address in "
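Note: since local addresses are now stored as `file://` URIs, the copier extracts the filesystem path with `urlparse()` before touching the disk. `urlparse().path` passes bare paths through unchanged, so records written before this change keep working; a quick check of both cases:

from urllib.parse import urlparse

assert urlparse('file:///data/dataset.nc').path == '/data/dataset.nc'  # new-style URI
assert urlparse('/data/dataset.nc').path == '/data/dataset.nc'  # legacy bare path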
24 changes: 12 additions & 12 deletions geospaas_processing/downloaders.py
@@ -30,7 +30,6 @@
except ImportError: # pragma: no cover
Redis = None

import geospaas.catalog.managers
from geospaas.catalog.models import Dataset

import geospaas_processing.utils as utils
@@ -560,10 +559,9 @@ class DownloadManager():
"""Downloads datasets based on some criteria, using the right downloaders"""

DOWNLOADERS = {
geospaas.catalog.managers.OPENDAP_SERVICE: HTTPDownloader,
geospaas.catalog.managers.HTTP_SERVICE: HTTPDownloader,
'http': HTTPDownloader,
'ftp': FTPDownloader,
geospaas.catalog.managers.LOCAL_FILE_SERVICE: LocalDownloader,
'file': LocalDownloader,
}

def __init__(self, download_directory='.', provider_settings_path=None, max_downloads=100,
@@ -629,12 +627,14 @@ def _download_from_uri(self, dataset_uri, directory):
raise TooManyDownloadsError(
f"Too many downloads in progress for {dataset_uri_prefix}")
# Try to find a downloader
try:
downloader = self.DOWNLOADERS[dataset_uri.service]
except KeyError:
LOGGER.error("No downloader found for %s service",
dataset_uri.service, exc_info=True)
raise
downloader = None
for prefix, dl_class in self.DOWNLOADERS.items():
if dataset_uri.uri.startswith(prefix):
downloader = dl_class
break
if downloader is None:
LOGGER.error("No downloader found for %s", dataset_uri.uri, exc_info=True)
raise RuntimeError(f'Could not find downloader for {dataset_uri.uri}')

LOGGER.debug("Attempting to download from '%s'", dataset_uri.uri)
file_name = None
Expand All @@ -660,7 +660,7 @@ def _download_from_uri(self, dataset_uri, directory):
def download_dataset(self, dataset, download_directory):
"""
Attempt to download a dataset by trying its URIs one by one. For each `DatasetURI`, it
selects the appropriate Downloader based on the `service` property.
selects the appropriate Downloader based on the URL scheme.
Returns the downloaded file path if the download succeeds, an empty string otherwise.
"""
errors = []
@@ -691,7 +691,7 @@ def download_dataset(self, dataset, download_directory):
if self.save_path:
dataset.dataseturi_set.get_or_create(
dataset=dataset,
uri=os.path.join(os.path.realpath(full_dataset_directory), file_name))
uri='file://'+os.path.join(os.path.realpath(full_dataset_directory), file_name))
return dataset_path
else:
shutil.rmtree(full_dataset_directory, ignore_errors=True)
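Note: downloader selection now keys on the URI prefix rather than the removed `service` property, so `https://...` URIs match the 'http' entry by simple prefix matching. A minimal standalone sketch of the lookup (string placeholders stand in for the real downloader classes):

DOWNLOADERS = {
    'http': 'HTTPDownloader',  # also matches 'https' by prefix
    'ftp': 'FTPDownloader',
    'file': 'LocalDownloader',
}

def find_downloader(uri):
    """Return the first downloader whose prefix matches the URI."""
    for prefix, dl_class in DOWNLOADERS.items():
        if uri.startswith(prefix):
            return dl_class
    raise RuntimeError(f'Could not find downloader for {uri}')

assert find_downloader('https://example.com/d.nc') == 'HTTPDownloader'
assert find_downloader('file:///data/d.nc') == 'LocalDownloader'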
9 changes: 3 additions & 6 deletions geospaas_processing/tasks/harvesting.py
@@ -2,11 +2,10 @@
import os

import celery
import celery.result
import celery.utils

from geospaas_harvesting.cli import refresh_vocabularies, retry_ingest
from geospaas_harvesting.config import ProvidersConfiguration, SearchConfiguration
import geospaas_harvesting.config as harvesting_config

from geospaas_processing.tasks import FaultTolerantTask
from . import app
@@ -21,9 +20,7 @@
@app.task(base=FaultTolerantTask, bind=True, track_started=True)
def start_harvest(self, search_config_dict):
"""Launch harvesting according to the search configuration"""
config = ProvidersConfiguration.from_file(HARVEST_CONFIG_PATH)
search_config = SearchConfiguration.from_dict(search_config_dict) \
.with_providers(config.providers) # pylint: disable=no-member
search_config = harvesting_config.SearchConfiguration.from_dict(search_config_dict)
searches = search_config.create_provider_searches()
logger.info("Running the following searches: %s", searches)
tasks_to_run = celery.group(
@@ -44,7 +41,7 @@ def save_search_results(self, search_results):
@app.task(base=FaultTolerantTask, bind=True, track_started=True)
def update_vocabularies(self):
"""Update vocabularies in the GeoSPaaS database"""
config = ProvidersConfiguration.from_file(HARVEST_CONFIG_PATH)
config = harvesting_config.GeneralConfiguration.from_file(HARVEST_CONFIG_PATH)
refresh_vocabularies(config)


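Note: with the reworked geospaas_harvesting API, a search configuration built from a plain dict is enough to launch provider searches; providers no longer have to be attached via `with_providers()`. A hedged sketch of triggering the task, with an entirely hypothetical search dict (the real schema is defined by geospaas_harvesting):

from geospaas_processing.tasks.harvesting import start_harvest

# Hypothetical search parameters, for illustration only.
search_config_dict = {
    'common': {'start_time': '2023-01-01', 'end_time': '2023-01-02'},
    'searches': [{'provider_name': 'some_provider'}],
}
start_harvest.delay(search_config_dict)  # SearchConfiguration.from_dict() runs in the worker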
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -18,7 +18,7 @@ classifiers = [
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: POSIX :: Linux",
]
requires-python = ">=3.7"
requires-python = ">=3.9"
dependencies = [
'django-geo-spaas',
'django',