Commit d29712b

Merge pull request #72 from GeoOcean/66-swash-wrapper-postprocess-flags
[JTH] add more info in monitor, force in swash postprocess and less warnings
2 parents: 3a144fe + 06ae8f6

File tree

6 files changed: +92 -19 lines changed

bluemath_tk/core/decorators.py

Lines changed: 7 additions & 1 deletion
@@ -142,7 +142,13 @@ def wrapper(
         if not isinstance(normalize_data, bool):
             raise TypeError("Normalize data must be a boolean")
         return func(
-            self, data, directional_variables, custom_scale_factor, min_number_of_points
+            self,
+            data,
+            directional_variables,
+            custom_scale_factor,
+            min_number_of_points,
+            max_number_of_iterations,
+            normalize_data,
         )
 
     return wrapper
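
Note: this hunk fixes a common decorator bug, where arguments are validated but never forwarded, so the wrapped function silently falls back to its defaults. A minimal sketch of the bug class, with illustrative names (not the actual bluemath_tk API):

    import functools

    def validate_fit(func):
        @functools.wraps(func)
        def wrapper(self, data, max_number_of_iterations=1000, normalize_data=False):
            if not isinstance(normalize_data, bool):
                raise TypeError("Normalize data must be a boolean")
            # Buggy variant: return func(self, data) -- the two keyword
            # arguments validated above never reach the wrapped method.
            return func(self, data, max_number_of_iterations, normalize_data)
        return wrapper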

bluemath_tk/core/models.py

Lines changed: 5 additions & 5 deletions
@@ -49,15 +49,15 @@ def __init__(self) -> None:
         bluemath_num_workers = os.environ.get("BLUEMATH_NUM_WORKERS", None)
         omp_num_threads = os.environ.get("OMP_NUM_THREADS", None)
         if bluemath_num_workers is not None:
-            self.logger.warning(
+            self.logger.info(
                 f"Setting self.num_workers to {bluemath_num_workers} due to BLUEMATH_NUM_WORKERS. \n"
                 "Change it using self.set_num_processors_to_use method. \n"
                 "Also setting OMP_NUM_THREADS to 1, to avoid conflicts with BlueMath parallel processing."
             )
             self.set_num_processors_to_use(num_processors=int(bluemath_num_workers))
             self.set_omp_num_threads(num_threads=1)
         elif omp_num_threads is not None:
-            self.logger.warning(
+            self.logger.info(
                 f"Changing variable OMP_NUM_THREADS from {omp_num_threads} to 1. \n"
                 f"And setting self.num_workers to {omp_num_threads}. \n"
                 "To avoid conflicts with BlueMath parallel processing."
@@ -66,7 +66,7 @@ def __init__(self) -> None:
             self.set_num_processors_to_use(num_processors=int(omp_num_threads))
         else:
             self.num_workers = 1  # self.get_num_processors_available()
-            self.logger.warning(
+            self.logger.info(
                 f"Setting self.num_workers to {self.num_workers}. "
                 "Change it using self.set_num_processors_to_use method."
             )
@@ -511,8 +511,8 @@ def set_num_processors_to_use(self, num_processors: int) -> None:
         elif num_processors <= 0:
             raise ValueError("Number of processors must be greater than 0")
         elif (num_processors_available - num_processors) < 2:
-            self.logger.warning(
-                "Number of processors requested is less than 2 processors available"
+            self.logger.info(
+                "Number of processors requested leaves less than 2 processors available"
             )
 
         # Set the number of processors to use
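
Note: the warning-to-info downgrades matter because of how Python's logging levels filter. A logger set to WARNING hides routine setup chatter while real warnings still get through. A minimal sketch, with an assumed logger name:

    import logging

    logging.basicConfig()
    logger = logging.getLogger("bluemath")  # name is illustrative

    logger.setLevel(logging.WARNING)
    logger.info("Setting self.num_workers to 1")      # now suppressed
    logger.warning("Something actually went wrong")   # still printed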

bluemath_tk/waves/binwaves.py

Lines changed: 13 additions & 0 deletions
@@ -1,3 +1,4 @@
+import logging
 import os
 from typing import List, Tuple
 
@@ -123,6 +124,7 @@ def reconstruc_spectra(
     num_workers: int = None,
     memory_limit: float = 0.5,
     chunk_sizes: dict = {"time": 24},
+    verbose: bool = False,
 ):
     """
     Reconstruct the onshore spectra using offshore spectra and kp coefficients.
@@ -146,6 +148,17 @@ def reconstruc_spectra(
         The reconstructed onshore spectra dataset.
     """
 
+    if not verbose:
+        # Suppress Dask logs
+        logging.getLogger("distributed").setLevel(logging.ERROR)
+        logging.getLogger("distributed.client").setLevel(logging.ERROR)
+        logging.getLogger("distributed.scheduler").setLevel(logging.ERROR)
+        logging.getLogger("distributed.worker").setLevel(logging.ERROR)
+        logging.getLogger("distributed.nanny").setLevel(logging.ERROR)
+        # Also suppress bokeh and tornado logs that Dask uses
+        logging.getLogger("bokeh").setLevel(logging.ERROR)
+        logging.getLogger("tornado").setLevel(logging.ERROR)
+
     # Setup Dask client
     if num_workers is None:
         num_workers = os.environ.get("BLUEMATH_NUM_WORKERS", 4)
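
Note: if the same silencing is wanted outside reconstruc_spectra, the block above generalizes to a small helper. This sketch is not part of the commit; quiet_dask is a hypothetical name:

    import logging

    def quiet_dask(level: int = logging.ERROR) -> None:
        """Raise the level of Dask's distributed loggers, plus the
        bokeh/tornado loggers its dashboard pulls in."""
        for name in (
            "distributed",
            "distributed.client",
            "distributed.scheduler",
            "distributed.worker",
            "distributed.nanny",
            "bokeh",
            "tornado",
        ):
            logging.getLogger(name).setLevel(level)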

bluemath_tk/wrappers/_base_wrappers.py

Lines changed: 1 addition & 0 deletions
@@ -275,6 +275,7 @@ def set_cases_dirs_from_output_dir(self) -> None:
             [
                 op.join(self.output_dir, case_dir)
                 for case_dir in os.listdir(self.output_dir)
+                if op.isdir(op.join(self.output_dir, case_dir))
             ]
         )
 
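Note: the new isdir guard keeps stray files (logs, archives) dropped into output_dir out of the case list. The same pattern in isolation, with an illustrative path:

    import os
    import os.path as op

    output_dir = "/tmp/swash_cases"  # hypothetical
    case_dirs = sorted(
        op.join(output_dir, d)
        for d in os.listdir(output_dir)
        if op.isdir(op.join(output_dir, d))  # skip plain files
    )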

bluemath_tk/wrappers/swash/swash_example.py

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ def build_case(
 # Usage example
 if __name__ == "__main__":
     # Define the output directory
-    output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath/test_cases/CHY"  # CHANGE THIS TO YOUR DESIRED OUTPUT DIRECTORY!
+    output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath_tk/test_cases/CHY"  # CHANGE THIS TO YOUR DESIRED OUTPUT DIRECTORY!
     # Templates directory
     swash_file_path = op.dirname(inspect.getfile(SwashModelWrapper))
     templates_dir = op.join(swash_file_path, "templates")

bluemath_tk/wrappers/swash/swash_wrapper.py

Lines changed: 65 additions & 12 deletions
@@ -1,6 +1,6 @@
 import os
 import re
-from typing import List, Tuple
+from typing import List, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -341,32 +341,78 @@ def get_case_percentage_from_file(self, output_log_file: str) -> str:
 
         return "0 %"  # if no progress is found
 
-    def monitor_cases(self) -> pd.DataFrame:
+    def monitor_cases(self, value_counts: str = None) -> Union[pd.DataFrame, dict]:
         """
         Monitor the cases and log relevant information.
 
+        Parameters
+        ----------
+        value_counts : str, optional
+            The value counts to be returned.
+            If "simple", it returns a dictionary with the number of cases in each status.
+            If "cases", it returns a dictionary with the cases in each status.
+            Default is None.
+
         Returns
         -------
-        pd.DataFrame
-            The cases percentage.
+        Union[pd.DataFrame, dict]
+            The cases status as a pandas DataFrame or a dictionary with aggregated info.
         """
 
         cases_percentage = {}
 
         for case_dir in self.cases_dirs:
-            output_log_file = os.path.join(case_dir, "wrapper_out.log")
-            progress = self.get_case_percentage_from_file(
-                output_log_file=output_log_file
-            )
-            cases_percentage[os.path.basename(case_dir)] = progress
+            case_dir_name = os.path.basename(case_dir)
+            if os.path.exists(os.path.join(case_dir, "Errfile")):
+                cases_percentage[case_dir_name] = "Errfile"
+            elif os.path.exists(os.path.join(case_dir, "norm_end")):
+                cases_percentage[case_dir_name] = "END"
+            else:
+                run_tab_file = os.path.join(case_dir, "run.tab")
+                if os.path.exists(run_tab_file):
+                    run_tab = self._read_tabfile(file_path=run_tab_file)
+                    if run_tab.isnull().values.any():
+                        cases_percentage[case_dir_name] = "NaN"
+                        continue
+                else:
+                    cases_percentage[case_dir_name] = "No run.tab"
+                    continue
+                output_log_file = os.path.join(case_dir, "wrapper_out.log")
+                progress = self.get_case_percentage_from_file(
+                    output_log_file=output_log_file
+                )
+                cases_percentage[case_dir_name] = progress
 
-        return pd.DataFrame(cases_percentage.items(), columns=["Case", "Percentage"])
+        full_monitorization_df = pd.DataFrame(
+            cases_percentage.items(), columns=["Case", "Percentage"]
+        )
+        if value_counts:
+            value_counts_df = full_monitorization_df.set_index("Case").value_counts()
+            if value_counts == "simple":
+                return value_counts_df
+            value_counts_unique_values = [
+                run_type[0] for run_type in value_counts_df.index.values
+            ]
+            value_counts_dict = {
+                run_type: list(
+                    full_monitorization_df.where(
+                        full_monitorization_df["Percentage"] == run_type
+                    )
+                    .dropna()["Case"]
+                    .values
+                )
+                for run_type in value_counts_unique_values
+            }
+            return value_counts_dict
+        else:
+            return full_monitorization_df
 
     def postprocess_case(
         self,
         case_num: int,
         case_dir: str,
         output_vars: List[str] = None,
+        force: bool = False,
         remove_tab: bool = False,
         remove_nc: bool = False,
     ) -> xr.Dataset:
@@ -379,6 +425,8 @@ def postprocess_case(
             The case number.
         case_dir : str
             The case directory.
+        force : bool, optional
+            Force the postprocessing, re-creating the output.nc file. Default is False.
         output_vars : list, optional
             The output variables to postprocess. Default is None.
         remove_tab : bool, optional
@@ -403,13 +451,15 @@ def postprocess_case(
             output_vars = list(self.postprocess_functions.keys())
 
         output_nc_path = os.path.join(case_dir, "output.nc")
-        if not os.path.exists(output_nc_path):
+        if not os.path.exists(output_nc_path) or force:
            # Convert tab files to netCDF file
             output_path = os.path.join(case_dir, "output.tab")
             run_path = os.path.join(case_dir, "run.tab")
             output_nc = self._convert_case_output_files_to_nc(
                 case_num=case_num, output_path=output_path, run_path=run_path
             )
+            if os.path.exists(output_nc_path):
+                os.remove(output_nc_path)
             output_nc.to_netcdf(output_nc_path)
         else:
             self.logger.info("Reading existing output.nc file.")
@@ -432,7 +482,10 @@ def postprocess_case(
         ds = xr.merge(var_ds_list, compat="no_conflicts")
 
         # Save Dataset to netCDF file
-        ds.to_netcdf(os.path.join(case_dir, "output_postprocessed.nc"))
+        processed_nc_path = os.path.join(case_dir, "output_postprocessed.nc")
+        if os.path.exists(processed_nc_path):
+            os.remove(processed_nc_path)
+        ds.to_netcdf(processed_nc_path)
 
         # Remove raw files to save space
         if remove_tab:
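
Note: taken together, a hedged usage sketch of the options this commit adds; wrapper stands for an already-configured SwashModelWrapper instance and the case index is illustrative:

    # Per-case status DataFrame (columns: Case, Percentage)
    df = wrapper.monitor_cases()

    # Aggregated views added by this commit
    counts = wrapper.monitor_cases(value_counts="simple")    # status -> count
    by_status = wrapper.monitor_cases(value_counts="cases")  # status -> list of cases

    # Rebuild output.nc from the raw tab files even if it already exists
    ds = wrapper.postprocess_case(
        case_num=0,
        case_dir=wrapper.cases_dirs[0],
        force=True,
    )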
