import logging
import os
from datetime import datetime, UTC
from pathlib import Path
from types import SimpleNamespace

import dask
import xarray as xr
from dask import delayed
from dask.distributed import Client, LocalCluster

from cmip7_prep.cmor_writer import CmorSession
from cmip7_prep.dreq_search import find_variables_by_prefix
from cmip7_prep.mapping_compat import Mapping
from cmip7_prep.pipeline import realize_regrid_prepare, open_native_for_cmip_vars
from gents.hfcollection import HFCollection
from gents.timeseries import TSCollection

# Location of the CMIP6 CMOR tables used for all variable definitions.
TABLES = "/glade/work/cmip7/e3sm_to_cmip/cmip6-cmor-tables/Tables"
# Root directory where CMORized output (and per-run logs) are written.
OUTDIR = Path("/glade/derecho/scratch/cmip7/CMIP7")

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


@delayed
def process_one_var(varname: str) -> tuple[str, str]:
    """Compute and write one CMIP variable; return ``(varname, status)``.

    Relies on the module-level ``mapping`` and ``ds_native`` objects created in
    the ``__main__`` section below (dask serializes them to the workers along
    with the task).

    Parameters
    ----------
    varname : str
        CMIP short name of the variable to realize, regrid, and CMORize.

    Returns
    -------
    tuple[str, str]
        ``(varname, "ok")`` on success, or ``(varname, "ERROR: <repr>")`` on
        failure — errors are captured rather than raised so one bad variable
        does not kill the whole batch.
    """
    try:
        # Realize → verticalize (if needed) → regrid for a single variable.
        ds_cmor = realize_regrid_prepare(
            mapping,
            ds_native,
            varname,
            tables_path=TABLES,
            time_chunk=12,
            regrid_kwargs={
                "output_time_chunk": 12,
                "dtype": "float32",
                "bilinear_map": Path(
                    "/glade/campaign/cesm/cesmdata/inputdata/cpl/gridmaps/ne30pg3/map_ne30pg3_to_1x1d_bilin.nc"
                ),
                "conservative_map": Path(
                    "/glade/campaign/cesm/cesmdata/inputdata/cpl/gridmaps/ne30pg3/map_ne30pg3_to_1x1d_aave.nc"
                ),
            },
        )

        # Unique log per *run* is in CmorSession; still fine to reuse here.
        log_dir = OUTDIR / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)

        with CmorSession(
            tables_path=TABLES,
            # One log file per worker/run (timestamp + var suffix helps debugging).
            log_dir=log_dir,
            log_name=f"cmor_{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}_{varname}.log",
            dataset_attrs={"institution_id": "NCAR"},  # plus your other attrs if needed
        ) as cm:
            # Build the variable definition from the mapping config.
            # SimpleNamespace replaces the previous ad-hoc ``type("VDef", ...)``
            # one-off class: same attribute-access interface, clearer intent.
            cfg = mapping.get_cfg(varname)
            vdef = SimpleNamespace(
                name=varname,
                table=cfg.get("table", "Amon"),
                units=cfg.get("units", ""),
                dims=cfg.get("dims", []),
                positive=cfg.get("positive", None),
                cell_methods=cfg.get("cell_methods", None),
                long_name=cfg.get("long_name", None),
                standard_name=cfg.get("standard_name", None),
                levels=cfg.get("levels", None),
            )

            # The writer expects a dataset with ``varname`` present.
            cm.write_variable(ds_cmor, varname, vdef, outdir=OUTDIR)

        return (varname, "ok")
    except Exception as e:  # keep the task alive; report failure instead of raising
        return (varname, f"ERROR: {e!r}")


if __name__ == "__main__":

    # Only atm monthly 32-bit history files.
    # NOTE(review): the original also carried a commented-out "64 bit"
    # alternative whose glob ("*cam.h0a*") was nearly identical to this one —
    # confirm which pattern is actually intended for the 64-bit stream.
    include_pattern = "*cam.h0a.*"

    cluster = LocalCluster(n_workers=128, threads_per_worker=1, memory_limit="235GB")
    client = cluster.get_client()
    try:
        input_head_dir = "/glade/derecho/scratch/cmip7/archive/b.e30_beta06.B1850C_LTso.ne30_t232_wgx3.192.wrkflw.1/atm/hist_amon32"
        output_head_dir = "/glade/derecho/scratch/cmip7/archive/timeseries/b.e30_beta06.B1850C_LTso.ne30_t232_wgx3.192.wrkflw.1/atm/hist"

        # Convert history files to per-variable timeseries with GenTS.
        hf_collection = HFCollection(input_head_dir, dask_client=client)
        hf_collection = hf_collection.include_patterns([include_pattern])
        hf_collection.pull_metadata()
        ts_collection = TSCollection(
            hf_collection, output_head_dir, ts_orders=None, dask_client=client
        )
        ts_collection = ts_collection.apply_overwrite("*")
        ts_collection.execute()

        # 0) Load mapping (uses packaged data/cesm_to_cmip7.yaml by default).
        mapping = Mapping.from_packaged_default()

        cmip_vars = find_variables_by_prefix(
            None, "Amon.", include_groups={"baseline_monthly"}
        )
        print(f"CMORIZING {len(cmip_vars)} variables")
        basedir = Path(output_head_dir)

        # 1) Load requested variables from the freshly written timeseries.
        # str(basedir / pattern) is byte-identical to the old os.path.join call
        # but keeps path handling in pathlib.
        ds_native, cmip_vars = open_native_for_cmip_vars(
            cmip_vars,
            str(basedir / include_pattern),
            mapping,
            use_cftime=True,
            parallel=True,
        )

        # 2) Fan the per-variable work out as dask.delayed tasks and block
        # until every one has finished (failures are returned, not raised).
        tasks = [process_one_var(v) for v in cmip_vars]
        results = dask.compute(*tasks)

        for varname, status in results:
            print(varname, "→", status)
    finally:
        # Always release the local cluster, even if the pipeline raised.
        client.close()
        cluster.close()
0 commit comments