1212"""
1313from __future__ import annotations
1414import argparse
15+ from email import parser
1516import os
1617from pathlib import Path
1718import logging
@@ -48,6 +49,18 @@ def parse_args():
4849 parser = argparse .ArgumentParser (
4950 description = "CMIP7 monthly processing for atm/lnd realms"
5051 )
52+ parser .add_argument (
53+ "--workers" ,
54+ type = int ,
55+ default = 128 ,
56+ help = "Number of Dask workers (default: 128, set to 1 for serial execution)" ,
57+ )
58+ parser .add_argument (
59+ "--overwrite" ,
60+ action = "store_true" ,
61+ help = "Overwrite existing timeseries outputs (default: False)" ,
62+ )
63+
5164 parser .add_argument (
5265 "--realm" , choices = ["atm" , "lnd" ], required = True , help = "Realm to process"
5366 )
@@ -56,11 +69,19 @@ def parse_args():
5669 parser .add_argument (
5770 "--test" , action = "store_true" , help = "Run in test mode with default paths"
5871 )
72+ scratch = os .getenv ("SCRATCH" )
73+ default_outdir = scratch + "/CMIP7" if scratch else "./CMIP7"
5974 parser .add_argument (
6075 "--run-freq" ,
6176 type = str ,
6277 default = "10y" ,
63- help = "Minimum run frequency required (e.g. '10y' or '120m'), default 10y" ,
78+ help = "Request run frequency (e.g. '10y' or '120m'), default 10y" ,
79+ )
80+ parser .add_argument (
81+ "--outdir" ,
82+ type = str ,
83+ default = default_outdir ,
84+ help = "Output directory for CMORized files (default: $SCRATCH/CMIP7)" ,
6485 )
6586 args = parser .parse_args ()
6687 # Parse run_freq argument
@@ -89,16 +110,16 @@ def parse_args():
89110 return args
90111
91112
92- @delayed
93- def process_one_var (varname : str , mapping , ds_native , OUTDIR ) -> tuple [str , str ]:
113+ def process_one_var (
114+ varname : str , mapping , ds_native , tables_path , outdir
115+ ) -> tuple [str , str ]:
94116 """Compute+write one CMIP variable. Returns (varname, 'ok' or error message)."""
95117 try :
96- # Realize → verticalize (if needed) → regrid for a single variable
97118 ds_cmor = realize_regrid_prepare (
98119 mapping ,
99120 ds_native ,
100121 varname ,
101- tables_path = TABLES ,
122+ tables_path = tables_path ,
102123 time_chunk = 12 ,
103124 regrid_kwargs = {
104125 "output_time_chunk" : 12 ,
@@ -111,15 +132,14 @@ def process_one_var(varname: str, mapping, ds_native, OUTDIR) -> tuple[str, str]
111132 ),
112133 },
113134 )
114- # Unique log per *run* is in your CmorSession; still fine to reuse here.
115- log_dir = Path (OUTDIR ) / "logs"
135+ log_dir = Path (outdir ) / "logs"
116136 log_dir .mkdir (parents = True , exist_ok = True )
117137 with CmorSession (
118- tables_path = TABLES ,
138+ tables_path = tables_path ,
119139 log_dir = log_dir ,
120140 log_name = f"cmor_{ datetime .now (UTC ).strftime ('%Y%m%dT%H%M%SZ' )} _{ varname } .log" ,
121141 dataset_attrs = {"institution_id" : "NCAR" },
122- outdir = OUTDIR ,
142+ outdir = outdir ,
123143 ) as cm :
124144 cfg = mapping .get_cfg (varname )
125145 vdef = type (
@@ -143,6 +163,9 @@ def process_one_var(varname: str, mapping, ds_native, OUTDIR) -> tuple[str, str]
143163 return (varname , f"ERROR: { e !r} " )
144164
145165
166+ process_one_var_delayed = delayed (process_one_var )
167+
168+
146169def latest_monthly_file (
147170 directory : Path , * , require_consistent_style : bool = True
148171) -> Optional [Tuple [Path , int , int ]]:
@@ -178,28 +201,9 @@ def latest_monthly_file(
178201
179202
180203def main ():
181- parser = argparse .ArgumentParser (
182- description = "CMIP7 monthly processing for atm/lnd realms"
183- )
184- parser .add_argument (
185- "--realm" , choices = ["atm" , "lnd" ], required = True , help = "Realm to process"
186- )
187- parser .add_argument ("--caseroot" , type = str , help = "Case root directory" )
188- parser .add_argument ("--cimeroot" , type = str , help = "CIME root directory" )
189- parser .add_argument (
190- "--test" , action = "store_true" , help = "Run in test mode with default paths"
191- )
192- parser .add_argument (
193- "--run-freq" ,
194- type = str ,
195- default = "10y" ,
196- help = "Requested run frequency (e.g. '10y' or '120m'), default 10y" ,
197- )
198- args = parser .parse_args ()
199-
200204 args = parse_args ()
201205 scratch = os .getenv ("SCRATCH" )
202- OUTDIR = scratch + "/CMIP7"
206+ OUTDIR = args . outdir
203207 # Set realm-specific parameters
204208 if args .realm == "atm" :
205209 include_pattern = "*cam.h0a*"
@@ -258,8 +262,13 @@ def main():
258262 if not os .path .exists (str (TSDIR )):
259263 os .makedirs (str (TSDIR ))
260264 # Dask cluster setup
261- cluster = LocalCluster (n_workers = 128 , threads_per_worker = 1 , memory_limit = "235GB" )
262- client = cluster .get_client ()
265+ if args .workers == 1 :
266+ client = None
267+ else :
268+ cluster = LocalCluster (
269+ n_workers = args .workers , threads_per_worker = 1 , memory_limit = "235GB"
270+ )
271+ client = cluster .get_client ()
263272 input_head_dir = INPUTDIR
264273 output_head_dir = TSDIR
265274 hf_collection = HFCollection (input_head_dir , dask_client = client )
@@ -268,9 +277,11 @@ def main():
268277 ts_collection = TSCollection (
269278 hf_collection , output_head_dir , ts_orders = None , dask_client = client
270279 )
271- ts_collection = ts_collection .apply_overwrite ("*" )
280+ if args .overwrite :
281+ ts_collection = ts_collection .apply_overwrite ("*" )
272282 ts_collection .execute ()
273283 # Load mapping
284+ print ("Timeseries processing complete, starting CMORization..." )
274285 mapping = Mapping .from_packaged_default ()
275286 cmip_vars = find_variables_by_prefix (
276287 None , var_prefix , include_groups = {"baseline_monthly" }
@@ -284,11 +295,20 @@ def main():
284295 use_cftime = True ,
285296 parallel = True ,
286297 )
287- futs = [process_one_var (v , mapping , ds_native , TABLES , OUTDIR ) for v in cmip_vars ]
288- results = dask .compute (* futs )
298+ if args .workers == 1 :
299+ results = [
300+ process_one_var (v , mapping , ds_native , TABLES , OUTDIR ) for v in cmip_vars
301+ ]
302+ else :
303+ futs = [
304+ process_one_var_delayed (v , mapping , ds_native , TABLES , OUTDIR )
305+ for v in cmip_vars
306+ ]
307+ results = dask .compute (* futs )
308+ if client :
309+ client .close ()
289310 for v , status in results :
290311 print (v , "→" , status )
291- client .close ()
292312
293313
294314if __name__ == "__main__" :
0 commit comments