Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Data
benchmarks/
*.csv
*.tsv
*.parquet
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ uv run mrp run example_model.mrp.toml --input seed=42 --input max_gen=10

You can run `uv tool install cfa-mrp` to omit the `uv run`.

## Running a calibration

The repository includes a complete calibration example for the bundled example model:

```bash
uv sync --all-packages --all-extras
uv run python -m example_model.calibrate
```

This runs the ABC-SMC calibration workflow defined in [packages/example_model/src/example_model/calibrate.py](/home/as81/work/cfa-calibration-tools-wtk-mp/packages/example_model/src/example_model/calibrate.py) and prints the posterior summary and diagnostics.

To compare serial and parallel execution for the same example, run:

```bash
uv run python -m example_model.benchmark
```

## General Disclaimer

This repository was created for use by CDC programs to collaborate on public health related projects in support of the [CDC mission](https://www.cdc.gov/about/organization/mission.htm). GitHub is not hosted by the CDC, but is a third party website used by CDC and its partners to share information and collaborate on software. CDC use of GitHub does not imply an endorsement of any one particular service, product, or enterprise.
Expand Down
3 changes: 2 additions & 1 deletion example_model.mrp.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ version = "0.0.1"

[runtime]
env = "uv"
command = "example_model"
command = "python"
args = ["-m", "example_model"]

[output]
spec = "filesystem"
Expand Down
6 changes: 5 additions & 1 deletion packages/example_model/src/example_model/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import timeit
from pathlib import Path

import numpy as np
from mrp import Environment
Expand Down Expand Up @@ -132,5 +133,8 @@ def outputs_to_distance(model_output, target_data):
for result in benchmark_results:
print(f"workers: {result['max_workers']}, time: {result['time']}")

with open("./benchmarks/parallelization_check.json", "w") as fp:
benchmark_dir = Path("./benchmarks")
benchmark_dir.mkdir(exist_ok=True)

with open(benchmark_dir / "parallelization_check.json", "w") as fp:
json.dump(benchmark_results, fp)
50 changes: 50 additions & 0 deletions src/calibrationtools/async_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Run coroutines from synchronous sampler code.

This module centralizes the event-loop bridging used by sampler execution
paths so synchronous orchestration can safely invoke async helpers in both
normal scripts and already-running event loops.
"""

import asyncio
import threading
from typing import Any, Callable, NoReturn


def run_coroutine_from_sync(coroutine_factory: Callable[[], Any]) -> Any:
"""Run an async workflow from synchronous code.

This helper executes the coroutine directly when no event loop is active.
If the caller already runs inside an event loop, it executes the coroutine
in a dedicated worker thread and re-raises any exception from that thread.

Args:
coroutine_factory (Callable[[], Any]): Factory returning the coroutine
to execute.

Returns:
Any: The value returned by the coroutine.
"""

try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coroutine_factory())

result: dict[str, Any] = {}
error: dict[str, BaseException] = {}

def runner() -> None:
try:
result["value"] = asyncio.run(coroutine_factory())
except BaseException as exc: # pragma: no cover - passthrough
error["value"] = exc

def raise_worker_error(exc: BaseException) -> NoReturn:
raise exc

thread = threading.Thread(target=runner, daemon=True)
thread.start()
thread.join()
if "value" in error:
raise_worker_error(error["value"])
return result["value"]
Loading
Loading