1+ import logging
12from pathlib import Path
23
34from metomi .isodatetime .parsers import DurationParser , TimePointParser
45from yaml import safe_load
56
6- from legacy_date_conversions import *
7+ from legacy_date_conversions import convert_iso_duration_to_bronx_chunk
78
89# set up logging
9- import logging
1010logging .basicConfig ()
1111logger = logging .getLogger (__name__ )
1212logger .setLevel (logging .INFO )
1717time_parser = TimePointParser (assumed_time_zone = (0 , 0 ))
1818
1919
20- class AnalysisScript ( object ) :
20+ class AnalysisScript :
2121 def __init__ (self , name , config , experiment_components , experiment_starting_date ,
22- experiment_stopping_date , pp_chunks ):
22+ experiment_stopping_date , pp_chunks , yaml ):
2323 """Initialize the analysis script object.
2424
2525 Args:
@@ -29,13 +29,14 @@ def __init__(self, name, config, experiment_components, experiment_starting_date
2929 experiment_starting_date: Starting date for the experiment.
3030 experiment_stopping_date: Stopping date for the experiment.
3131 pp_chunks: List of ISO8601 durations used by the workflow.
32+ yaml: Resolved postprocessing yaml
3233 """
3334 self .name = name
3435 logger .debug (f"{ name } : initializing AnalysisScript instance" )
3536
3637 # Skip if configuration wants to skip it
3738 self .switch = config ["workflow" ]["analysis_on" ]
38- if self .switch == False :
39+ if self .switch is False :
3940 return
4041
4142 # Skip if the components are not available
@@ -55,9 +56,30 @@ def __init__(self, name, config, experiment_components, experiment_starting_date
5556 self .script_type = config ["workflow" ]["script_type" ]
5657 self .chunk = duration_parser .parse (config ["workflow" ]["chunk_size" ])
5758
58- if self .chunk not in pp_chunks :
59- raise ValueError (f"ERROR: Analysis script '{ self .name } ' requests chunk size '{ self .chunk } ', but " +
60- f"this chunk size is not declared in 'pp_chunks'" )
59+ # Retrieve other config
60+ self .data_frequency = config ["required" ]["data_frequency" ]
61+
62+ # check for needed pp prerequisites
63+ if self .product not in ['av' , 'ts' ]:
64+ raise ValueError ("ERROR: product type must be 'ts' or 'av'" )
65+ if self .product == "ts" :
66+ if self .chunk not in pp_chunks :
67+ raise ValueError (f"ERROR: Analysis script '{ self .name } ' requests timeseries chunk size '{ self .chunk } ', but " +
68+ "this chunk size is not declared in 'pp_chunks'" )
69+ else :
70+ # Loop through the components and look for the ones specified by the analysis script
71+ # For each component to check, confirm that its climatology section contains the requested climo chunk
72+ for ana_comp in config ["workflow" ]["components" ]:
73+ found_needed_inputs_for_component = False
74+ for exp_comp in yaml ["postprocess" ]["components" ]:
75+ if exp_comp ["type" ] == ana_comp :
76+ if 'climatology' in exp_comp :
77+ for climo_request in exp_comp ["climatology" ]:
78+ if climo_request ["frequency" ] == self .data_frequency and climo_request ["interval_years" ] == self .chunk .years :
79+ found_needed_inputs_for_component = True
80+ if not found_needed_inputs_for_component :
81+ raise ValueError (f"ERROR: Analysis script '{ self .name } ' requests climatology chunk size '{ self .chunk } ', but " +
82+ f"no suitable climatology sections were found in postprocess component '{ ana_comp } '" )
6183
6284 # Parse the new analysis config items
6385 if 'legacy' in config :
@@ -73,8 +95,6 @@ def __init__(self, name, config, experiment_components, experiment_starting_date
7395 else :
7496 self .is_legacy = False
7597
76- self .data_frequency = config ["required" ]["data_frequency" ]
77-
7898 # if dates are years, convert to string or else ISO conversion will fail
7999 if isinstance (config ["required" ]["date_range" ][0 ], int ):
80100 one = "{:04d}" .format (config ["required" ]["date_range" ][0 ])
@@ -98,7 +118,7 @@ def graph(self, analysis_only):
98118 Returns:
99119 String cylc task graph for the analysis.
100120 """
101- if self .switch == False :
121+ if self .switch is False :
102122 return ""
103123
104124 graph = ""
@@ -191,7 +211,7 @@ def definition(self, pp_dir):
191211 Returns:
192212 Cylc task definition string for this analysis script
193213 """
194- if self .switch == False :
214+ if self .switch is False :
195215 return ""
196216
197217 definitions = ""
@@ -270,12 +290,12 @@ def definition(self, pp_dir):
270290 new_analysis_str = f"""
271291 [[analysis-{ self .name } ]]
272292 script = '''
273- fre analysis run \
274- --name freanalysis_{ self .name } \
275- --catalog $catalog \
276- --output-directory $out_dir/{ self .name } \
277- --output-yaml $out_dir/{ self .name } /output.yaml \
278- --experiment-yaml $experiment_yaml \
293+ fre analysis run
294+ --name freanalysis_{ self .name }
295+ --catalog $catalog
296+ --output-directory $out_dir/{ self .name }
297+ --output-yaml $out_dir/{ self .name } /output.yaml
298+ --experiment-yaml $experiment_yaml
279299 --library-directory $CYLC_WORKFLOW_SHARE_DIR/analysis-envs/freanalysis_{ self .name }
280300 '''
281301 # retry 10 times (due to mysterious intake-esm issue)
@@ -293,25 +313,27 @@ def definition(self, pp_dir):
293313 '''
294314 """
295315
296- if self .script_type == "independent" and self . date_range == self . experiment_date_range :
316+ if self .script_type == "independent" :
297317 # to make the task run, we will create a corresponding task graph below
298318 # corresponding to the interval (chunk), e.g. ANALYSIS-P1Y.
299319 # Then, the analysis script will inherit from that family, to enable
300320 # both the task triggering and the yr1 and datachunk template vars.
301- logger .info (f"{ self .name } : Will run every chunk { self .chunk } " )
321+ logger .debug (f"{ self .name } : Will run every chunk { self .chunk } " )
302322 if self .is_legacy :
303323 definitions += legacy_analysis_str
304324 else :
305325 definitions += new_analysis_str
306326
307327 # create the task family for all every-interval analysis scripts
328+ interval_years_minus_one = self .chunk - one_year
308329 definitions += f"""
309330 [[data-catalog-{ self .chunk } ]]
310331 inherit = DATA-CATALOG
311332 [[ANALYSIS-{ self .chunk } ]]
312333 inherit = ANALYSIS
313334 [[[environment]]]
314335 yr1 = $(cylc cycle-point --template=CCYY)
336+ yr2 = $(cylc cycle-point --template=CCYY --offset-years={ interval_years_minus_one .years } )
315337 databegyr = $yr1
316338 dataendyr = $yr2
317339 datachunk = { self .chunk .years }
@@ -346,12 +368,12 @@ def definition(self, pp_dir):
346368 logger .debug (f"{ self .name } : Finished determining scripting" )
347369 return definitions
348370
349- if self .script_type == "cumulative" and self . date_range == self . experiment_date_range :
371+ if self .script_type == "cumulative" :
350372 # Case 2: run the analysis every chunk, but depend on all previous chunks too.
351373 # To make the task run, we will create a task family for
352374 # each chunk/interval, starting from the beginning of pp data
353375 # then we create an analysis script task for each of these task families.
354- logger .info (f"{ self .name } : Will run each chunk { self .chunk } from beginning { self .experiment_date_range [0 ]} " )
376+ logger .debug (f"{ self .name } : Will run each chunk { self .chunk } from beginning { self .experiment_date_range [0 ]} " )
355377 date = self .experiment_date_range [0 ]
356378 while date <= self .experiment_date_range [1 ]:
357379 date_str = f"{ date .year :04} "
@@ -428,7 +450,7 @@ def definition(self, pp_dir):
428450 d2 -= self .chunk
429451 d1_str = f"{ d1 .year :04} "
430452 d2_str = f"{ d2 .year :04} "
431- logger .info (f"{ self .name } : Will run once for time period { self .date_range [0 ]} to { self .date_range [1 ]} (chunks { d1_str } to { d2_str } )" )
453+ logger .debug (f"{ self .name } : Will run once for time period { self .date_range [0 ]} to { self .date_range [1 ]} (chunks { d1_str } to { d2_str } )" )
432454 date1_str = f"{ self .date_range [0 ].year :04} "
433455 date2_str = f"{ self .date_range [1 ].year :04} "
434456
@@ -494,9 +516,9 @@ def task_generator(yaml_, experiment_components, experiment_start, experiment_st
494516 for script_name , script_params in yaml_ ["analysis" ].items ():
495517 # Retrieve information about the script
496518 script_info = AnalysisScript (script_name , script_params , experiment_components ,
497- experiment_start , experiment_stop , pp_chunks )
498- if script_info .switch == False :
499- logger .info (f"{ script_name } : Skipping, switch set to off" )
519+ experiment_start , experiment_stop , pp_chunks , yaml_ )
520+ if script_info .switch is False :
521+ logger .debug (f"{ script_name } : Skipping, switch set to off" )
500522 continue
501523 yield script_info
502524
@@ -575,7 +597,7 @@ def get_analysis_info(experiment_yaml, info_type, experiment_components, pp_dir,
575597 logger .debug ("get_analysis_info: about to return graph" )
576598 return task_graph (yaml_ , experiment_components , experiment_start ,
577599 experiment_stop , pp_chunks , analysis_only )
578- elif info_type == "task-definitions" :
600+ if info_type == "task-definitions" :
579601 logger .debug ("get_analysis_info: about to return definitions" )
580602 return task_definitions (yaml_ , experiment_components , experiment_start ,
581603 experiment_stop , pp_chunks , pp_dir )