
Commit 642bf55

Merge pull request #49 from CMCC-Foundation/version-2025.06
Version 2025.06
2 parents b7b0169 + f2100bd commit 642bf55

5 files changed: 135 additions, 14 deletions


.github/workflows/build_on_release.yaml

Lines changed: 4 additions & 3 deletions

@@ -1,9 +1,10 @@
 name: Build geolake docker images for geolake components and push to the repository
 
 on:
-  push:
-    tags:
-      - 'v*'
+  release:
+    types: [published]
+  workflow_dispatch:
+
 jobs:
   build:
     runs-on: ubuntu-latest

drivers/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 ARG REGISTRY=rg.fr-par.scw.cloud/geokube
 #ARG TAG=2025.03.25.10.56
-ARG TAG=v0.2.7.10
+ARG TAG=2025.06
 FROM $REGISTRY/geokube:$TAG
 
 ADD . /opt/intake_geokube

drivers/intake_geokube/netcdf_with_ancillary.py

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+"""geokube driver for intake."""
+import logging
+from typing import Mapping, Optional
+from .base import GeokubeSource
+from geokube import open_dataset, open_datacube
+from geokube.core.datacube import DataCube
+import pickle
+import os
+import xarray as xr
+import numpy as np
+import glob
+
+_PROJECTION = {"grid_mapping_name": "latitude_longitude"}
+
+class NetCDFAncillarySource(GeokubeSource):
+    name = "geokube_netcdf_ancillary"
+
+    def add_projection(self, dset: xr.Dataset, **kwargs) -> xr.Dataset:
+        """Add projection information to the dataset"""
+        coords = dset.coords
+        coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION)
+        for var in dset.data_vars.values():
+            enc = var.encoding
+            enc["grid_mapping"] = "crs"
+        return dset
+
+    def __init__(
+        self,
+        path: str,
+        ancillary_path: str,
+        pattern: str = None,
+        field_id: str = None,
+        delay_read_cubes: bool = False,
+        metadata_caching: bool = False,
+        metadata_cache_path: str = None,
+        storage_options: dict = None,
+        xarray_kwargs: dict = None,
+        metadata=None,
+        mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
+        load_files_on_persistance: Optional[bool] = True,
+        **kwargs
+    ):
+        self._kube = None
+        self.path = path
+        self.ancillary_path = ancillary_path
+        self.pattern = pattern
+        self.field_id = field_id
+        self.delay_read_cubes = delay_read_cubes
+        self.metadata_caching = metadata_caching
+        self.metadata_cache_path = metadata_cache_path
+        self.storage_options = storage_options
+        self.mapping = mapping
+        self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
+        self.load_files_on_persistance = load_files_on_persistance
+        # self.xarray_kwargs.update({'engine' : 'netcdf'})
+        super(NetCDFAncillarySource, self).__init__(metadata=metadata, **kwargs)
+
+    def _open_dataset(self):
+
+        if self.metadata_caching:
+            cached = None
+            if os.path.exists(self.metadata_cache_path):
+                with open(self.metadata_cache_path, "rb") as f:
+                    cached = pickle.load(f)
+                self._kube = cached
+                return self._kube
+
+        afilepaths = glob.glob(self.ancillary_path)
+        filepaths = glob.glob(self.path)
+        ancillary = xr.open_mfdataset(afilepaths, compat='override')
+        ds = xr.open_mfdataset(filepaths, **self.xarray_kwargs)
+        finalds = xr.merge([ancillary, ds])
+
+        finalds.xgrid.attrs['standard_name'] = 'projection_grid_x_centers'
+        finalds.ygrid.attrs['standard_name'] = 'projection_grid_y_centers'
+
+        finalds2 = self.add_projection(finalds)
+        finalds3 = finalds2.assign_coords(tdim=np.arange(finalds2.sizes['tdim']))
+        time = finalds3.time.values
+        finalds4 = finalds3.assign_coords(time=("tdim", time)).swap_dims({"tdim": "time"})
+        finalds5 = finalds4.sortby("time")
+
+        for var in finalds5.data_vars.values():
+            if "grid_mapping" in var.attrs:
+                del var.attrs["grid_mapping"]
+
+        self._kube = DataCube.from_xarray(finalds5, mapping=self.mapping)
+
+        if self.metadata_caching:
+            with open(self.metadata_cache_path, "wb") as f:
+                pickle.dump(self._kube, f)
+
+        return self._kube
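
For orientation only, a minimal usage sketch of the new driver: the glob patterns, cache path, and the read() call are illustrative assumptions, not part of this commit. The driver globs both patterns, merges the ancillary grid into the data files, attaches a lat/lon CRS, and returns a geokube DataCube.

    from intake_geokube.netcdf_with_ancillary import NetCDFAncillarySource

    # Hypothetical paths for illustration only.
    source = NetCDFAncillarySource(
        path="/data/product/*.nc",
        ancillary_path="/data/ancillary/grid*.nc",
        metadata_caching=True,
        metadata_cache_path="/tmp/ancillary_cube.pickle",  # pickled DataCube cache
    )
    cube = source.read()  # standard intake DataSource API, assuming GeokubeSource exposes it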

drivers/setup.py

Lines changed: 2 additions & 1 deletion

@@ -18,7 +18,8 @@
         "intake.drivers": [
             "geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
             "cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
-            "cmcc_sentinel_geokube = intake_geokube.sentinel:CMCCSentinelSource"
+            "cmcc_sentinel_geokube = intake_geokube.sentinel:CMCCSentinelSource",
+            "geokube_netcdf_ancillary = intake_geokube.netcdf_with_ancillary:NetCDFAncillarySource",
         ]
     },
     classifiers=[
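
With this entry point registered (after reinstalling the drivers package), intake should expose the driver as an auto-generated open_* function; the call below is a sketch with hypothetical paths.

    import intake

    # "geokube_netcdf_ancillary" is the driver name registered above.
    src = intake.open_geokube_netcdf_ancillary(
        path="/data/product/*.nc",
        ancillary_path="/data/ancillary/grid*.nc",
    )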

executor/app/main.py

Lines changed: 35 additions & 9 deletions

@@ -1,4 +1,5 @@
 import os
+import signal
 import tempfile
 import time
 import datetime
@@ -14,6 +15,7 @@
 from geokube.core.datacube import DataCube
 from geokube.core.dataset import Dataset
 from geokube.core.field import Field
+from pika.exceptions import ChannelClosed
 
 from datastore.datastore import Datastore
 from workflow import Workflow
@@ -277,6 +279,8 @@ def __init__(self, broker, store_path, dask_cluster_opts):
         self._channel = broker_conn.channel()
         self._db = DBManager()
         self.dask_cluster_opts = dask_cluster_opts
+        self.to_exit = False
+        self.processing = False
 
     def create_dask_cluster(self, dask_cluster_opts: dict = None):
         if dask_cluster_opts is None:
@@ -307,21 +311,21 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None):
 
     def maybe_restart_cluster(self, status: RequestStatus):
         if status is RequestStatus.TIMEOUT:
-            self._LOG.info("recreating the cluster due to timeout")
+            self._LOG.info("recreating the cluster due to timeout", extra={"track_id": "N/A"})
             self._dask_client.cluster.close()
             self.create_dask_cluster()
         if self._dask_client.cluster.status is Status.failed:
-            self._LOG.info("attempt to restart the cluster...")
+            self._LOG.info("attempt to restart the cluster...", extra={"track_id": "N/A"})
             try:
                 asyncio.run(self._nanny.restart())
             except Exception as err:
                 self._LOG.error(
-                    "couldn't restart the cluster due to an error: %s", err
+                    "couldn't restart the cluster due to an error: %s", err, extra={"track_id": "N/A"}
                 )
-                self._LOG.info("closing the cluster")
+                self._LOG.info("closing the cluster", extra={"track_id": "N/A"})
                 self._dask_client.cluster.close()
-        if self._dask_client.cluster.status is Status.closed:
-            self._LOG.info("recreating the cluster")
+        if self._dask_client.cluster.status is Status.closed and not self.to_exit:
+            self._LOG.info("recreating the cluster", extra={"track_id": "N/A"})
             self.create_dask_cluster()
 
     def ack_message(self, channel, delivery_tag):
@@ -330,9 +334,13 @@ def ack_message(self, channel, delivery_tag):
         """
         if channel.is_open:
             channel.basic_ack(delivery_tag)
+            self.processing = False
+            if self.to_exit:
+                channel.stop_consuming()
         else:
             self._LOG.info(
-                "cannot acknowledge the message. channel is closed!"
+                "cannot acknowledge the message. channel is closed!",
+                extra={"track_id": "N/A"},
             )
             pass
 
@@ -391,6 +399,7 @@ def retry_until_timeout(
         return location_path, status, fail_reason
 
     def handle_message(self, connection, channel, delivery_tag, body):
+        self.processing = True
         message: Message = Message(body)
         self._LOG.debug(
             "executing query: `%s`",
@@ -468,8 +477,21 @@ def subscribe(self, etype):
         )
 
     def listen(self):
-        while True:
-            self._channel.start_consuming()
+        while not self.to_exit:
+            try:
+                self._channel.start_consuming()
+            except ChannelClosed as cc:
+                self._LOG.error("Channel closed exiting...", extra={"track_id": "N/A"})
+                self._LOG.info(f'Shutting down Dask...', extra={"track_id": "N/A"})
+                self._dask_client.shutdown()
+                self._LOG.info(f'Exiting...', extra={"track_id": "N/A"})
+                exit(0)
+
+    def stop_listening(self, signo, frame):
+        self._LOG.info(f'received signal {signo}:', extra={"track_id": "N/A"})
+        self.to_exit = True
+        if not self.processing:
+            self._channel.stop_consuming()
 
     def get_size(self, location_path):
         if location_path and os.path.exists(location_path):
@@ -503,5 +525,9 @@ def get_size(self, location_path):
 
     executor.subscribe(etype)
 
+    print('registering signal handlers')
+    signal.signal(signal.SIGTERM, executor.stop_listening)
+    signal.signal(signal.SIGINT, executor.stop_listening)
+
     print("waiting for requests ...")
     executor.listen()
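
The executor changes above add graceful shutdown: a signal handler sets to_exit, and consuming stops either immediately (when idle) or right after the in-flight message is acknowledged. Below is a self-contained sketch of the same pattern; the broker host, queue name, and single-threaded handling are simplifying assumptions, not the executor's actual configuration.

    import signal
    import pika
    from pika.exceptions import ChannelClosed

    class Consumer:
        def __init__(self, queue="requests"):
            # Illustrative connection; the real executor reads broker settings from its config.
            self._conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
            self._channel = self._conn.channel()
            self._channel.queue_declare(queue=queue)
            self._channel.basic_consume(queue=queue, on_message_callback=self._on_message)
            self.to_exit = False
            self.processing = False

        def _on_message(self, channel, method, properties, body):
            self.processing = True
            # ... process the request ...
            channel.basic_ack(method.delivery_tag)
            self.processing = False
            if self.to_exit:           # finish the in-flight message, then stop
                channel.stop_consuming()

        def stop(self, signo, frame):
            self.to_exit = True
            if not self.processing:    # idle: stop consuming right away
                self._channel.stop_consuming()

        def listen(self):
            while not self.to_exit:
                try:
                    self._channel.start_consuming()
                except ChannelClosed:
                    break

    consumer = Consumer()
    signal.signal(signal.SIGTERM, consumer.stop)
    signal.signal(signal.SIGINT, consumer.stop)
    consumer.listen()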
