Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ Install using the container image:
$ docker pull ghcr.io/openclimatefix/satellite-consumer
```

or, if you prefer a CLI:

```bash
$ pip install git+https://github.com/openclimatefix/satellite-consumer.git
```

This will put the `sat-consumer-cli` command in your virtual environment's `bin` directory.

## Example usage

```bash
Expand Down
107 changes: 60 additions & 47 deletions src/satellite_consumer/cmd/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,22 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
(11 low and one high resolution).
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:MSG15-RSS"""
file_filter_regex="\S+\.nat$"
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
chunks=[1, 348, 464, -1]
shards=[1, 1392, 3712, -1]
encoding {
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
# Data variables
data { dtype=float16, fill_value=nan, chunks=[1, 348, 464, -1], shards=[1, 1392, 3712, -1], compressors=yes }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
}
}
iodc {
region="india"
Expand All @@ -70,21 +83,21 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
(11 low and one high resolution).
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:HRSEVIRI-IODC"""
file_filter_regex="\S+\.nat$"
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
encoding {
# Data variables
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1] }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
# Data variables
data { dtype=float16, fill_value=nan, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1], compressors=yes }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
}
}
odegree {
Expand All @@ -98,21 +111,21 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
(11 low and one high resolution).
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:HRSEVIRI"""
file_filter_regex="\S+\.nat$"
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
encoding {
# Data variables
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1] }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
encoding {
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
# Data variables
data { dtype=float16, fill_value=nan, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1], compressors=yes }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
}
}
odegree-12 {
Expand All @@ -127,21 +140,21 @@ The data is transmitted as High Rate transmissions in 16 spectral channels
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:0662"""
# Return only files covering the top of the UK
file_filter_regex="\S+BODY\S+00(?:[3][2-9]|40).nc$"
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
encoding {
# Data variables
data { dtype=float16, fill_value=NaN, chunks=[1, 400, 400, -1], shards=[1, -1, -1, -1] }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
encoding {
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
# Data variables
data { dtype=float16, fill_value=nan, chunks=[1, 400, 400, -1], shards=[1, -1, -1, -1], compressors=yes }
instrument { dtype=<U26, chunks=[10000] }
satellite_actual_longitude { dtype=float64, chunks=[10000] }
satellite_actual_latitude { dtype=float64, chunks=[10000] }
satellite_actual_altitude { dtype=float64, chunks=[10000] }
cal_offset { dtype=float64, chunks=[10000] }
cal_slope { dtype=float64, chunks=[10000] }
projection_longitude { dtype=float64, chunks=[10000] }
projection_latitude { dtype=float64, chunks=[10000] }
# Coordinates
channel { dtype=str }
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/satellite_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,7 @@ def main() -> None:
conf.get_string("credentials.aws.region", None),
),
gcs_credentials=conf.get_string("credentials.gcs.application_credentials", None),
dims_chunks_shards=(
conf.get_list(f"satellites.{sat}.dimensions"),
conf.get_list(f"satellites.{sat}.chunks"),
conf.get_list(f"satellites.{sat}.shards"),
),
encoding=conf.get_config(f"satellites.{sat}.encoding"),
buffer_size=conf.get_int("consumer.buffer_size"),
max_workers=conf.get_int("consumer.max_workers"),
accum_writes=conf.get_int("consumer.accum_writes"),
Expand Down
18 changes: 7 additions & 11 deletions src/satellite_consumer/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from itertools import islice
from typing import TYPE_CHECKING, Literal, TypeVar
from typing import TYPE_CHECKING, Any, Literal, TypeVar

import eumdac.product
import numpy as np
Expand Down Expand Up @@ -158,7 +158,7 @@ async def consume_to_store(
channels: list[models.SpectralChannel],
resolution_meters: int,
crop_region_lonlat: tuple[float, float, float, float] | None,
dims_chunks_shards: tuple[list[str], list[int], list[int]],
encoding: dict[str, Any],
eumetsat_credentials: tuple[str, str],
buffer_size: int,
max_workers: int,
Expand Down Expand Up @@ -233,22 +233,20 @@ def _not_stored(product: eumdac.product.Product) -> bool:
total_num += 1

if isinstance(item, xr.Dataset):
log.info(f"pulled image for timestamp {pd.Timestamp(item.time.item())}")
log.debug(f"pulled image for timestamp {pd.Timestamp(item.time.item())}")
results.append(item)

# If we've reached the write block size, concat the datasets and write out
if len(results) == accum_writes:
ds = xr.concat(results, dim="time") if accum_writes > 1 else results[0]

log.info(f"saving last {accum_writes} accumulated images")
log.debug(f"saving last {accum_writes} accumulated images")

storage.write_to_store(
ds=ds,
dst=dst,
append_dim="time",
dims=dims_chunks_shards[0],
chunks=dims_chunks_shards[1],
shards=dims_chunks_shards[2],
encoding=encoding,
)
results = []

Expand Down Expand Up @@ -294,15 +292,13 @@ def _not_stored(product: eumdac.product.Product) -> bool:
if len(results) > 0:
ds = xr.concat(results, dim="time") if accum_writes > 1 else results[0]

log.info(f"saving last {accum_writes} accumulated images")
log.debug(f"saving last {accum_writes} accumulated images")

storage.write_to_store(
ds=ds,
dst=dst,
append_dim="time",
dims=dims_chunks_shards[0],
chunks=dims_chunks_shards[1],
shards=dims_chunks_shards[2],
encoding=encoding,
)

log.info(
Expand Down
96 changes: 47 additions & 49 deletions src/satellite_consumer/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import datetime as dt
import logging
import re
from typing import Any
from typing import Any, TypeVar, overload

import fsspec
import gcsfs
import icechunk
import numpy as np
import pandas as pd
import s3fs
import xarray as xr
Expand All @@ -22,66 +21,65 @@
log = logging.getLogger(__name__)


def encoding(
T = TypeVar("T")


@overload
def _sanitize_encoding(ds: xr.Dataset, dims: list[str], data: dict[Any, Any]) -> dict[str, Any]: ...


@overload
def _sanitize_encoding[T](
ds: xr.Dataset,
dims: list[str],
data: T,
) -> T: ...


def _sanitize_encoding(
ds: xr.Dataset,
dims: list[str],
chunks: list[int],
shards: list[int],
) -> dict[str, Any]:
data: Any,
) -> Any:
"""Get the encoding dictionary for writing the dataset to Zarr."""
# Replace -1's with full dimension sizes
chunks = [
cd[0] if cd[0] > 0 else len(ds.coords[cd[1]].values)
for cd in zip(chunks, dims, strict=True)
]
shards = [
sd[0] if sd[0] > 0 else len(ds.coords[sd[1]].values)
for sd in zip(shards, dims, strict=True)
]
if isinstance(data, dict):
sanitized_data: dict[str, Any] = {}
for key, value in data.items():
if key in ["chunks", "shards"] and isinstance(value, list):
# Replace all -1's with the corresponding dimension length
# Might not be chunked along all dims, hence strict=False
sanitized_data[key] = [
cd[0] if cd[0] > 0 else len(ds.coords[cd[1]].values)
for cd in zip(value, dims, strict=False)
]
elif key == "compressors":
# Replace any mention of compressors with a standard Blosc Zstd compressor
sanitized_data[key] = zarr.codecs.BloscCodec(
cname="zstd",
clevel=3,
shuffle=zarr.codecs.BloscShuffle.bitshuffle,
)
elif key == "_ARRAY_DIMENSIONS":
# Remove _ARRAY_DIMENSIONS key as it is not a valid Zarr encoding key, if exists
continue
else:
sanitized_data[key] = _sanitize_encoding(ds=ds, dims=dims, data=value)

return {
# Data variables
"data": {
"compressors": zarr.codecs.BloscCodec(
cname="zstd",
clevel=3,
shuffle=zarr.codecs.BloscShuffle.bitshuffle,
),
"fill_value": np.float32(np.nan),
"chunks": chunks,
"shards": shards,
},
"instrument": {"dtype": "<U26", "chunks": (10000,)},
"satellite_actual_longitude": {"dtype": "float64", "chunks": (10000,)},
"satellite_actual_latitude": {"dtype": "float64", "chunks": (10000,)},
"satellite_actual_altitude": {"dtype": "float64", "chunks": (10000,)},
"cal_offset": {"dtype": "float64", "chunks": (10000,)},
"cal_slope": {"dtype": "float64", "chunks": (10000,)},
"projection_longitude": {"dtype": "float64", "chunks": (10000,)},
"projection_latitude": {"dtype": "float64", "chunks": (10000,)},
# Coordinates
"channel": {"dtype": "str"},
"time": {
"dtype": "int",
"units": "nanoseconds since 1970-01-01",
"calendar": "proleptic_gregorian",
"chunks": (10000,),
},
}
return sanitized_data
return data


def write_to_store(
ds: xr.Dataset,
dst: str | icechunk.repository.Repository,
append_dim: str,
chunks: list[int],
shards: list[int],
dims: list[str],
encoding: dict[str, Any],
) -> None:
"""Write the given dataset to the destination.

If a store already exists at the given path, the dataset will be appended to it.
"""
dims = encoding["_ARRAY_DIMENSIONS"]
if dims != list(ds.dims):
raise ValueError(
"Provided dimensions do not match dataset dimensions."
Expand All @@ -102,7 +100,7 @@ def write_to_store(
obj=ds,
session=session,
mode="w-",
encoding=encoding(ds=ds, dims=dims, chunks=chunks, shards=shards),
encoding=_sanitize_encoding(ds=ds, dims=dims, data=encoding),
)
_ = session.commit(message="initial commit")
elif isinstance(dst, str):
Expand All @@ -113,7 +111,7 @@ def write_to_store(
write_empty_chunks=False,
zarr_format=3,
compute=True,
encoding=encoding(ds=ds, dims=dims, chunks=chunks, shards=shards),
encoding=_sanitize_encoding(ds=ds, dims=dims, data=encoding),
)
return None

Expand Down
Loading