Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 74 additions & 35 deletions e3sm_to_cmip/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
from tqdm import tqdm

from e3sm_to_cmip import ROOT_HANDLERS_DIR, __version__, resources
from e3sm_to_cmip._logger import _setup_logger, _setup_root_logger
from datetime import datetime, timezone

from e3sm_to_cmip._logger import e2c_logger

from e3sm_to_cmip.cmor_handlers.utils import (
instantiate_h_utils_logger,
MPAS_REALMS,
REALMS,
Frequency,
Expand All @@ -32,7 +36,10 @@
derive_handlers,
load_all_handlers,
)
from e3sm_to_cmip.cmor_handlers.handler import instantiate_handler_logger

from e3sm_to_cmip.util import (
instantiate_util_logger,
FREQUENCIES,
_get_table_info,
get_handler_info_msg,
Expand All @@ -50,11 +57,6 @@
warnings.filterwarnings("ignore")


# Setup the root logger and this module's logger.
log_filename = _setup_root_logger()
logger = _setup_logger(__name__, propagate=True)


@dataclass
class CLIArguments:
"""A data class storing the command line arguments for e3sm_to_cmip.
Expand Down Expand Up @@ -97,9 +99,14 @@ class CLIArguments:

class E3SMtoCMIP:
def __init__(self, args: Optional[List[str]] = None):
global logger
# A dictionary of command line arguments.
parsed_args = self._parse_args(args)

# Setup this module's logger AFTER args parsed in E3SMtoCMIP init, so that
# default log file is NOT created for mere "--help" or "--version" calls.
logger = e2c_logger(name=__name__, set_log_level="INFO", to_logfile=True)

# NOTE: The order of these attributes align with class CLIArguments.
# ======================================================================
# Run Mode settings.
Expand All @@ -115,8 +122,6 @@ def __init__(self, args: Optional[List[str]] = None):
self.debug: bool = parsed_args.debug
self.timeout: int = parsed_args.timeout

# ======================================================================
# CMOR settings.
# ======================================================================
self.var_list: List[str] = self._get_var_list(parsed_args.var_list)
self.realm: Union[Realm, MPASRealm] = parsed_args.realm
Expand All @@ -140,6 +145,9 @@ def __init__(self, args: Optional[List[str]] = None):
if self.precheck_path is not None:
self._run_precheck()

self.handlers = self._get_handlers()

def echo_settings(self):
logger.info("--------------------------------------")
logger.info("| E3SM to CMIP Configuration")
logger.info("--------------------------------------")
Expand All @@ -149,9 +157,7 @@ def __init__(self, args: Optional[List[str]] = None):
logger.info(f" * precheck_path='{self.precheck_path}'")
logger.info(f" * freq='{self.freq}'")
logger.info(f" * realm='{self.realm}'")
logger.info(f" * Writing log output file to: {log_filename}")

self.handlers = self._get_handlers()
# logger.info(f" * Writing log output file to: {log_filename}")

def run(self):
# Setup logger information and print out e3sm_to_cmip CLI arguments.
Expand All @@ -161,17 +167,16 @@ def run(self):
self.output_path, "user_metadata.json"
)

# Setup directories using the CLI argument paths (e.g., output dir).
# ======================================================================
if not self.info_mode:
self._setup_dirs_with_paths()

# Run e3sm_to_cmip with info mode.
# ======================================================================
if self.info_mode:
self._run_info_mode()
sys.exit(0)

# Setup directories using the CLI argument paths (e.g., output dir).
# ======================================================================
self._setup_dirs_with_paths()

# Run e3sm_to_cmip to CMORize serially or in parallel.
# ======================================================================
timer = None
Expand Down Expand Up @@ -306,7 +311,9 @@ def _parse_args(self, args: Optional[List[str]]) -> CLIArguments:
sys.exit(1)

# Parse the arguments and perform validation.
# NOTE: exits here if args == "-h" or "--help" or "--version", else validate
parsed_args = argparser.parse_args(args_to_parse)

self._validate_parsed_args(parsed_args)

# Convert to this data class for type checking to work.
Expand Down Expand Up @@ -531,7 +538,7 @@ def _setup_argparser(self) -> argparse.ArgumentParser:
"-h",
"--help",
action="help",
default=argparse.SUPPRESS,
# default=argparse.SUPPRESS,
help="show this help message and exit",
)

Expand Down Expand Up @@ -635,13 +642,13 @@ def _setup_dirs_with_paths(self):
copy_user_metadata(self.user_metadata, self.output_path)

# Setup temp storage directory
temp_path = os.environ.get("TMPDIR")
if temp_path is None:
temp_path = f"{self.output_path}/tmp"
if not os.path.exists(temp_path):
os.makedirs(temp_path)
# temp_path = os.environ.get("TMPDIR")
# if temp_path is None:
# temp_path = f"{self.output_path}/tmp"
# if not os.path.exists(temp_path):
# os.makedirs(temp_path)

tempfile.tempdir = temp_path
# tempfile.tempdir = temp_path

def _run_info_mode(self): # noqa: C901
logger.info("--------------------------------------")
Expand Down Expand Up @@ -778,12 +785,17 @@ def _run_serial(self) -> int: # noqa: C901
int
1 if an error occurs, else 0
"""

# TODO: Make this a command-line flag.
do_pbar = False

try:
num_handlers = len(self.handlers)
num_success = 0
num_failure = 0
name = None

if self.realm != "atm":
if self.realm != "atm" and do_pbar:
pbar = tqdm(total=len(self.handlers))

for _, handler in enumerate(self.handlers):
Expand Down Expand Up @@ -816,7 +828,7 @@ def _run_serial(self) -> int: # noqa: C901
}

msg = f"Trying to CMORize with handler: {handler}"
logger.info(msg)
logger.critical(msg)

# NOTE: We need a try and except statement here for TypeError because
# the VarHandler.cmorize method does not use **kwargs, while the handle
Expand All @@ -838,17 +850,18 @@ def _run_serial(self) -> int: # noqa: C901
except Exception as e:
print_debug(e)

if name is not None:
if name is not None and name is not "":
num_success += 1
msg = f"Finished {name}, {num_success}/{num_handlers} jobs complete"
msg = f"Finished {name}, {num_success}/{num_handlers} jobs complete (via run_serial)"
logger.info(msg)
else:
num_failure += 1
msg = f"Error running handler {handler['name']}"
logger.info(msg)

if self.realm != "atm":
if self.realm != "atm" and do_pbar:
pbar.update(1)
if self.realm != "atm":
if self.realm != "atm" and do_pbar:
pbar.close()

except Exception as error:
Expand All @@ -858,6 +871,8 @@ def _run_serial(self) -> int: # noqa: C901
msg = f"{num_success} of {num_handlers} handlers complete"
logger.info(msg)

if num_failure > 0:
return 1
return 0

def _run_parallel(self) -> int: # noqa: C901
Expand All @@ -872,6 +887,9 @@ def _run_parallel(self) -> int: # noqa: C901
pool_res = list()
will_run = []

# NOTE: Make this a command-line flag.
do_pbar = False

for idx, handler in enumerate(self.handlers):
handler_method = handler["method"]
handler_variables = handler["raw_variables"]
Expand Down Expand Up @@ -926,8 +944,10 @@ def _run_parallel(self) -> int: # noqa: C901
pool_res.append(res)

# wait for each result to complete
pbar = tqdm(total=len(pool_res))
if do_pbar:
pbar = tqdm(total=len(pool_res))
num_success = 0
num_failure = 0
num_handlers = len(self.handlers)
finished_success = []
for idx, res in enumerate(pool_res):
Expand All @@ -936,38 +956,57 @@ def _run_parallel(self) -> int: # noqa: C901
finished_success.append(out)
if out:
num_success += 1
msg = f"Finished {out}, {idx + 1}/{num_handlers} jobs complete"
msg = f"Finished {out}, {idx + 1}/{num_handlers} jobs complete (via run_parallel)"
else:
num_failure += 1
msg = f'Error running handler {self.handlers[idx]["name"]}'
logger.error(msg)

logger.info(msg)
except Exception as e:
print_debug(e)
pbar.update(1)
if do_pbar:
pbar.update(1)

pbar.close()
if do_pbar:
pbar.close()
pool.shutdown()

msg = f"{num_success} of {num_handlers} handlers complete"
logger.info(msg)

failed = set(will_run) - set(finished_success)
if failed:
if failed or num_failure > 0:
logger.error(f"{', '.join(list(failed))} failed to complete")
logger.error(msg)
return 1

return 0

def _timeout_exit(self):
print_message("Hit timeout limit, exiting")
os.kill(os.getpid(), signal.SIGINT)

logger=None

def main(args: Optional[List[str]] = None):
global logger
def main(args: Optional[List[str]] = None):
global logger

app = E3SMtoCMIP(args)
app.run()

# These calls allow module loggers that create default logfiles to avoid being
# instantiated by arguments "--help" or "--version" upon import.
instantiate_util_logger()
instantiate_h_utils_logger()
instantiate_handler_logger()

app.echo_settings()
return app.run()


if __name__ == "__main__":


main()
Loading