diff --git a/cupid/run.py b/cupid/run.py index d1e578a7..2ff927c7 100755 --- a/cupid/run.py +++ b/cupid/run.py @@ -25,8 +25,11 @@ import os import click +import gents import intake import ploomber +from dask.distributed import Client +from dask.distributed import LocalCluster import cupid.timeseries import cupid.util @@ -83,110 +86,75 @@ def run( "glc": landice, } - # Automatically run all if no components specified - if True not in [atmosphere, ocean, land, seaice, landice]: all = True for key in component_options.keys(): component_options[key] = True - ##################################################################### - # Managing global parameters - global_params = dict() - if "global_params" in control: global_params = control["global_params"] - global_params["serial"] = serial - #################################################################### - if time_series: timeseries_params = control["timeseries"] + client = None + if not global_params["serial"]: + cluster = LocalCluster( + n_workers=1, + processes=1, + threads_per_worker=1, + memory_limit="2GB", + ) - # general timeseries arguments for all components - num_procs = timeseries_params["num_procs"] - - for component, comp_bool in component_options.items(): - if comp_bool: - - # set time series input and output directory: - # ----- - if isinstance(timeseries_params["case_name"], list): - ts_input_dirs = [] - for cname in timeseries_params["case_name"]: - ts_input_dirs.append(global_params["CESM_output_dir"]+"/"+cname+f"/{component}/hist/") + cluster.scale(timeseries_params["num_procs"]) + client = Client(cluster) + + if "ts_output_dir" not in timeseries_params: + timeseries_params["ts_output_dir"] = "./" + + for component in component_options: + if component not in timeseries_params: + continue + include_vars = None + if "vars" in timeseries_params[component]: + include_vars = timeseries_params[component]["vars"] + if include_vars == [] or include_vars == ["process_all"]: + include_vars = None + + year_start = None + if "start_years" in timeseries_params[component]: + year_start = timeseries_params[component]["start_years"] + if year_start == []: + year_start = None else: - ts_input_dirs = [ - global_params["CESM_output_dir"] + "/" + - timeseries_params["case_name"] + f"/{component}/hist/", - ] - - if "ts_output_dir" in timeseries_params: - if isinstance(timeseries_params["ts_output_dir"], list): - ts_output_dirs = [] - for ts_outdir in timeseries_params["ts_output_dir"]: - ts_output_dirs.append([ - os.path.join( - ts_outdir, - f"{component}", "proc", "tseries", - ), - ]) - else: - ts_output_dirs = [ - os.path.join( - timeseries_params["ts_output_dir"], - f"{component}", "proc", "tseries", - ), - ] + year_start = int(year_start[0]) + + year_end = None + if "end_years" in timeseries_params[component]: + year_end = timeseries_params[component]["end_years"] + if year_end == []: + year_end = None else: - if isinstance(timeseries_params["case_name"], list): - ts_output_dirs = [] - for cname in timeseries_params["case_name"]: - ts_output_dirs.append( - os.path.join( - global_params["CESM_output_dir"], - cname, - f"{component}", "proc", "tseries", - ), - ) - else: - ts_output_dirs = [ - os.path.join( - global_params["CESM_output_dir"], - timeseries_params["case_name"], - f"{component}", "proc", "tseries", - ), - ] - # ----- - - # fmt: off - # pylint: disable=line-too-long - cupid.timeseries.create_time_series( - component, - timeseries_params[component]["vars"], - timeseries_params[component]["derive_vars"], - timeseries_params["case_name"], - timeseries_params[component]["hist_str"], - ts_input_dirs, - ts_output_dirs, - # Note that timeseries output will eventually go in - # /glade/derecho/scratch/${USER}/archive/${CASE}/${component}/proc/tseries/ - timeseries_params["ts_done"], - timeseries_params["overwrite_ts"], - timeseries_params[component]["start_years"], - timeseries_params[component]["end_years"], - timeseries_params[component]["level"], - num_procs, - serial, - logger, - ) - # fmt: on - # pylint: enable=line-too-long + year_end = int(year_end[0]) + + modb = gents.ModelOutputDatabase( + hf_head_dir=global_params["CESM_output_dir"] + "/" + timeseries_params["case_name"], + ts_head_dir=timeseries_params["ts_output_dir"], + dir_name_swaps={"hist": "proc/tseries"}, + dir_exclusions=[comp for comp in component_options if comp != component], + timeseries_year_length=10, + overwrite=timeseries_params["overwrite_ts"][0], + include_variables=include_vars, + compression_level=0, + year_start=year_start, + year_end=year_end, + verbosity_level=1, + ) + modb.run(client=client, serial=global_params["serial"]) + client.shutdown() # Grab paths - run_dir = os.path.realpath(os.path.expanduser(control["data_sources"]["run_dir"])) output_dir = run_dir + "/computed_notebooks/" + control["data_sources"]["sname"] temp_data_path = run_dir + "/temp_data" diff --git a/environments/dev-environment.yml b/environments/dev-environment.yml index 30aec307..84839dbf 100644 --- a/environments/dev-environment.yml +++ b/environments/dev-environment.yml @@ -18,4 +18,5 @@ dependencies: - pyyaml - xarray - pip: + - gents - -e ../