Skip to content

Commit 490f7f4

Browse files
committed
chore(config): Pass encoding as entire config option
1 parent 42d5723 commit 490f7f4

File tree

6 files changed

+164
-141
lines changed

6 files changed

+164
-141
lines changed

src/satellite_consumer/cmd/application.conf

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,22 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
5555
(11 low and one high resolution).
5656
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:MSG15-RSS"""
5757
file_filter_regex="\S+\.nat$"
58-
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
59-
chunks=[1, 348, 464, -1]
60-
shards=[1, 1392, 3712, -1]
58+
encoding {
59+
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
60+
# Data variables
61+
data { dtype=float16, fill_value=NaN, chunks=[1, 348, 464, -1], shards=[1, 1392, 3712, -1], compressors=yes }
62+
instrument { dtype=<U26, chunks=[10000] }
63+
satellite_actual_longitude { dtype=float64, chunks=[10000] }
64+
satellite_actual_latitude { dtype=float64, chunks=[10000] }
65+
satellite_actual_altitude { dtype=float64, chunks=[10000] }
66+
cal_offset { dtype=float64, chunks=[10000] }
67+
cal_slope { dtype=float64, chunks=[10000] }
68+
projection_longitude { dtype=float64, chunks=[10000] }
69+
projection_latitude { dtype=float64, chunks=[10000] }
70+
# Coordinates
71+
channel { dtype=str }
72+
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
73+
}
6174
}
6275
iodc {
6376
region="india"
@@ -70,21 +83,21 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
7083
(11 low and one high resolution).
7184
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:HRSEVIRI-IODC"""
7285
file_filter_regex="\S+\.nat$"
73-
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
7486
encoding {
75-
# Data variables
76-
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1] }
77-
instrument { dtype=<U26, chunks=[10000] }
78-
satellite_actual_longitude { dtype=float64, chunks=[10000] }
79-
satellite_actual_latitude { dtype=float64, chunks=[10000] }
80-
satellite_actual_altitude { dtype=float64, chunks=[10000] }
81-
cal_offset { dtype=float64, chunks=[10000] }
82-
cal_slope { dtype=float64, chunks=[10000] }
83-
projection_longitude { dtype=float64, chunks=[10000] }
84-
projection_latitude { dtype=float64, chunks=[10000] }
85-
# Coordinates
86-
channel { dtype=str }
87-
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
87+
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
88+
# Data variables
89+
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1], compressors=yes }
90+
instrument { dtype=<U26, chunks=[10000] }
91+
satellite_actual_longitude { dtype=float64, chunks=[10000] }
92+
satellite_actual_latitude { dtype=float64, chunks=[10000] }
93+
satellite_actual_altitude { dtype=float64, chunks=[10000] }
94+
cal_offset { dtype=float64, chunks=[10000] }
95+
cal_slope { dtype=float64, chunks=[10000] }
96+
projection_longitude { dtype=float64, chunks=[10000] }
97+
projection_latitude { dtype=float64, chunks=[10000] }
98+
# Coordinates
99+
channel { dtype=str }
100+
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
88101
}
89102
}
90103
odegree {
@@ -98,21 +111,21 @@ The data is transmitted as High Rate transmissions in 12 spectral channels
98111
(11 low and one high resolution).
99112
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:MSG:HRSEVIRI"""
100113
file_filter_regex="\S+\.nat$"
101-
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
102-
encoding {
103-
# Data variables
104-
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1] }
105-
instrument { dtype=<U26, chunks=[10000] }
106-
satellite_actual_longitude { dtype=float64, chunks=[10000] }
107-
satellite_actual_latitude { dtype=float64, chunks=[10000] }
108-
satellite_actual_altitude { dtype=float64, chunks=[10000] }
109-
cal_offset { dtype=float64, chunks=[10000] }
110-
cal_slope { dtype=float64, chunks=[10000] }
111-
projection_longitude { dtype=float64, chunks=[10000] }
112-
projection_latitude { dtype=float64, chunks=[10000] }
113-
# Coordinates
114-
channel { dtype=str }
115-
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
114+
encoding {
115+
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
116+
# Data variables
117+
data { dtype=float16, fill_value=NaN, chunks=[1, 464, 464, -1], shards=[1, 3712, 3712, -1], compressors=yes }
118+
instrument { dtype=<U26, chunks=[10000] }
119+
satellite_actual_longitude { dtype=float64, chunks=[10000] }
120+
satellite_actual_latitude { dtype=float64, chunks=[10000] }
121+
satellite_actual_altitude { dtype=float64, chunks=[10000] }
122+
cal_offset { dtype=float64, chunks=[10000] }
123+
cal_slope { dtype=float64, chunks=[10000] }
124+
projection_longitude { dtype=float64, chunks=[10000] }
125+
projection_latitude { dtype=float64, chunks=[10000] }
126+
# Coordinates
127+
channel { dtype=str }
128+
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
116129
}
117130
}
118131
odegree-12 {
@@ -127,21 +140,21 @@ The data is transmitted as High Rate transmissions in 16 spectral channels
127140
See https://user.eumetsat.int/catalogue/EO:EUM:DAT:0662"""
128141
# Return only files covering the top of the UK
129142
file_filter_regex="\S+BODY\S+00(?:[3][2-9]|40).nc$"
130-
dimensions=["time", "y_geostationary", "x_geostationary", "channel"]
131-
encoding {
132-
# Data variables
133-
data { dtype=float16, fill_value=NaN, chunks=[1, 400, 400, -1], shards=[1, -1, -1, -1] }
134-
instrument { dtype=<U26, chunks=[10000] }
135-
satellite_actual_longitude { dtype=float64, chunks=[10000] }
136-
satellite_actual_latitude { dtype=float64, chunks=[10000] }
137-
satellite_actual_altitude { dtype=float64, chunks=[10000] }
138-
cal_offset { dtype=float64, chunks=[10000] }
139-
cal_slope { dtype=float64, chunks=[10000] }
140-
projection_longitude { dtype=float64, chunks=[10000] }
141-
projection_latitude { dtype=float64, chunks=[10000] }
142-
# Coordinates
143-
channel { dtype=str }
144-
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
143+
encoding {
144+
_ARRAY_DIMENSIONS=["time", "y_geostationary", "x_geostationary", "channel"]
145+
# Data variables
146+
data { dtype=float16, fill_value=NaN, chunks=[1, 400, 400, -1], shards=[1, -1, -1, -1], compressors=yes }
147+
instrument { dtype=<U26, chunks=[10000] }
148+
satellite_actual_longitude { dtype=float64, chunks=[10000] }
149+
satellite_actual_latitude { dtype=float64, chunks=[10000] }
150+
satellite_actual_altitude { dtype=float64, chunks=[10000] }
151+
cal_offset { dtype=float64, chunks=[10000] }
152+
cal_slope { dtype=float64, chunks=[10000] }
153+
projection_longitude { dtype=float64, chunks=[10000] }
154+
projection_latitude { dtype=float64, chunks=[10000] }
155+
# Coordinates
156+
channel { dtype=str }
157+
time { dtype=int, units="nanoseconds since 1970-01-01", calendar="proleptic_gregorian", chunks=[10000] }
145158
}
146159
}
147160
}

src/satellite_consumer/cmd/main.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,7 @@ def main() -> None:
101101
conf.get_string("credentials.aws.region", None),
102102
),
103103
gcs_credentials=conf.get_string("credentials.gcs.application_credentials", None),
104-
dims_chunks_shards=(
105-
conf.get_list(f"satellites.{sat}.dimensions"),
106-
conf.get_list(f"satellites.{sat}.chunks"),
107-
conf.get_list(f"satellites.{sat}.shards"),
108-
),
104+
encoding=conf.get_config(f"satellites.{sat}.encoding"),
109105
buffer_size=conf.get_int("consumer.buffer_size"),
110106
max_workers=conf.get_int("consumer.max_workers"),
111107
accum_writes=conf.get_int("consumer.accum_writes"),

src/satellite_consumer/consume.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
1414
from functools import partial
1515
from itertools import islice
16-
from typing import TYPE_CHECKING, Literal, TypeVar
16+
from typing import TYPE_CHECKING, Any, Literal, TypeVar
1717

1818
import eumdac.product
1919
import numpy as np
@@ -158,7 +158,7 @@ async def consume_to_store(
158158
channels: list[models.SpectralChannel],
159159
resolution_meters: int,
160160
crop_region_lonlat: tuple[float, float, float, float] | None,
161-
dims_chunks_shards: tuple[list[str], list[int], list[int]],
161+
encoding: dict[str, Any],
162162
eumetsat_credentials: tuple[str, str],
163163
buffer_size: int,
164164
max_workers: int,
@@ -246,9 +246,7 @@ def _not_stored(product: eumdac.product.Product) -> bool:
246246
ds=ds,
247247
dst=dst,
248248
append_dim="time",
249-
dims=dims_chunks_shards[0],
250-
chunks=dims_chunks_shards[1],
251-
shards=dims_chunks_shards[2],
249+
encoding=encoding,
252250
)
253251
results = []
254252

@@ -300,9 +298,7 @@ def _not_stored(product: eumdac.product.Product) -> bool:
300298
ds=ds,
301299
dst=dst,
302300
append_dim="time",
303-
dims=dims_chunks_shards[0],
304-
chunks=dims_chunks_shards[1],
305-
shards=dims_chunks_shards[2],
301+
encoding=encoding,
306302
)
307303

308304
log.info(

src/satellite_consumer/storage.py

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import datetime as dt
44
import logging
55
import re
6-
from typing import Any
6+
from typing import Any, TypeVar, overload
77

88
import fsspec
99
import gcsfs
1010
import icechunk
11-
import numpy as np
1211
import pandas as pd
1312
import s3fs
1413
import xarray as xr
@@ -22,66 +21,65 @@
2221
log = logging.getLogger(__name__)
2322

2423

25-
def encoding(
24+
T = TypeVar("T")
25+
26+
27+
@overload
28+
def _sanitize_encoding(ds: xr.Dataset, dims: list[str], data: dict[Any, Any]) -> dict[str, Any]: ...
29+
30+
31+
@overload
32+
def _sanitize_encoding[T](
33+
ds: xr.Dataset,
34+
dims: list[str],
35+
data: T,
36+
) -> T: ...
37+
38+
39+
def _sanitize_encoding(
2640
ds: xr.Dataset,
2741
dims: list[str],
28-
chunks: list[int],
29-
shards: list[int],
30-
) -> dict[str, Any]:
42+
data: Any,
43+
) -> Any:
3144
"""Get the encoding dictionary for writing the dataset to Zarr."""
32-
# Replace -1's with full dimension sizes
33-
chunks = [
34-
cd[0] if cd[0] > 0 else len(ds.coords[cd[1]].values)
35-
for cd in zip(chunks, dims, strict=True)
36-
]
37-
shards = [
38-
sd[0] if sd[0] > 0 else len(ds.coords[sd[1]].values)
39-
for sd in zip(shards, dims, strict=True)
40-
]
45+
if isinstance(data, dict):
46+
sanitized_data: dict[str, Any] = {}
47+
for key, value in data.items():
48+
if key in ["chunks", "shards"] and isinstance(value, list):
49+
# Replace all -1's with the correspoinding dimension length
50+
# Might not be chunked along all dims, hence strict=False
51+
sanitized_data[key] = [
52+
cd[0] if cd[0] > 0 else len(ds.coords[cd[1]].values)
53+
for cd in zip(value, dims, strict=False)
54+
]
55+
elif key == "compressors":
56+
# Replace any mention of compressors with a standard Blosc Zstd compressor
57+
sanitized_data[key] = zarr.codecs.BloscCodec(
58+
cname="zstd",
59+
clevel=3,
60+
shuffle=zarr.codecs.BloscShuffle.bitshuffle,
61+
)
62+
elif key == "_ARRAY_DIMENSIONS":
63+
# Remove _ARRAY_DIMENSIONS key as it is not a valid Zarr encoding key, if exists
64+
continue
65+
else:
66+
sanitized_data[key] = _sanitize_encoding(ds=ds, dims=dims, data=value)
4167

42-
return {
43-
# Data variables
44-
"data": {
45-
"compressors": zarr.codecs.BloscCodec(
46-
cname="zstd",
47-
clevel=3,
48-
shuffle=zarr.codecs.BloscShuffle.bitshuffle,
49-
),
50-
"fill_value": np.float32(np.nan),
51-
"chunks": chunks,
52-
"shards": shards,
53-
},
54-
"instrument": {"dtype": "<U26", "chunks": (10000,)},
55-
"satellite_actual_longitude": {"dtype": "float64", "chunks": (10000,)},
56-
"satellite_actual_latitude": {"dtype": "float64", "chunks": (10000,)},
57-
"satellite_actual_altitude": {"dtype": "float64", "chunks": (10000,)},
58-
"cal_offset": {"dtype": "float64", "chunks": (10000,)},
59-
"cal_slope": {"dtype": "float64", "chunks": (10000,)},
60-
"projection_longitude": {"dtype": "float64", "chunks": (10000,)},
61-
"projection_latitude": {"dtype": "float64", "chunks": (10000,)},
62-
# Coordinates
63-
"channel": {"dtype": "str"},
64-
"time": {
65-
"dtype": "int",
66-
"units": "nanoseconds since 1970-01-01",
67-
"calendar": "proleptic_gregorian",
68-
"chunks": (10000,),
69-
},
70-
}
68+
return sanitized_data
69+
return data
7170

7271

7372
def write_to_store(
7473
ds: xr.Dataset,
7574
dst: str | icechunk.repository.Repository,
7675
append_dim: str,
77-
chunks: list[int],
78-
shards: list[int],
79-
dims: list[str],
76+
encoding: dict[str, Any],
8077
) -> None:
8178
"""Write the given dataset to the destination.
8279
8380
If a store already exists at the given path, the dataset will be appended to it.
8481
"""
82+
dims = encoding["_ARRAY_DIMENSIONS"]
8583
if dims != list(ds.dims):
8684
raise ValueError(
8785
"Provided dimensions do not match dataset dimensions."
@@ -102,7 +100,7 @@ def write_to_store(
102100
obj=ds,
103101
session=session,
104102
mode="w-",
105-
encoding=encoding(ds=ds, dims=dims, chunks=chunks, shards=shards),
103+
encoding=_sanitize_encoding(ds=ds, dims=dims, data=encoding),
106104
)
107105
_ = session.commit(message="initial commit")
108106
elif isinstance(dst, str):
@@ -113,7 +111,7 @@ def write_to_store(
113111
write_empty_chunks=False,
114112
zarr_format=3,
115113
compute=True,
116-
encoding=encoding(ds=ds, dims=dims, chunks=chunks, shards=shards),
114+
encoding=_sanitize_encoding(ds=ds, dims=dims, data=encoding),
117115
)
118116
return None
119117

0 commit comments

Comments
 (0)