Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,76 @@ jobs:
run: poetry install --no-interaction --no-ansi
- name: Run pre-commit on all files
run: poetry run pre-commit run --all-files
tests:
name: pytest
runs-on: ubuntu-latest
needs: lint

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: '3.13'
cache: 'pip'
- name: Install NetCDF
run: sudo apt-get update && sudo apt-get install -y libnetcdf-dev libnetcdff-dev
- name: Install ESMF
uses: esmf-org/install-esmf-action@v1
env:
ESMF_NETCDF: nc-config
ESMF_INSTALL_PREFIX: $HOME/ESMF
with:
version: latest
esmpy: false
- name: Pin esmpy to ESMF tag
run: |
set -euo pipefail
MK="$HOME/ESMF/lib/esmf.mk"
VER=$(awk -F= '/^ESMF_VERSION_STRING[[:space:]]*=/{gsub(/[[:space:]]*/,"",$2); print $2}' "$MK")
# Normalize to plain semver if needed (drop anything after first non-semver char)
VER_CLEAN=$(python - "$VER" <<'PY'
import re, sys
v=sys.argv[1]
m=re.match(r'^(\d+\.\d+\.\d+)', v)
print(m.group(1) if m else v)
PY
)
TAG="v${VER_CLEAN}"
echo "Detected ESMF ${VER} -> using ESMPy tag ${TAG}"

# remove any existing esmpy dep and add the exact git subdir tag
poetry remove esmpy || true
poetry add --lock --no-interaction \
"esmpy@git+https://github.com/esmf-org/esmf.git@${TAG}#subdirectory=src/addon/esmpy"

- name: Install Poetry
run: |
python -m pip install --upgrade pip pipx
pipx install poetry
poetry --version
poetry config virtualenvs.in-project true
poetry env use "$(python -c 'import sys; print(sys.executable)')"
poetry install --no-interaction --no-ansi
poetry run python -c "import esmpy; print('ESMPy:', esmpy.__version__)"
- name: Cache virtualenv
uses: actions/cache@v4
with:
path: .venv
key: venv-${{ runner.os }}-py${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }}
restore-keys: |
venv-${{ runner.os }}-py${{ matrix.python-version }}-
venv-${{ runner.os }}-

- name: Run pytest
env:
ESMFMKFILE: ${{ env.ESMFMKFILE }}
LD_LIBRARY_PATH: ${{ env.ESMF_INSTALL_PREFIX }}/lib:${{ env.LD_LIBRARY_PATH }}
run: poetry run pytest -q
# the following can be used by developers to login to the github server in case of errors
# see https://github.com/marketplace/actions/debugging-with-tmate for further details
# - name: Setup tmate session
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
127 changes: 127 additions & 0 deletions cmip7_prep/cmor_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,31 @@ def _resolve_table_filename(tables_path: Path, key: str) -> str:
DatasetJsonLike = Union[str, Path, AbstractContextManager]


def _fx_glob_pattern(name: str) -> str:
# CMOR filenames vary; this finds most fx files for this var
# e.g., *_sftlf_fx_*.nc or sftlf_fx_*.nc
return f"**/*_{name}_fx_*.nc"


def _open_existing_fx(outdir: Path, name: str) -> xr.DataArray | None:
    """Recursively search *outdir* for an existing CMOR fx file holding *name*.

    Returns the variable as an in-memory DataArray (detached from the file,
    so no handle is left open), or None when nothing usable is found.
    """
    for p in outdir.rglob(_fx_glob_pattern(name)):
        try:
            # Context manager + .load() so the netCDF handle is closed and
            # the returned array does not depend on the open file.
            with xr.open_dataset(p, engine="netcdf4") as ds:
                if name in ds:
                    return ds[name].load()
        except FileNotFoundError:
            # Candidate disappeared between rglob() and open; keep searching
            # instead of aborting the whole scan.
            continue
        except (OSError, ValueError) as e:
            # OSError: unreadable/corrupt file, low-level I/O; ValueError: engine/decoding issues
            warnings.warn(f"[fx] failed to open {p} with netcdf4: {e}", RuntimeWarning)
        except (ImportError, ModuleNotFoundError) as e:
            # netCDF4 backend not installed — further attempts will fail too.
            warnings.warn(f"[fx] netcdf4 backend unavailable: {e}", RuntimeWarning)
            return None

    return None


# ---------------------------------------------------------------------
# CMOR session
# ---------------------------------------------------------------------
Expand Down Expand Up @@ -424,6 +449,12 @@ def __init__(
self._pending_ps = None
self._outdir = Path(outdir) if outdir is not None else Path.cwd() / "CMIP7"
self._outdir.mkdir(parents=True, exist_ok=True)
self._fx_written: set[str] = (
set()
) # remembers which fx vars were written this run
self._fx_cache: dict[str, xr.DataArray] = (
{}
) # regridded fx fields cached in-memory

def __enter__(self) -> "CmorSession":
# Resolve logfile path if requested
Expand Down Expand Up @@ -765,6 +796,100 @@ def _get_1d_with_bounds(dsi: xr.Dataset, name: str, units_default: str):
axes_ids.extend([lat_id, lon_id])
return axes_ids

    def _write_fx_2d(self, ds: xr.Dataset, name: str, units: str) -> None:
        """Write a single 2-D fx (time-invariant) field *name* via CMOR.

        Silently returns when *name* is absent from *ds*. Otherwise loads the
        "fx" CMOR table, defines lat/lon axes (deriving cell bounds from 1-D
        centers when ``lat_bnds``/``lon_bnds`` are not carried by *ds*) and
        writes the field with NaNs replaced by CMOR's fill value.
        """
        if name not in ds:
            return
        # Switch CMOR's active table to "fx" before defining axes/variables.
        table_filename = _resolve_table_filename(self.tables_path, "fx")
        cmor.load_table(table_filename)

        lat = ds["lat"].values
        lon = ds["lon"].values
        lat_b = ds.get("lat_bnds")
        lon_b = ds.get("lon_bnds")
        # Fall back to bounds computed from the 1-D cell centers when the
        # dataset has no explicit bounds variables.
        lat_b = (
            lat_b.values
            if isinstance(lat_b, xr.DataArray)
            else _bounds_from_centers_1d(lat, "lat")
        )
        lon_b = (
            lon_b.values
            if isinstance(lon_b, xr.DataArray)
            else _bounds_from_centers_1d(lon, "lon")
        )

        lat_id = cmor.axis(
            "latitude", "degrees_north", coord_vals=lat, cell_bounds=lat_b
        )
        lon_id = cmor.axis(
            "longitude", "degrees_east", coord_vals=lon, cell_bounds=lon_b
        )
        # Replace missing values with the fill value CMOR expects.
        data_filled, fillv = _filled_for_cmor(ds[name])

        var_id = cmor.variable(name, units, [lat_id, lon_id], missing_value=fillv)
        print(f"write fx variable {name}")
        cmor.write(
            var_id,
            np.asarray(data_filled),
        )
        cmor.close(var_id)

    def ensure_fx_written_and_cached(self, ds_regr: xr.Dataset) -> xr.Dataset:
        """Ensure sftlf and areacella exist in ds_regr and are written once as fx.

        Resolution order per fx variable: (1) in-memory cache from this run,
        (2) the field already present in *ds_regr* (written to disk via CMOR
        the first time it is seen), (3) a previously written CMOR fx file
        found under ``self._outdir`` whose lat/lon grid matches exactly,
        (4) leave it missing for the caller to compute later.

        If not present in ds_regr, try to read from existing CMOR fx files in outdir.
        If present in ds_regr but not yet written this run, write and cache them.
        Returns ds_regr augmented with any missing fx fields.
        """
        need = [("sftlf", "%"), ("areacella", "m2")]
        out = ds_regr

        for name, units in need:
            # 1) Already cached this run?
            if name in self._fx_cache:
                if name not in out:
                    out = out.assign({name: self._fx_cache[name]})
                continue

            # 2) Present in regridded dataset? (best case)
            if name in out:
                self._fx_cache[name] = out[name]
                if name not in self._fx_written:
                    # Convert landfrac to % if needed: a max <= 1.0 with no
                    # percent-like units attribute is taken as a [0, 1]
                    # fraction — NOTE(review): an all-ocean tile with true
                    # percent values <= 1 would be rescaled too; confirm.
                    if name == "sftlf":
                        v = out[name]
                        if (np.nanmax(v.values) <= 1.0) and v.attrs.get(
                            "units", ""
                        ) not in ("%", "percent"):
                            out = out.assign(
                                {
                                    name: (v * 100.0).assign_attrs(
                                        v.attrs | {"units": "%"}
                                    )
                                }
                            )
                    # Re-cache (possibly converted) field, then write once.
                    self._fx_cache[name] = out[name]
                    self._write_fx_2d(out, name, units)
                    self._fx_written.add(name)
                continue

            # 3) Not present in ds_regr → try reading existing CMOR fx output
            if self._outdir:
                fx_da = _open_existing_fx(self._outdir, name)
                if fx_da is not None:
                    # Verify grid match (simple equality on lat/lon values)
                    if (
                        "lat" in out
                        and "lon" in out
                        and np.array_equal(out["lat"].values, fx_da["lat"].values)
                        and np.array_equal(out["lon"].values, fx_da["lon"].values)
                    ):
                        out = out.assign({name: fx_da})
                        self._fx_cache[name] = out[name]
                        self._fx_written.add(name)  # already exists on disk
                        continue
                    # If grid mismatch, you could regrid fx_da here; for now, skip.
            # 4) Last resort: leave missing; caller may compute it later
        return out

# public API
# -------------------------
def write_variable(
Expand Down Expand Up @@ -801,6 +926,8 @@ def write_variable(
time_da = ds.get("time")
nt = 0

self.ensure_fx_written_and_cached(ds)

# ---- Main variable write ----

cmor.write(
Expand Down
87 changes: 80 additions & 7 deletions cmip7_prep/data/cesm_to_cmip7.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ variables:
- cesm_var: QFLX
scale: -1.0 # flip sign if CESM positive upward

evspsblsoi:
table: Lmon
units: kg m-2 s-1
long_name: Water Evaporation from Soil
dims: [time, lat, lon]
positive: up
sources:
- cesm_var: QSOIL

evspsblveg:
table: Lmon
units: kg m-2 s-1
    long_name: Water Evaporation from Canopy
dims: [time, lat, lon]
positive: up
sources:
- cesm_var: QVEGE

hfls:
table: Amon
units: "W m-2"
Expand Down Expand Up @@ -139,6 +157,51 @@ variables:
sources:
- cesm_var: QREFHT

lai:
table: Lmon
units: "1"
dims: [time, lat, lon]
sources:
- cesm_var: TLAI

mrfso:
table: Lmon
units: kg m-2
dims: [time, lat, lon]
formula: verticalSum(SOILICE, capped_at=5000)
sources:
- cesm_var: SOILICE

mrro:
table: Lmon
units: kg m-2 s-1
dims: [time, lat, lon]
sources:
- cesm_var: QRUNOFF

mrros:
table: Lmon
units: kg m-2 s-1
dims: [time, lat, lon]
sources:
- cesm_var: QOVER

mrso:
table: Lmon
units: kg m-2
dims: [time, lat, lon]
formula: verticalSum(SOILICE + SOILLIQ, capped_at=5000)
sources:
- cesm_var: SOILICE
- cesm_var: SOILLIQ

mrsos:
table: Lmon
units: kg m-2
dims: [time, lat, lon]
sources:
- cesm_var: SOILWATER_10CM

pr:
table: Amon
units: kg m-2 s-1
Expand Down Expand Up @@ -432,10 +495,20 @@ variables:
sources:
- cesm_var: Z3

fx:
areacella:
units: m2
source: cell_area
sftlf:
units: '%'
source: sftlf
# areacella:
# table: fx
# units: m2
# formula: area * 1.e6
# standard_name: cell_area
# dims: [lat, lon]
# sources:
# - cesm_var: ZBOT
#
#
# sftlf:
# table: fx
# units: '%'
# formula: landfrac * 100
# dims: [lat, lon]
# sources:
# - cesm_var: ZBOT
22 changes: 21 additions & 1 deletion cmip7_prep/mapping_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,22 @@ def _safe_eval(expr: str, local_names: Dict[str, Any]) -> Any:
2.0
"""
safe_globals = {"__builtins__": {}}
locals_safe = {"np": np, "xr": xr}
# Add custom formula functions here
def verticalSum(arr, capped_at=None, dim='levsoi'):
# arr can be a DataArray or an expression
if isinstance(arr, xr.DataArray):
summed = arr.sum(dim=dim, skipna=True)
else:
summed = arr # fallback, should be DataArray
if capped_at is not None:
summed = xr.where(summed > capped_at, capped_at, summed)
return summed

locals_safe = {
"np": np,
"xr": xr,
"verticalSum": verticalSum,
}
locals_safe.update(local_names)
# pylint: disable=eval-used
return eval(expr, safe_globals, locals_safe)
Expand Down Expand Up @@ -274,6 +289,11 @@ def _realize_core(ds: xr.Dataset, vc: VarConfig) -> xr.DataArray:
raise KeyError(f"source variable {vc.source!r} not found in dataset")
return ds[vc.source]

if vc.name == "sftlf":
vc.raw_variables = "landfrac"
elif vc.name == "areacella":
vc.raw_variables = "area"

# 2) identity mapping from a single raw variable
if (
vc.raw_variables
Expand Down
Loading