Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 91 additions & 60 deletions e3sm_to_cmip/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
A Python command line tool to turn E3SM model output into CMIP6 compatible data.
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os
import signal
import sys
import tempfile
import threading
import warnings
from concurrent.futures import ProcessPoolExecutor as Pool
from dataclasses import dataclass
from pathlib import Path
Expand All @@ -22,7 +19,12 @@
from tqdm import tqdm

from e3sm_to_cmip import ROOT_HANDLERS_DIR, __version__, resources
from e3sm_to_cmip._logger import _setup_logger, _setup_root_logger
from e3sm_to_cmip._logger import (
LOG_FILENAME,
_add_filehandler,
_setup_child_logger,
_setup_root_logger,
)
from e3sm_to_cmip.cmor_handlers.utils import (
MPAS_REALMS,
REALMS,
Expand All @@ -42,18 +44,13 @@
find_mpas_files,
get_handler_info_msg,
precheck,
print_debug,
print_message,
)

os.environ["CDAT_ANONYMOUS_LOG"] = "false"

warnings.filterwarnings("ignore")
# Set up the root logger and module level logger. The module level logger is
# a child of the root logger.
_setup_root_logger()


# Setup the root logger and this module's logger.
log_filename = _setup_root_logger()
logger = _setup_logger(__name__, propagate=True)
logger = _setup_child_logger(__name__)


@dataclass
Expand Down Expand Up @@ -136,37 +133,52 @@ def __init__(self, args: Optional[List[str]] = None):
self.cmor_log_dir: Optional[str] = parsed_args.logdir
self.user_metadata: Optional[str] = parsed_args.user_metadata
self.custom_metadata: Optional[str] = parsed_args.custom_metadata

if self.output_path is not None:
self.output_path = os.path.abspath(self.output_path)
os.makedirs(self.output_path, exist_ok=True)

# Setup directories using the CLI argument paths (e.g., output dir).
# ======================================================================
self._setup_dirs_with_paths()

# Run the pre-check to determine if any of the variables have already
# been CMORized.
# ======================================================================
if self.precheck_path is not None:
self._run_precheck()

# Setup logger information and print out e3sm_to_cmip CLI arguments.
# ======================================================================
logger.info("--------------------------------------")
logger.info("| E3SM to CMIP Configuration")
logger.info("--------------------------------------")
logger.info(f" * var_list='{self.var_list}'")
logger.info(f" * input_path='{self.input_path}'")
logger.info(f" * output_path='{self.output_path}'")
logger.info(f" * precheck_path='{self.precheck_path}'")
logger.info(f" * freq='{self.freq}'")
logger.info(f" * realm='{self.realm}'")
logger.info(f" * Writing log output file to: {log_filename}")

config_details = {
"Mode": (
"Info"
if self.info_mode
else "Serial"
if self.serial_mode
else "Parallel"
),
"Variable List": self.var_list,
"Input Path": self.input_path,
"Output Path": self.output_path,
"Precheck Path": self.precheck_path,
"Log Path": self.log_path,
"CMOR Log Path": self.cmor_log_dir,
"Frequency": self.freq,
"Realm": self.realm,
}

for key, value in config_details.items():
logger.info(f" * {key}: {value}")

# Load the CMOR handlers based on the realm and variable list.
self.handlers = self._get_handlers()

def run(self):
# Setup logger information and print out e3sm_to_cmip CLI arguments.
# ======================================================================
if self.output_path is not None:
self.new_metadata_path = os.path.join(
self.output_path, "user_metadata.json"
)

# Setup directories using the CLI argument paths (e.g., output dir).
# ======================================================================
if not self.info_mode:
self._setup_dirs_with_paths()

# Run e3sm_to_cmip with info mode.
# ======================================================================
if self.info_mode:
Expand All @@ -183,7 +195,7 @@ def run(self):
status = self._run()

if status != 0:
print_message(
logger.error(
f"Error running handlers: { ' '.join([x['name'] for x in self.handlers]) }"
)
return 1
Expand Down Expand Up @@ -497,8 +509,11 @@ def _setup_argparser(self) -> argparse.ArgumentParser:
optional.add_argument(
"--logdir",
type=str,
default="./cmor_logs",
help="Where to put the logging output from CMOR.",
default="cmor_logs",
help=(
"The sub-directory that stores the CMOR logs. This sub-directory will "
"be stored under --output-path."
),
Comment on lines 562 to +569
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we deprecate the --logdir parameter? It seems unnecessary for the user to specify the name of the directory since it should be stored under --output-path.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@TonyB9000 TonyB9000 Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomvothecoder Honestly, I am uncertain (at the moment) what is represented by "the output path". When I run e2c as a function of a supporting application (dsm_generate_CMIP), I force the creation of (CWD)/tmp/<case_id>/[subdirs], and these are:

    caselogs/       [timestamped logs for each E2C invocation]
    metadata/
    native_data/    [symlinks to native source files]
    native_out/     [produced by pre-E2C NCO stuff]
    product/        [FINAL Cmorized output from E2C]
    rgr/            [produced by pre-E2C NCO stuff]
    rgr_fixed_vars/ [produced by pre-E2C NCO stuff]
    rgr_vert/       [produced by pre-E2C NCO stuff]
    scripts/        [dataset_specific call scripts to E2C]

I believe I set "--output-path" to "product/", with the intent of moving these to "STAGING_DATA" (the warehouse, pre-publication). The logs of my calling scripts (in "scripts/") are directed to "caselogs", but I think the cmor-logs may be directed to something like (CWD)/tmp/cmor_logs/. These logs are only named by timestamp (I believe), and not by any useful "job-name" or ID, and their - um - "colorful" and flashy format (remember the early HTML <BLINK>text</BLINK> that probably induced seizures in some) cannot be usefully combined with other logs, until we can negotiate with the "cmor_setup()" devs to reformat that stuff.

)
required_no_simple.add_argument(
"-u",
Expand Down Expand Up @@ -620,18 +635,35 @@ def _run_precheck(self):
self.input_path, self.precheck_path, self.var_list, self.realm
)
if not new_var_list:
print("All variables previously computed")
logger.info("All variables previously computed")
if self.output_path is not None:
os.mkdir(os.path.join(self.output_path, "CMIP6"))
return 0
else:
print_message(f"Setting up conversion for {' '.join(new_var_list)}", "ok")
logger.info(f"Setting up conversion for {' '.join(new_var_list)}", "ok")
self.var_list = new_var_list

def _setup_dirs_with_paths(self):
# Create the output directory if it doesn't exist.
if not os.path.exists(self.output_path): # type: ignore
os.makedirs(self.output_path) # type: ignore
"""Sets up various directories and paths required for e3sm_to_cmip.

This method initializes paths for metadata, logs, and temporary storage.
It also updates the root logger's file path and copies user metadata
to the output directory if not in simple mode.

Notes
-----
If the environment variable `TMPDIR` is not set, a temporary directory
is created under the output path.
"""
self.new_metadata_path = os.path.join(self.output_path, "user_metadata.json") # type: ignore
self.cmor_log_dir = os.path.join(self.output_path, self.cmor_log_dir) # type: ignore

# NOTE: Any warnings that appear before the log filehandler is
# instantiated will not be captured (e.g., esmpy VersionWarning).
# However, they will still be captured by the console via a
# StreamHandler.
self.log_path = os.path.join(self.output_path, LOG_FILENAME) # type: ignore
_add_filehandler(self.log_path)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an FYI, trade-off mentioned in the PR description.


# Copy the user's metadata json file with the updated output directory
if not self.simple_mode:
Expand All @@ -647,10 +679,6 @@ def _setup_dirs_with_paths(self):
tempfile.tempdir = temp_path

def _run_info_mode(self): # noqa: C901
logger.info("--------------------------------------")
logger.info("| Running E3SM to CMIP in Info Mode")
logger.info("--------------------------------------")

messages = []

# if the user just asked for the handler info
Expand All @@ -665,8 +693,11 @@ def _run_info_mode(self): # noqa: C901
for handler in self.handlers:
table_info = _get_table_info(self.tables_path, handler["table"])
if handler["name"] not in table_info["variable_entry"]:
msg = f"Variable {handler['name']} is not included in the table {handler['table']}"
print_message(msg, status="error")
logger.error(
"Variable {handler['name']} is not included in the table "
f"{handler['table']}"
)

continue
else:
if self.freq == "mon" and handler["table"] == "CMIP6_day.json":
Expand All @@ -675,6 +706,7 @@ def _run_info_mode(self): # noqa: C901
"table"
] == "CMIP6_Amon.json":
continue

hand_msg = get_handler_info_msg(handler)
messages.append(hand_msg)

Expand All @@ -684,6 +716,7 @@ def _run_info_mode(self): # noqa: C901
with xr.open_dataset(file_path) as ds:
for handler in self.handlers:
table_info = _get_table_info(self.tables_path, handler["table"])

if handler["name"] not in table_info["variable_entry"]:
continue

Expand All @@ -694,8 +727,9 @@ def _run_info_mode(self): # noqa: C901
if raw_var not in ds.data_vars:
has_vars = False

msg = f"Variable {handler['name']} is not present in the input dataset"
print_message(msg, status="error")
logger.error(
f"Variable {handler['name']} is not present in the input dataset"
)

break

Expand Down Expand Up @@ -737,29 +771,26 @@ def _run_info_mode(self): # noqa: C901
with open(self.info_out_path, "w") as outstream:
yaml.dump(messages, outstream)
elif self.output_path is not None:
with open(self.output_path, "w") as outstream:
yaml_filepath = os.path.join(self.output_path, "info.yaml")

with open(yaml_filepath, "w") as outstream:
yaml.dump(messages, outstream)
else:
pprint(messages)

def _run(self):
if self.serial_mode:
mode_str = "Serial"
run_func = self._run_serial
else:
mode_str = "Parallel"
run_func = self._run_parallel

try:
logger.info("--------------------------------------")
logger.info(f"| Running E3SM to CMIP in {mode_str}")
logger.info("--------------------------------------")
status = run_func()
except KeyboardInterrupt:
print_message(" -- keyboard interrupt -- ", "error")
logger.error(" -- keyboard interrupt -- ")
return 1
except Exception as e:
print_debug(e)
logger.error(e)
return 1

return status
Expand Down Expand Up @@ -830,7 +861,7 @@ def _run_serial(self) -> int: # noqa: C901
self.new_metadata_path,
)
except Exception as e:
print_debug(e)
logger.error(e)

if name is not None:
num_success += 1
Expand All @@ -846,7 +877,7 @@ def _run_serial(self) -> int: # noqa: C901
pbar.close()

except Exception as error:
print_debug(error)
logger.error(error)
return 1
else:
msg = f"{num_success} of {num_handlers} handlers complete"
Expand Down Expand Up @@ -937,7 +968,7 @@ def _run_parallel(self) -> int: # noqa: C901

logger.info(msg)
except Exception as e:
print_debug(e)
logger.error(e)
pbar.update(1)

pbar.close()
Expand All @@ -954,7 +985,7 @@ def _run_parallel(self) -> int: # noqa: C901
return 0

def _timeout_exit(self):
print_message("Hit timeout limit, exiting")
logger.info("Hit timeout limit, exiting")
os.kill(os.getpid(), signal.SIGINT)


Expand Down
Loading
Loading