Commit 777ae3b

Merge pull request #65 from cseptesting/56-decide-on-and-implement-dag-solving-strategy
Implement concurrent dag solver
2 parents 68c9d8e + d23caca commit 777ae3b

60 files changed, +37036 -357 lines changed

docs/guide/experiment_config.rst

Lines changed: 11 additions & 0 deletions
@@ -49,6 +49,8 @@ Configuration files are written in ``YAML`` format and are divided into differen
       - Poisson S-test:
           func: poisson_evaluations.spatial_test
           plot_func: plot_poisson_consistency_test
+    run_mode: parallel
+    force_rerun: true
     postprocess:
       plot_forecasts:
         cmap: magma
@@ -278,3 +280,12 @@ The seismicity catalog can be defined with the ``catalog`` parameter. It represe

 .. important::
     The main catalog will be stored, and consecutively filtered to the extent of each testing time-window, as well as to the experiment's spatial domain, and magnitude- and depth- ranges.
+
+
+Run Configuration
+-----------------
+
+The ``run_mode`` parameter allows to perform the experiment tasks in either ``sequential`` (default) or ``parallel``.
+The former is appropriate for staging and testing the experiment is working, whereas ``parallel`` is optimal for heavy computations when the experiment is set to production (e.g., real conditions)
+
+The ``force_rerun`` makes the experiment recompute every forecast. Default is ``false``, which allows when instantiating the experiment to make a self-discovery of existing forecasts.
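
As a rough illustration of the defaults documented in this hunk (``run_mode`` defaults to ``sequential``, ``force_rerun`` to ``false``), the sketch below validates both options from an already-parsed configuration mapping. ``resolve_run_options`` is a hypothetical helper written for this note and is not part of floatCSEP; how the library itself reads these keys is not visible in the diff.

```python
from typing import Any, Mapping, Tuple

# The two values named in the documentation added by this commit.
VALID_RUN_MODES = ("sequential", "parallel")


def resolve_run_options(config: Mapping[str, Any]) -> Tuple[str, bool]:
    """Return (run_mode, force_rerun) with the documented defaults applied.

    Hypothetical helper: `config` stands for the parsed YAML mapping.
    """
    run_mode = config.get("run_mode", "sequential")
    if run_mode not in VALID_RUN_MODES:
        raise ValueError(
            f"run_mode must be one of {VALID_RUN_MODES}, got {run_mode!r}"
        )

    force_rerun = config.get("force_rerun", False)
    if not isinstance(force_rerun, bool):
        # Reject strings such as "false", which would otherwise be truthy.
        raise TypeError(f"force_rerun must be a boolean, got {force_rerun!r}")

    return run_mode, force_rerun


if __name__ == "__main__":
    print(resolve_run_options({"run_mode": "parallel", "force_rerun": True}))
```

Rejecting unknown values early means a configuration still using the old ``serial`` spelling fails loudly instead of silently falling back to sequential execution.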

docs/guide/postprocess_config.rst

Lines changed: 4 additions & 4 deletions
@@ -422,19 +422,19 @@ Here are some basic functionalities from **floatCSEP** to access catalogs, forec
         timewindow_str = timewindow2str(timewindow)
         model.get_forecast(timewindow_str)

-   * - :attr:`Model.registry.path <floatcsep.infrastructure.registries.ForecastRegistry>`
+   * - :attr:`Model.registry.path <floatcsep.infrastructure.registries.ModelFileRegistry>`
      - Directory of the model file or source code.
-   * - :attr:`Model.registry.database <floatcsep.infrastructure.registries.ForecastRegistry>`
+   * - :attr:`Model.registry.database <floatcsep.infrastructure.registries.ModelFileRegistry>`
      - Database path where forecasts are stored.
    * - :attr:`TimeIndependentModel.forecast_unit <floatcsep.model.TimeIndependentModel>`
      - The forecast unit for a time independent model.
    * - :meth:`TimeDependentModel.func <floatcsep.model.TimeIndependentModel>`
      - The function command to execute a time dependent source code.
    * - :meth:`TimeDependentModel.func_kwargs`
      - The keyword arguments of the model, passed to the arguments file.
-   * - :meth:`TimeDependentModel.registry.args_file <floatcsep.infrastructure.registries.ForecastRegistry>`
+   * - :meth:`TimeDependentModel.registry.args_file <floatcsep.infrastructure.registries.ModelFileRegistry>`
      - The path of the arguments file. Default is ``args.txt``.
-   * - :meth:`TimeDependentModel.registry.input_cat <floatcsep.infrastructure.registries.ForecastRegistry>`
+   * - :meth:`TimeDependentModel.registry.input_cat <floatcsep.infrastructure.registries.ModelFileRegistry>`
      - The path of the input catalog for the model execution.


docs/reference/api_reference.rst

Lines changed: 40 additions & 27 deletions
@@ -52,8 +52,6 @@ instances onto an experimental workflow. The class and its main methods are:
     Experiment.set_models
     Experiment.set_tests
     Experiment.stage_models
-    Experiment.set_input_cat
-    Experiment.set_test_cat
     Experiment.set_tasks
     Experiment.run
     Experiment.read_results
@@ -74,7 +72,6 @@ reading. The abstract and concrete classes, and their main methods are:
     Model.factory

     TimeIndependentModel
-    TimeIndependentModel.init_db
     TimeIndependentModel.get_forecast

     TimeDependentModel.stage
@@ -128,8 +125,8 @@ These are the helper functions of ``floatCSEP``
     parse_timedelta_string
     read_time_cfg
     read_region_cfg
-    timewindows_ti
-    timewindows_td
+    time_windows_ti
+    time_windows_td


 Some additional plotting functions to pyCSEP are:
@@ -148,16 +145,26 @@ Some additional plotting functions to pyCSEP are:

 A small wrapper for ``pyCSEP`` readers

-.. currentmodule:: floatcsep.utils.readers
+.. currentmodule:: floatcsep.utils.file_io

 .. autosummary::
     :nosignatures:

-    ForecastParsers.dat
-    ForecastParsers.xml
-    ForecastParsers.quadtree
-    ForecastParsers.csv
-    ForecastParsers.hdf5
+    CatalogParser.ascii
+    CatalogParser.json
+
+    CatalogSerializer.ascii
+    CatalogSerializer.json
+
+    GriddedForecastParsers.dat
+    GriddedForecastParsers.xml
+    GriddedForecastParsers.quadtree
+    GriddedForecastParsers.csv
+    GriddedForecastParsers.hdf5
+
+    CatalogForecastParsers.csv
+    CatalogForecastParsers.load_hermes_catalog
+
     HDF5Serializer.grid2hdf5
     serialize

@@ -194,20 +201,25 @@ components (e.g., forecasts, catalogs, results, etc.), and allows to be aware of
 .. autosummary::
     :nosignatures:

-    ForecastRegistry
-    ForecastRegistry.get_forecast
-    ForecastRegistry.fmt
-    ForecastRegistry.forecast_exists
-    ForecastRegistry.build_tree
-
-    ExperimentRegistry
-    ExperimentRegistry.add_forecast_registry
-    ExperimentRegistry.get_forecast_registry
-    ExperimentRegistry.get_result
-    ExperimentRegistry.get_test_catalog
-    ExperimentRegistry.get_figure
-    ExperimentRegistry.result_exist
-    ExperimentRegistry.build_tree
+    ModelFileRegistry
+    ModelFileRegistry.fmt
+    ModelFileRegistry.get_input_catalog_key
+    ModelFileRegistry.get_forecast_key
+    ModelFileRegistry.get_args_key
+    ModelFileRegistry.get_input_dir
+    ModelFileRegistry.get_forecast_dir
+    ModelFileRegistry.get_args_template_path
+    ModelFileRegistry.forecast_exists
+    ModelFileRegistry.build_tree
+
+    ExperimentFileRegistry
+    ExperimentFileRegistry.add_model_registry
+    ExperimentFileRegistry.get_model_registry
+    ExperimentFileRegistry.get_result_key
+    ExperimentFileRegistry.get_test_catalog_key
+    ExperimentFileRegistry.get_figure_key
+    ExperimentFileRegistry.result_exist
+    ExperimentFileRegistry.build_tree


 **Repositories**
@@ -225,8 +237,9 @@ catalogs, forecasts), abstracting the experiment logic from the pyCSEP io functi
     CatalogRepository.set_main_catalog
     CatalogRepository.catalog
     CatalogRepository.get_test_cat
-    CatalogRepository.set_test_cat
-    CatalogRepository.set_input_cat
+    CatalogRepository.set_test_cats
+    CatalogRepository.set_input_cats
+    CatalogRepository.filter_catalog

     GriddedForecastRepository
     GriddedForecastRepository.load_forecast

docs/reference/infrastructure.rst

Lines changed: 2 additions & 2 deletions
@@ -7,12 +7,12 @@ and the required workflow to run an Experiment.
 Registries
 ----------

-.. autoclass:: floatcsep.infrastructure.registries.ForecastRegistry
+.. autoclass:: floatcsep.infrastructure.registries.ModelFileRegistry
     :members:
     :undoc-members:
     :show-inheritance:

-.. autoclass:: floatcsep.infrastructure.registries.ExperimentRegistry
+.. autoclass:: floatcsep.infrastructure.registries.ExperimentFileRegistry
     :members:
     :undoc-members:
     :show-inheritance:

docs/reference/utilities.rst

Lines changed: 4 additions & 4 deletions
@@ -27,11 +27,11 @@ This section documents the `accessors` module.
     :inherited-members:


-Readers
--------
-This section documents the `readers` module.
+Readers and Parsers
+-------------------
+This section documents the `file_io` module.

-.. automodule:: floatcsep.utils.readers
+.. automodule:: floatcsep.utils.file_io
     :members:
     :undoc-members:
     :show-inheritance:

docs/tutorials/case_a.rst

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ The source code can be found in the ``tutorials/case_a`` folder or in `GitHub <
 .. literalinclude:: ../../tutorials/case_a/catalog.csep
     :caption: tutorials/case_a/catalog.csep

-* The forecast ``best_model.dat`` to be evaluated is written in the ``.dat`` format (see :doc:`pycsep:concepts/forecasts`). Forecast formats are detected automatically (see :mod:`floatcsep.utils.readers.ForecastParsers`)
+* The forecast ``best_model.dat`` to be evaluated is written in the ``.dat`` format (see :doc:`pycsep:concepts/forecasts`). Forecast formats are detected automatically (see :mod:`floatcsep.utils.file_io.GriddedForecastParsers`)

 .. literalinclude:: ../../tutorials/case_a/best_model.dat
     :caption: tutorials/case_a/best_model.dat

floatcsep/experiment.py

Lines changed: 34 additions & 47 deletions
@@ -77,7 +77,7 @@ class Experiment:

         model_config (str): Path to the models' configuration file
         test_config (str): Path to the evaluations' configuration file
-        run_mode (str): 'serial' or 'parallel'
+        run_mode (str): 'sequential' or 'parallel'
         default_test_kwargs (dict): Default values for the testing
             (seed, number of simulations, etc.)
         postprocess (dict): Contains the instruction for postprocessing
@@ -99,10 +99,11 @@ def __init__(
         catalog: str = None,
         models: str = None,
         tests: str = None,
+        exp_class: str = "ti",
         postprocess: str = None,
         default_test_kwargs: dict = None,
         run_dir: str = "results",
-        run_mode: str = "serial",
+        run_mode: str = "sequential",
         stage_dir: ... = "results",
         report_hook: dict = None,
         **kwargs,
@@ -155,14 +156,19 @@ def __init__(
             f"\tMagnitude range: [{numpy.min(self.magnitudes)},"
             f" {numpy.max(self.magnitudes)}]"
         )
+        exp_class_str = (
+            "Time-Dependent"
+            if self.exp_class in ("td", "time-dependent")
+            else "Time-Independent"
+        )
+        log.info(f"\tExperiment class: {exp_class_str}")

         self.catalog = None
         self.models = []
         self.tests = []

         self.postprocess = postprocess if postprocess else {}
         self.default_test_kwargs = default_test_kwargs
-
         self.catalog_repo.set_main_catalog(catalog, self.time_config, self.region_config)

         self.models = self.set_models(
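
Note that this hunk tests ``self.exp_class`` against ``("td", "time-dependent")``, while the ``set_tasks`` hunk further down tests ``["td", "time_dependent"]``, so only ``"td"`` is accepted by both checks. One possible way to keep such checks aligned is to normalise the value once. The sketch below is an assumption for illustration only: ``normalize_exp_class`` is a hypothetical helper, not a floatCSEP function, and unlike the logged label above (which falls back to "Time-Independent") it raises on unknown values.

```python
def normalize_exp_class(value: str) -> str:
    """Map accepted spellings of the experiment class to 'td' or 'ti'.

    Hypothetical helper: treats '-' and '_' as equivalent and ignores case.
    """
    key = value.strip().lower().replace("_", "-")
    if key in ("td", "time-dependent"):
        return "td"
    if key in ("ti", "time-independent"):
        return "ti"
    raise ValueError(f"Unknown experiment class: {value!r}")


if __name__ == "__main__":
    # Both spellings seen in this commit resolve to the same class.
    print(normalize_exp_class("time-dependent"))
    print(normalize_exp_class("time_dependent"))
```

With a single normalised value, every later comparison can be written as ``exp_class == "td"``.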
@@ -347,35 +353,12 @@ def set_tests(self, test_config: Union[str, Dict, List]) -> list:

         return tests

-    def set_test_cat(self, tstring: str) -> None:
-        """
-        Filters the complete experiment catalog to a test sub-catalog bounded by the test
-        time-window. Writes it to filepath defined in :attr:`Experiment.registry`
-
-        Args:
-            tstring (str): Time window string
-        """
-
-        self.catalog_repo.set_test_cat(tstring)
-
-    def set_input_cat(self, tstring: str, model: Model) -> None:
-        """
-        Filters the complete experiment catalog to an input sub-catalog filtered to the
-        beginning of the test time-window.
-
-        Args:
-            tstring (str): Time window string
-            model (:class:`~floatcsep.model.Model`): Model to give the input
-                catalog
-        """
-        self.catalog_repo.set_input_cat(tstring, model)
-
     def set_tasks(self) -> None:
         """
         Lazy definition of the experiment core tasks by wrapping instances,
         methods and arguments. Creates a graph with task nodes, while assigning
         task-parents to each node, depending on each Evaluation signature.
-        The tasks can then be run in serial as a list or asynchronous
+        The tasks can then be run in sequential as a list or asynchronous
         using the graph's node dependencies.
         For instance:

@@ -403,38 +386,39 @@ def set_tasks(self) -> None:
         # Prepare the testing catalogs
         task_graph = TaskGraph()
         for time_i in tw_strings:
-            # The method call Experiment.set_test_cat(time_i) is created lazily
-            task_i = Task(instance=self, method="set_test_cat", tstring=time_i)
-            # An is added to the task graph
+            task_i = Task(instance=self.catalog_repo, method="set_test_cats", tstring=time_i)
             task_graph.add(task_i)
-            # the task will be executed later with Experiment.run()
-            # once all the tasks are defined
+            if self.exp_class in ["td", "time_dependent"]:
+                task_j = Task(
+                    instance=self.catalog_repo,
+                    method="set_input_cats",
+                    tstring=time_i,
+                    models=self.models,
+                )
+
+                task_graph.add(task=task_j)

         # Set up the Forecasts creation
         for time_i in tw_strings:
             for model_j in self.models:
-                if isinstance(model_j, TimeDependentModel):
-                    task_tj = Task(
-                        instance=self, method="set_input_cat", tstring=time_i, model=model_j
-                    )
-
-                    task_graph.add(task=task_tj)
-                    # A catalog needs to have been filtered
-
                 task_ij = Task(
                     instance=model_j,
                     method="create_forecast",
                     tstring=time_i,
                     force=self.force_rerun,
                 )
+
                 task_graph.add(task=task_ij)
                 # A catalog needs to have been filtered
                 if isinstance(model_j, TimeDependentModel):
                     task_graph.add_dependency(
-                        task_ij, dep_inst=self, dep_meth="set_input_cat", dkw=(time_i, model_j)
+                        task_ij,
+                        dep_inst=self.catalog_repo,
+                        dep_meth="set_input_cats",
+                        dkw=(time_i, model_j),
                     )
                     task_graph.add_dependency(
-                        task_ij, dep_inst=self, dep_meth="set_test_cat", dkw=time_i
+                        task_ij, dep_inst=self.catalog_repo, dep_meth="set_test_cats", dkw=time_i
                     )

         # Set up the Consistency Tests
@@ -450,6 +434,7 @@ def set_tasks(self) -> None:
                             region=self.region,
                         )
                         task_graph.add(task_ijk)
+
                         # the forecast needs to have been created
                         task_graph.add_dependency(
                             task_ijk, dep_inst=model_j, dep_meth="create_forecast", dkw=time_i
@@ -531,7 +516,6 @@ def set_tasks(self) -> None:
                     task_graph.add_dependency(
                         task_k, dep_inst=m_j, dep_meth="create_forecast", dkw=time_str
                     )
-
         self.task_graph = task_graph

     def run(self) -> None:
@@ -544,13 +528,16 @@ def run(self) -> None:
             - Memory monitor?
             - Queuer?
         """
-        log.info(f"Running {self.task_graph.ntasks} tasks")

         if self.seed:
             numpy.random.seed(self.seed)

-        self.task_graph.run()
-        log.info("Calculation completed")
+        if self.run_mode == "parallel":
+            cpu_count = os.cpu_count() or 4
+            workers = getattr(self, "concurrent_tasks", None) or min(cpu_count, 32)
+            self.task_graph.run_parallel(max_workers=workers)
+        else:
+            self.task_graph.run()
         log.debug("Post-run forecast registry")
         log_models_tree(log, self.registry, self.time_windows)
         log.debug("Post-run result summary")
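
The body of ``TaskGraph.run_parallel`` is not part of the visible diff, so the following is only a minimal sketch of how a concurrent DAG solver can be written with the standard library. It uses threads from ``concurrent.futures``; whether floatCSEP uses threads or processes is not shown here. ``run_dag``, ``tasks`` and ``deps`` are hypothetical names, and the default worker count simply mirrors the ``min(os.cpu_count() or 4, 32)`` expression from the hunk above.

```python
import os
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, Iterable, Mapping, Optional


def run_dag(
    tasks: Mapping[str, Callable[[], Any]],
    deps: Mapping[str, Iterable[str]],
    max_workers: Optional[int] = None,
) -> Dict[str, Any]:
    """Run every task after all of its parents have finished.

    tasks: task name -> zero-argument callable (hypothetical layout).
    deps:  task name -> names of the tasks it depends on.
    """
    if max_workers is None:
        max_workers = min(os.cpu_count() or 4, 32)

    remaining = {name: set(deps.get(name, ())) for name in tasks}
    unknown = {p for parents in remaining.values() for p in parents} - set(tasks)
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")

    results: Dict[str, Any] = {}
    running = {}  # future -> task name

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining or running:
            # Submit every task whose parents have all completed.
            ready = [n for n, parents in remaining.items() if parents.issubset(results)]
            for name in ready:
                del remaining[name]
                running[pool.submit(tasks[name])] = name

            if not running:
                # Tasks are left but none can start: the graph has a cycle.
                raise ValueError("Dependency cycle detected")

            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                # result() re-raises any exception from the task.
                results[running.pop(future)] = future.result()

    return results


if __name__ == "__main__":
    order = []
    demo_tasks = {
        "set_test_cats": lambda: order.append("set_test_cats"),
        "create_forecast": lambda: order.append("create_forecast"),
        "compute": lambda: order.append("compute"),
    }
    demo_deps = {"create_forecast": ["set_test_cats"], "compute": ["create_forecast"]}
    run_dag(demo_tasks, demo_deps, max_workers=2)
    print(order)
```

Tasks with no unmet parents are submitted together, so independent forecasts or evaluations can overlap, while dependent ones wait for their parents to finish.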
@@ -677,7 +664,7 @@ def from_yml(cls, config_yml: str, repr_dir=None, **kwargs):
         Returns:
             An :class:`~floatcsep.experiment.Experiment` class instance
         """
-        log.info("Initializing experiment from .yml file")
+        log.info(f"Initializing experiment from {config_yml} file")
         with open(config_yml, "r") as yml:

             # experiment configuration file
