Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 112 additions & 87 deletions Jinja2Filters/form_remap_dep.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,41 +1,56 @@
import re
import os
import metomi.rose.config
import ast
"""! form_remap_dep parses the remap_pp_components rose-app.conf and uses input from rose-suite.conf in the form of
env variables and returns the pp component and source name dependencies for remap_pp_components task execution. For
instance, for an atmos PP component that requires the regridded atmos_month and regridded atmos_daily history
files, this JinjaFilter when called within flow.cylc helps identify this dependency to complete the corresponding
task graph. This JinjaFilter ensures a remap-pp-component only waits for the dependent make-timeseries tasks such
that the succeeded components output are made available in the final destination.
"""
form_remap_dep:
- parses the remap_pp_components rose-app.conf
- uses input from rose-suite.conf in the form of env variables
- returns the pp component and source name dependencies for
remap_pp_components task execution.

For instance, for an atmos PP component that requires the regridded
atmos_month and regridded atmos_daily history files, this JinjaFilter
when called within flow.cylc helps identify this dependency to
complete the corresponding task graph.

This JinjaFilter ensures a remap-pp-component only waits for the
dependent make-timeseries tasks, so that the output of the succeeded
components is made available in the final destination.

See form_remap_dep invocations from flow.cylc
"""
# @file form_remap_dep.py
# Author(s)
# Created by A.Radhakrishnan on 06/27/2022
# Credit MSD workflow team

# Function parameter type hints, PEP 484

def form_remap_dep(grid_type: str, temporal_type: str, chunk: str, pp_components_str: str, output_type: str, history_segment: str=None) -> str:

""" Form the task parameter list based on the grid type, the temporal type, and the desired pp component(s)
import os
from pathlib import Path
import yaml

def form_remap_dep(grid_type: str,
temporal_type: str,
chunk: str,
pp_components_str: str,
output_type: str,
yamlfile: str,
history_segment: str=None) -> str:
"""
Form the task parameter list based on the grid type,
the temporal type, and the desired pp component(s)

Arguments:
@param grid_type (str): One of: native or regrid-xy
@param temporal_type (str): One of: temporal or static
@param chunk (str): e.g. P5Y for 5-year time series
@param pp_components_str (str): all, or a space-separated list
@param output_type (str): ts or av
@param history_segment (str): if given, handle special case where history segment equals pp-chunk-a
@param yamlfile (str): yaml configuration file passed through workflow
@param history_segment (str): if given, handle special case where history
segment equals pp-chunk-a

@return remap_dep (multiline str) with Jinja formatting listing source-pp dependencies
@return remap_dep (multiline str) with Jinja formatting listing source-pp
dependencies
"""
pp_components = pp_components_str.split(' ')
if(grid_type == "regrid-xy"):
grid = "regrid"
if grid_type == "regrid-xy":
grid = "regrid"
else:
grid = grid_type
grid = grid_type

# Determine the task needed to run before remap-pp-components
# Note: history_segment should be specified for primary chunk generation,
Expand All @@ -51,94 +66,104 @@ def form_remap_dep(grid_type: str, temporal_type: str, chunk: str, pp_components
raise Exception("output type not supported")

#print(pp_components)
#print(chunk)
#print(chunk)
########################
dict_group_source={}
remap_comp = None
remap_comp = None
#print("DEBUG: Passed args ",grid_type, temporal_type, chunk, pp_components_str)
remap_dep = ""
remap_dep = ""
#print("DEBUG: desired pp components:", pp_components)
path_to_conf = os.path.dirname(os.path.abspath(__file__)) + '/../app/remap-pp-components/rose-app.conf'
node = metomi.rose.config.load(path_to_conf)
results = []

# Path to yaml configuration
exp_dir = Path(__file__).resolve().parents[1]
path_to_yamlconfig = os.path.join(exp_dir, yamlfile)

# Load and read yaml configuration
with open(path_to_yamlconfig,'r') as yml:
yml_info = yaml.safe_load(yml)

makets_stmt = ""
regex_pp_comp = re.compile('^\w+')
for keys, sub_node in node.walk():
# only target the keys
if len(keys) != 1:
continue
# Loop through pp components; check components passed in script are defined in yaml config
for comp_info in yml_info["postprocess"]["components"]:
comp = comp_info.get("type")

# skip env and command keys
item = keys[0]
if item == "env" or item == "command":
continue
comp = regex_pp_comp.match(item).group()
#print("DEBUG: Examining", item, comp)
#res = [pp_comp for pp_comp in pp_components if(pp_comp in comp)]
if comp not in pp_components:
#print(comp, " not in", pp_components)
continue
#print("DEBUG: Examining", item, comp)
#print(comp, " not in", pp_components)
continue

# skip if grid type is not desired
# some grid types (i.e. regrid-xy) have subtypes (i.e. 1deg, 2deg)
# in remap-pp-components/rose-app.conf the grid type and subgrid is specified as "regrid-xy/1deg" (e.g.).
# So we will strip off after the slash and the remainder is the grid type
candidate_grid_type = re.sub('\/.*', '', node.get_value(keys=[item, 'grid']))
# Set grid type if component has xyInterp defined or not
if "xyInterp" not in comp_info.keys():
candidate_grid_type = "native"
else:
candidate_grid_type = "regrid-xy"

if candidate_grid_type != grid_type:
#print("DEBUG: Skipping as not right grid; got", candidate_grid_type, "and wanted", grid_type)
continue

# skip if temporal type is not desired
# freq is optional, so if it does not exist then continue on
freq = node.get_value(keys=[item, 'freq'])
##########NOT SURE YET
# # skip if temporal type is not desired
# # freq is optional, so if it does not exist then continue on
# freq = node.get_value(keys=[item, 'freq'])
freq = comp_info.get("freq")
if temporal_type == "static":
if freq and 'P0Y' not in freq:
if freq is not None and 'P0Y' not in freq:
#print("DEBUG: Skipping as static is requested, no P0Y here", freq)
continue
elif (temporal_type == "temporal"):
if freq and 'P0Y' in freq:
elif temporal_type == "temporal":
if freq is not None and 'P0Y' in freq:
#print("DEBUG: Skipping as temporal is requested, P0Y here", freq)
continue
else:
raise Exception("Unknown temporal type:", temporal_type)
# chunk is optional, so if it does not exist then continue on
chunk_from_config = node.get_value(keys=[item, 'chunk'])
if chunk_from_config and chunk not in chunk_from_config:
#print("DEBUG: Skipping as {} is requested, but not in rose-app config {}:".format(chunk, chunk_from_config))
continue

results = ast.literal_eval(node.get_value(keys=[item, 'sources']))
raise Exception(f"Unknown temporal type: {temporal_type}")
#########
# # chunk is optional, so if it does not exist then continue on
chunk_from_config = comp_info.get("chunk")
if chunk_from_config is not None and chunk not in chunk_from_config:
#print("DEBUG: Skipping as {} is requested, but not in rose-app config {}:".format(chunk, comp_info["chunk"]))
continue
##########
results=[]

# Get source list; append to results
for hist_file in comp_info["sources"]:
results.append(hist_file.get("history_file"))

remap_comp = comp
answer = sorted(list(set(results)))

if remap_comp is not None:
#If the same PP component is mapped to several sources per rose-app.conf, we make it a list and append values so we don't replace the key's value
if remap_comp in dict_group_source.keys():
dict_group_source[remap_comp].append(answer[0])
else:
dict_group_source[remap_comp] = answer
#If the same PP component is mapped to several sources per rose-app.conf, we make it a list and append values so we don't replace the key's value
if remap_comp in dict_group_source.keys():
dict_group_source[remap_comp].append(answer[0])
else:
dict_group_source[remap_comp] = answer

if dict_group_source:
for key, value in dict_group_source.items():
makets_stmt = ""
for src in value:
if(makets_stmt != ''):
# make-timeseries and make-timeavgs tasks have the chunksize in the task name,
# but rename-split-to-pp does not
if prereq_task == 'rename-split-to-pp':
makets_stmt = f"{makets_stmt} & {prereq_task}-{grid}_{src}"
else:
makets_stmt = f"{makets_stmt} & {prereq_task}-{grid}-{chunk}_{src}"
else:
if prereq_task == 'rename-split-to-pp':
makets_stmt = f"{prereq_task}-{grid}_{src}"
else:
makets_stmt = f"{prereq_task}-{grid}-{chunk}_{src}"
remap_stmt = f"remap-pp-components-{output_type}-{chunk}_{key}"
remap_dep_stmt = f"{makets_stmt} => {remap_stmt}"
remap_dep += f"{remap_dep_stmt}\n"
for key, value in dict_group_source.items():
makets_stmt = ""
for src in value:
if makets_stmt != '':
# make-timeseries and make-timeavgs tasks have the chunksize in the task name,
# but rename-split-to-pp does not
if prereq_task == 'rename-split-to-pp':
makets_stmt = f"{makets_stmt} & {prereq_task}-{grid}_{src}"
else:
makets_stmt = f"{makets_stmt} & {prereq_task}-{grid}-{chunk}_{src}"
else:
if prereq_task == 'rename-split-to-pp':
makets_stmt = f"{prereq_task}-{grid}_{src}"
else:
makets_stmt = f"{prereq_task}-{grid}-{chunk}_{src}"

remap_stmt = f"remap-pp-components-{output_type}-{chunk}_{key}"
remap_dep_stmt = f"{makets_stmt} => {remap_stmt}"
remap_dep += f"{remap_dep_stmt}\n"
# Possibly, no tasks are needed for the given request (grid type, temporal/static, chunk, components).
# When that happens just exit with an empty string and exit normally.
return(remap_dep)
return remap_dep

#print(form_remap_dep('regrid-xy', 'temporal', 'P4D', 'land atmos land_cubic'))
# Testing #
#print(form_remap_dep('regrid-xy', 'temporal', 'P4D', 'atmos_cmip atmos', 'ts', 'c96L65_am5f7b12r1_amip_TEST_GRAPH.yaml'))
108 changes: 53 additions & 55 deletions Jinja2Filters/form_task_parameters.py
Original file line number Diff line number Diff line change
@@ -1,91 +1,89 @@
import re
"""
Form the task parameter list based on the grid type,
the temporal type, and the desired pp component(s)
"""

import os
import metomi.rose.config
import ast
from pathlib import Path
import logging
import yaml

# set up logging
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def form_task_parameters(grid_type, temporal_type, pp_components_str, yamlfile):
"""Form the task parameter list based on the grid type, the temporal type,
and the desired pp component(s)

"""
Arguments:
grid_type (str): One of: native or regrid-xy
temporal_type (str): One of: temporal or static
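pp_components_str (str): all, or a space-separated list
yamlfile (str): yaml configuration file, looked up relative to the parent of this file's directory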
"""
logger.debug(f"Desired pp components: {pp_components_str}")
"""
logger.debug("Desired pp components: %s", pp_components_str)
pp_components = pp_components_str.split()
path_to_conf = os.path.dirname(os.path.abspath(__file__)) + '/../app/remap-pp-components/rose-app.conf'
node = metomi.rose.config.load(path_to_conf)

# Path to yaml configuration
exp_dir = Path(__file__).resolve().parents[1]
path_to_yamlconfig = os.path.join(exp_dir, yamlfile)
# Load and read yaml configuration
# Load and read yaml configuration
with open(path_to_yamlconfig,'r') as yml:
yml_info = yaml.safe_load(yml)

#
results = []
regex_pp_comp = re.compile('^\w+')
for keys, sub_node in node.walk():
# only target the keys
if len(keys) != 1:
continue
for comp_info in yml_info["postprocess"]["components"]:
comp = comp_info.get("type")

# skip env and command keys
item = keys[0]
if item == "env" or item == "command":
# Check that pp_components defined matches those in the yaml file
# Skip component if they don't match
# skip if pp component not desired
if comp not in pp_components:
continue
comp = regex_pp_comp.match(item).group()
logger.debug(f"Examining item '{item}' comp '{comp}'")

# skip if pp component not desired
logger.debug(f"Is {comp} in {pp_components}?")
if comp in pp_components:
logger.debug('Yes')
# Set grid type if component has xyInterp defined or not
if "xyInterp" not in comp_info.keys():
candidate_grid_type = "native"
else:
logger.debug('No')
continue
candidate_grid_type = "regrid-xy"

# skip if grid type is not desired
# some grid types (i.e. regrid-xy) have subtypes (i.e. 1deg, 2deg)
# in remap-pp-components/rose-app.conf the grid type and subgrid is specified as "regrid-xy/1deg" (e.g.).
# So we will strip off after the slash and the remainder is the grid type
candidate_grid_type = re.sub('\/.*', '', node.get_value(keys=[item, 'grid']))
# Check that candidate_grid_type matches grid type passed in function
# If not, skip post-processing of component
if candidate_grid_type != grid_type:
logger.debug(f"Skipping as not right grid; got '{candidate_grid_type}' and wanted '{grid_type}'")
logger.debug("Skipping as not right grid; got '%s' and wanted '%s'", candidate_grid_type, grid_type)
continue

# filter static and temporal
# if freq is not set => temporal
# if freq includes "P0Y" => static
# if freq does not include "P0Y" => temporal
freq = node.get_value(keys=[item, 'freq'])
if freq is not None and 'P0Y' in freq and temporal_type == 'temporal':
logger.debug("Skipping static when temporal is requested")
continue
# Filter static and temporal
if temporal_type == "static":
if freq is not None and 'P0Y' not in freq:
logger.debug("Skipping as static is requested, no P0Y here", freq)
continue
elif (temporal_type == "temporal"):
if freq is not None and 'P0Y' in freq:
logger.debug("Skipping as temporal is requested, P0Y here", freq)
#print(comp_info["static"]["freq"])
if "static" not in comp_info.keys():
logger.debug("Skipping static as there are no static sources defined")
continue
else:
raise Exception("Unknown temporal type:", temporal_type)

# convert array in string form to array
sources = ast.literal_eval(node.get_value(keys=[item, 'sources']))
results.extend(sources)
for static_info in comp_info["static"]:
if static_info.get("source") is not None:
results.append(static_info.get("source"))
## to-do: assess offline diagnostics
# elif:
# results = results + static_info.get("offline_sources")

elif temporal_type == "temporal":
for hist_file in comp_info["sources"]:
results.append(hist_file.get("history_file"))
#results = results + comp_info.get("sources")

else:
raise Exception(f"Unknown temporal type: {temporal_type}")

# results list --> set --> list: checks for repetitive sources listed
answer = sorted(list(set(results)))
logger.debug("Returning string" + ', '.join(answer))
return(', '.join(answer))

# Returns a comma separated list of sources
logger.debug("Returning string %s", ', '.join(answer))
return ', '.join(answer)

## OWN TESTING ##
#print(form_task_parameters('regrid-xy', 'temporal', 'ocean_cobalt_sfc ocean_cobalt_btm', 'COBALT_postprocess.yaml')))
#print(form_task_parameters('regrid-xy', 'static', 'ocean_cobalt_sfc ocean_cobalt_btm', 'COBALT_postprocess.yaml'))
#print(form_task_parameters('native', 'temporal', 'ocean_cobalt_sfc ocean_cobalt_btm', 'COBALT_postprocess.yaml'))
#print(form_task_parameters('native', 'static', 'ocean_cobalt_sfc ocean_cobalt_btm', 'COBALT_postprocess.yaml'))
Loading
Loading