Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/interfaces/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ def cli():
default=10,
help="Number of samples for the dispatcher. Default is 10.",
)
parser.add_argument(
"--patience",
type=int,
default=10,
help="Iterations limit without better result. Default is 10.",
)

# Parse the arguments
args = parser.parse_args()
Expand All @@ -55,6 +61,7 @@ def cli():
problem_definition=problem_definition,
simulation_model_archive=args.model_file,
n_size=args.n_size,
patience=args.patience,
)
logger.info("Risk management process completed successfully.")
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ def run_risk_management(
problem_definition: dict[str, Any],
simulation_model_archive: bytes | str,
n_size: int = 10,
patience: int = 10,
max_generations: int = 10,
):
"""
Main entry point for running risk management.

Args:
patience: Patience limit for the optimization process.
max_generations: Maximum number of generations for the optimization process.
problem_definition (dict[str, Any]): The problem definition used by the dispatcher.
simulation_model_archive (bytes | str): The simulation model archive to transfer.
Expand All @@ -59,6 +61,7 @@ def run_risk_management(
solution_updater = SolutionUpdaterService(
optimization_engine=OptimizationEngine.PSO,
max_generations=max_generations,
patience=patience,
)
dispatcher = ProblemDispatcherService(
problem_definition=problem_definition, n_size=n_size
Expand All @@ -72,64 +75,67 @@ def run_risk_management(
next_solutions = None

loop_controller = solution_updater.loop_controller
try:
while loop_controller.running():
logger.info(
"Starting generation %d for risk management.",
loop_controller.current_generation,
)

# Generate or update solutions
solutions = dispatcher.process_iteration(next_solutions)
logger.debug("Generated solutions: %s", solutions)
while loop_controller.running():
logger.info(
"Starting generation %d for risk management.",
loop_controller.current_generation,
)

# Prepare simulation cases
sim_cases = _prepare_simulation_cases(solutions)
logger.debug("Prepared simulation cases: %s", sim_cases)
# Generate or update solutions
solutions = dispatcher.process_iteration(next_solutions)
logger.debug("Generated solutions: %s", solutions)

# Process simulation with the simulation service
logger.info("Submitting simulation cases to SimulationService.")
completed_cases = SimulationService.process_request(
{"simulation_cases": sim_cases}
)
logger.debug("Completed simulation cases: %s", completed_cases)

# Update solutions based on simulation results
updated_solutions = [
{
"control_vector": {"items": simulation_case.control_vector},
"cost_function_results": {
"values": ensure_not_none(
simulation_case.results
).model_dump()
},
}
for simulation_case in completed_cases.simulation_cases
]
logger.debug(
"Updated solutions for next iteration: %s", updated_solutions
)
# Prepare simulation cases
sim_cases = _prepare_simulation_cases(solutions)
logger.debug("Prepared simulation cases: %s", sim_cases)

# Process simulation with the simulation service
logger.info("Submitting simulation cases to SimulationService.")
completed_cases = SimulationService.process_request(
{"simulation_cases": sim_cases}
)
logger.debug("Completed simulation cases: %s", completed_cases)

# Update solutions based on simulation results
updated_solutions = [
{
"control_vector": {"items": simulation_case.control_vector},
"cost_function_results": {
"values": ensure_not_none(
simulation_case.results
).model_dump()
},
}
for simulation_case in completed_cases.simulation_cases
]
logger.debug(
"Updated solutions for next iteration: %s", updated_solutions
)

# Map simulation service solutions to the ProblemDispatcherService format
response = solution_updater.process_request(
{
"solution_candidates": updated_solutions,
"optimization_constraints": {"boundaries": boundaries},
}
)

next_solutions = ControlVectorMapper.convert_su_to_pd(
response.next_iter_solutions
)

# Map simulation service solutions to the ProblemDispatcherService format
next_solutions = ControlVectorMapper.convert_su_to_pd(
solution_updater.process_request(
{
"solution_candidates": updated_solutions,
"optimization_constraints": {"boundaries": boundaries},
}
).next_iter_solutions
)
logger.info(
"Generation %d successfully completed for risk management.",
loop_controller.current_generation,
)
except StopIteration as e:
logger.info(
"Loop controller stopped at generation %d: %s",
"Generation %d successfully completed for risk management.",
loop_controller.current_generation,
str(e),
)

logger.info(
"Loop controller stopped at generation %d. Info: %s",
loop_controller.current_generation,
loop_controller.info,
)

except Exception as e:
logger.error("Error in risk management process: %s", str(e), exc_info=True)
raise
Expand Down
4 changes: 4 additions & 0 deletions src/services/solution_updater_service/core/engines/pso.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def update_solution_to_next_iter(
new_positions = self._calculate_new_position(parameters, new_velocities)
return self._reflect_and_clip_positions(new_positions, lb, ub)

@property
def state(self):
return self._state

def _initialize_state_on_first_call(
self, parameters: npt.NDArray[np.float64], results: npt.NDArray[np.float64]
) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/services/solution_updater_service/core/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ class SolutionUpdaterServiceResponse(BaseModel, extra="forbid"):
next_iter_solutions (list[ControlVector]): A list of `ControlVector` instances representing
the next iteration's optimal solutions determined by the optimization process. Each
control vector contains the parameters and their corresponding optimized values.
patience_exceeded (PatienceExceeded): A boolean flag indicating whether the optimization
process has reached its patience limit. When True, it signals that the solution has not
improved for the specified number of consecutive iterations, suggesting convergence
or a local minimum.

Notes:
The `extra="forbid"` option ensures that no additional fields are allowed beyond the explicitly
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import Any, Mapping, Sequence

import numpy as np
Expand Down Expand Up @@ -322,14 +324,33 @@ def _initiate_mapper_on_first_call(


class _SolutionUpdaterServiceLoopController:
def __init__(self, max_generations: int) -> None:
def __init__(
self,
max_generations: int,
patience: int,
solution_updater_service: SolutionUpdaterService,
) -> None:
"""
Helper class to control the loop of the solution updater service.
This class manages the number of iterations for the optimization process, and raises StopIteration exception when convergence fails.
"""
self._info = "Loop controller running"
self._solution_updater_service = solution_updater_service

self._max_generations = max_generations
self.current_generation = 0
self._is_running = True
self._current_generation = 0

self._base_patience, self._patience_left = patience, patience

self._last_run_global_best_result: float | None = None

@property
def current_generation(self) -> int:
return self._current_generation

@property
def info(self) -> str:
return self._info

def running(self) -> bool:
"""
Expand All @@ -338,22 +359,78 @@ def running(self) -> bool:
Returns:
bool: True if the loop controller is running, False otherwise.
"""
if self.current_generation >= self._max_generations:
self._is_running = False
self.current_generation += 1
return self._is_running
self._update_generation()
self._update_patience()

running = True

if self._max_generation_reached():
self._info = f"Max Generation {self._max_generations} reached, stopping optimization loop."
running = False

elif self._patience_reached():
self._info = (
f"Patience {self._base_patience} reached, stopping optimization loop."
)
running = False

return running

def _update_generation(self) -> None:
self._current_generation += 1

def _update_patience(self) -> None:
if self._current_generation < 1:
raise ValueError(
f"Generation cant be lower than 0. Current generation: {self._current_generation}"
)

if self._current_generation == 1:
return

last_best = self._last_run_global_best_result
current_best = self._solution_updater_service.global_best_result

if last_best == current_best:
self._patience_left -= 1
else:
self._patience_left = self._base_patience

self._last_run_global_best_result = current_best

def _max_generation_reached(self) -> bool:
return self._current_generation > self._max_generations

def _patience_reached(self) -> bool:
return self._patience_left < 0


class SolutionUpdaterService:
def __init__(
self, optimization_engine: OptimizationEngine, max_generations: int
self,
optimization_engine: OptimizationEngine,
max_generations: int,
patience: int,
) -> None:
"""
Initializes the SolutionUpdaterService with specified optimization engine and parameters.

Args:
optimization_engine (OptimizationEngine): The optimization algorithm to use.
max_generations (int): Maximum number of optimization iterations to perform.
patience (int, optional): Number of consecutive iterations without improvement
before early stopping is triggered. Defaults to 10.
"""
self._mapper: _Mapper = _Mapper()
self._engine: OptimizationEngineInterface = (
OptimizationEngineFactory.get_engine(optimization_engine)
)
self._logger = get_logger(__name__)
self.loop_controller = _SolutionUpdaterServiceLoopController(max_generations)
self.loop_controller = _SolutionUpdaterServiceLoopController(
max_generations=max_generations,
patience=patience,
solution_updater_service=self,
)

@property
def global_best_result(self) -> float:
Expand Down Expand Up @@ -403,6 +480,8 @@ def process_request(
An object containing the updated solution control vectors for the next
optimization iteration. It includes:
- `next_iter_solutions`: A list of updated `ControlVector` instances.
- `patience_exceeded`: A boolean flag indicating whether the optimization
process has reached its patience limit due to lack of improvement.

Raises:
RuntimeError:
Expand All @@ -422,8 +501,6 @@ def process_request(
if not config.solution_candidates:
raise RuntimeError("Nothing to optimize")

self._check_convergence(config.solution_candidates)

control_vector, cost_function_values = self._mapper.to_numpy(
config.solution_candidates
)
Expand All @@ -440,17 +517,3 @@ def process_request(
self._logger.info("Control vectors update request processed successfully.")

return SolutionUpdaterServiceResponse(next_iter_solutions=next_iter_solutions)

def _check_convergence(
self, solution: list[SolutionCandidate], tol: float = 1e-4
) -> None:
"""
Should raise StopIteration exception when convergence reach desired value.
Args:
tol: function convergence tolerance
solution: list of SolutionCandidate

Returns:

"""
pass
Loading