-
Notifications
You must be signed in to change notification settings - Fork 10
chore: init recirculation pressure control #1364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
44 changes: 44 additions & 0 deletions
44
src/libecalc/domain/process/entities/process_units/recirculation_loop.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| from libecalc.domain.process.process_system.process_system import ProcessSystem | ||
| from libecalc.domain.process.process_system.process_unit import ProcessUnit | ||
| from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream | ||
|
|
||
| InnerProcess = ProcessSystem | ProcessUnit | ||
|
|
||
|
|
||
| class RecirculationLoop(ProcessUnit): | ||
| def __init__( | ||
| self, | ||
| inner_process: InnerProcess, | ||
| fluid_service: FluidService, | ||
| recirculation_rate: float = 0, | ||
| ): | ||
| self._inner_process = inner_process | ||
| self._fluid_service = fluid_service | ||
| self._recirculation_rate = recirculation_rate | ||
|
|
||
| def get_inner_process(self) -> InnerProcess: | ||
| return self._inner_process | ||
|
|
||
| def set_recirculation_rate(self, rate: float): | ||
| self._recirculation_rate = rate | ||
|
|
||
| def get_recirculation_rate(self) -> float: | ||
| assert self._recirculation_rate is not None | ||
| return self._recirculation_rate | ||
|
|
||
| def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: | ||
| inner_inlet_stream = self._fluid_service.create_stream_from_standard_rate( | ||
| fluid_model=inlet_stream.fluid_model, | ||
| pressure_bara=inlet_stream.pressure_bara, | ||
| temperature_kelvin=inlet_stream.temperature_kelvin, | ||
| standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day + self._recirculation_rate, | ||
| ) | ||
|
|
||
| inner_outlet_stream = self._inner_process.propagate_stream(inlet_stream=inner_inlet_stream) | ||
|
|
||
| return self._fluid_service.create_stream_from_standard_rate( | ||
| fluid_model=inner_outlet_stream.fluid_model, | ||
| pressure_bara=inner_outlet_stream.pressure_bara, | ||
| temperature_kelvin=inner_outlet_stream.temperature_kelvin, | ||
| standard_rate_m3_per_day=inner_outlet_stream.standard_rate_sm3_per_day - self._recirculation_rate, | ||
| ) |
132 changes: 132 additions & 0 deletions
132
src/libecalc/domain/process/process_solver/search_strategies.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| import abc | ||
| from collections.abc import Callable | ||
|
|
||
| from scipy.optimize import root_scalar | ||
|
|
||
| from libecalc.common.errors.exceptions import EcalcError | ||
| from libecalc.domain.process.process_solver.boundary import Boundary | ||
|
|
||
| CONVERGENCE_TOLERANCE = 1e-5 | ||
|
|
||
|
|
||
| class DidNotConvergeError(EcalcError): | ||
| def __init__( | ||
| self, | ||
| boundary: Boundary, | ||
| tolerance: float, | ||
| iterations: int, | ||
| ): | ||
| super().__init__( | ||
| title="No solution found", | ||
| message=f"Did not reach convergence after maximum number of iterations: {iterations}." | ||
| f" lower bound: {boundary.min}, upper bound: {boundary.max}, convergence_tolerance: {tolerance}.", | ||
| ) | ||
|
|
||
|
|
||
| class SearchStrategy(abc.ABC): | ||
| @abc.abstractmethod | ||
| def search(self, boundary: Boundary, func: Callable[[float], tuple[bool, bool]]) -> float: ... | ||
|
|
||
|
|
||
| class BinarySearchStrategy(SearchStrategy): | ||
| def __init__(self, tolerance: float = CONVERGENCE_TOLERANCE, max_iterations: int = 20): | ||
| """ | ||
|
|
||
| Args: | ||
| tolerance: The tolerance of convergence that will be used to exist the iteration | ||
| max_iterations: The maximum number of iterations that will be used to find the root. | ||
| """ | ||
| self._tolerance = tolerance | ||
| self._max_iterations = max_iterations | ||
|
|
||
| def search(self, boundary: Boundary, func: Callable[[float], tuple[bool, bool]]) -> float: | ||
| """Binary search until we reach the maximum x value constrained by x_min and x_max | ||
| where we have a boolean constraint condition given as a function. | ||
|
|
||
| max(x) given f(x) == True | ||
|
|
||
| We assume f(x) to be a binary (Heaviside step) function where f(x) is 1 for x <= n and 0 for x > n. | ||
| n is the target value in this optimization. x == n is the highest possible value of x before f(x) turns to 0. | ||
|
|
||
| Note: This requires that the boolean condition is an indicator function where x > threshold returns False. | ||
| """ | ||
| x0, x1 = boundary.min, boundary.max | ||
| x2 = (x0 + x1) / 2 # Initial value x2. | ||
| i = 0 | ||
| rel_diff = 100.0 | ||
|
|
||
| while (abs(rel_diff) > self._tolerance) and (i < self._max_iterations): | ||
| x2 = (x0 + x1) / 2 # Bisecting x0 and x1. | ||
| higher, accepted = func(x2) | ||
| if higher: | ||
| x0, x1 = x2, x1 # x2 is valid. We can now search to the right in the binary three. | ||
| else: | ||
| x0, x1 = x0, x2 # x2 is invalid. We can now search to the left in the binary three | ||
|
|
||
| if accepted: | ||
| # Avoid division by zero: https://en.wikipedia.org/wiki/Relative_change_and_difference | ||
| rel_diff = 0 if x0 == x1 else abs(x1 - x0) / max(abs(x0), abs(x1)) | ||
| i += 1 | ||
|
|
||
| if i >= self._max_iterations: | ||
| raise DidNotConvergeError( | ||
| boundary=boundary, | ||
| tolerance=self._tolerance, | ||
| iterations=self._max_iterations, | ||
| ) | ||
| return x2 | ||
|
|
||
|
|
||
| class RootFindingStrategy(abc.ABC): | ||
| @abc.abstractmethod | ||
| def find_root( | ||
| self, | ||
| boundary: Boundary, | ||
| func: Callable[[float], float], | ||
| ): ... | ||
|
|
||
|
|
||
| class ScipyRootFindingStrategy(RootFindingStrategy): | ||
| def __init__(self, tolerance: float = CONVERGENCE_TOLERANCE, max_iterations: int = 50): | ||
| """ | ||
|
|
||
| Args: | ||
| tolerance: The tolerance of convergence that will be used to exist the iteration | ||
| max_iterations: The maximum number of iterations that will be used to find the root. | ||
| """ | ||
| # TODO: Investigate why we don't usE brentq method recommended by scipy | ||
| self._tolerance = tolerance | ||
| self._max_iterations = max_iterations | ||
|
|
||
| def find_root( | ||
| self, | ||
| boundary: Boundary, | ||
| func: Callable[[float], float], | ||
| ): | ||
| """Root finding using scipy´s implementation of the brenth method. | ||
|
|
||
| This will try to solve for the root: f(x) = 0. Another way to say this is "what x makes the function return 0"... | ||
|
|
||
| The result is bound on the interval [x0, x1]. | ||
|
|
||
| brenth is a version of Brent´s method (https://en.wikipedia.org/wiki/Brent%27s_method) with hyperbolic extrapolation | ||
|
|
||
| :param boundary: Lower and upper of solution. Used as initial guess | ||
| :param func: The function to be used in the secant root-finding method that we will solve f(x) = 0 | ||
| """ | ||
| result = root_scalar( | ||
| func, | ||
| bracket=(boundary.min, boundary.max), | ||
| x0=boundary.min, | ||
| x1=boundary.max, | ||
| maxiter=self._max_iterations, | ||
| method="brenth", | ||
| rtol=self._tolerance, | ||
| ) | ||
| if not result.converged: | ||
| raise DidNotConvergeError( | ||
| boundary=boundary, | ||
| tolerance=self._tolerance, | ||
| iterations=self._max_iterations, | ||
| ) | ||
| return result.root |
89 changes: 89 additions & 0 deletions
89
src/libecalc/domain/process/process_solver/solvers/recirculation_solver.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| from typing import Literal | ||
|
|
||
| from libecalc.domain.process.entities.process_units.recirculation_loop import RecirculationLoop | ||
| from libecalc.domain.process.process_solver.boundary import Boundary | ||
| from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy, SearchStrategy | ||
| from libecalc.domain.process.process_solver.solver import Solver | ||
| from libecalc.domain.process.process_system.process_error import RateTooHighError, RateTooLowError | ||
| from libecalc.domain.process.process_system.process_system import ProcessSystem | ||
| from libecalc.domain.process.value_objects.fluid_stream import FluidStream | ||
|
|
||
|
|
||
| class RecirculationSolver(Solver): | ||
| def __init__( | ||
| self, | ||
| search_strategy: SearchStrategy, | ||
| root_finding_strategy: RootFindingStrategy, | ||
| recirculation_loop: RecirculationLoop, | ||
| recirculation_rate_boundary: Boundary, | ||
| target_pressure: float | None = None, | ||
| ): | ||
| self._recirculation_loop = recirculation_loop | ||
| self._recirculation_rate_boundary = recirculation_rate_boundary | ||
| self._target_pressure = target_pressure | ||
| self._search_strategy = search_strategy | ||
| self._root_finding_strategy = root_finding_strategy | ||
|
|
||
| def solve(self, process_system: ProcessSystem, inlet_stream: FluidStream) -> FluidStream | None: | ||
| def get_outlet_stream(recirculation_rate: float) -> FluidStream: | ||
| self._recirculation_loop.set_recirculation_rate(recirculation_rate) | ||
| return process_system.propagate_stream(inlet_stream=inlet_stream) | ||
|
|
||
| def bool_func(x: float, mode: Literal["minimize", "maximize"]) -> tuple[bool, bool]: | ||
| """ | ||
| Return a tuple where first bool is True for higher value, | ||
| the second bool says if the solution is accepted or not. | ||
|
|
||
| Need to separate these to avoid accepting a solution which is outside capacity. I.e. when minimizing we | ||
| want to return True for a higher value, but we don't want to accept the solution. | ||
| """ | ||
| try: | ||
| get_outlet_stream(x) | ||
| return False if mode == "minimize" else True, True | ||
| except RateTooLowError: | ||
| return True, False | ||
| except RateTooHighError: | ||
| return False, False | ||
|
|
||
| try: | ||
| minimum_rate = self._recirculation_rate_boundary.min | ||
| get_outlet_stream(recirculation_rate=minimum_rate) | ||
| # No error for minimum rate, no need to find min boundary | ||
| except RateTooLowError: | ||
| # Min boundary is too low, find solution | ||
| minimum_rate = self._search_strategy.search( | ||
| boundary=self._recirculation_rate_boundary, | ||
| func=lambda x: bool_func(x, mode="minimize"), | ||
| ) | ||
|
|
||
| target_pressure = self._target_pressure | ||
| if target_pressure is None: | ||
| # Recirc used to get within capacity, but not to meet constraints | ||
| return get_outlet_stream(minimum_rate) | ||
|
|
||
| try: | ||
| maximum_rate = self._recirculation_rate_boundary.max | ||
| get_outlet_stream(recirculation_rate=maximum_rate) | ||
| # No error for max rate, no need to find max boundary | ||
| except RateTooHighError: | ||
| # Max boundary is too high, find solution | ||
| maximum_rate = self._search_strategy.search( | ||
| boundary=self._recirculation_rate_boundary, | ||
| func=lambda x: bool_func(x, mode="maximize"), | ||
| ) | ||
|
|
||
| minimum_outlet_stream = get_outlet_stream(recirculation_rate=minimum_rate) | ||
| if minimum_outlet_stream.pressure_bara <= target_pressure: | ||
| # Highest possible pressure is too low | ||
| return minimum_outlet_stream | ||
|
|
||
| maximum_outlet_stream = get_outlet_stream(recirculation_rate=maximum_rate) | ||
| if maximum_outlet_stream.pressure_bara >= target_pressure: | ||
| # Lowest possible pressure is too high | ||
| return maximum_outlet_stream | ||
|
|
||
| recirculation_rate = self._root_finding_strategy.find_root( | ||
| boundary=Boundary(min=minimum_rate, max=maximum_rate), | ||
| func=lambda x: get_outlet_stream(recirculation_rate=x).pressure_bara - target_pressure, | ||
| ) | ||
| return get_outlet_stream(recirculation_rate=recirculation_rate) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,24 +1,30 @@ | ||
| import logging | ||
|
|
||
| from libecalc.domain.process.compressor.core.train.utils.numeric_methods import ( | ||
| find_root, | ||
| maximize_x_given_boolean_condition_function, | ||
| ) | ||
| from libecalc.domain.process.entities.shaft import Shaft | ||
| from libecalc.domain.process.process_solver.boundary import Boundary | ||
| from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy, SearchStrategy | ||
| from libecalc.domain.process.process_solver.solver import Solver | ||
| from libecalc.domain.process.process_system.process_error import ProcessError | ||
| from libecalc.domain.process.process_system.process_error import ProcessError, RateTooHighError, RateTooLowError | ||
| from libecalc.domain.process.process_system.process_system import ProcessSystem | ||
| from libecalc.domain.process.value_objects.fluid_stream import FluidStream | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class SpeedSolver(Solver): | ||
| def __init__(self, boundary: Boundary, target_pressure: float, shaft: Shaft): | ||
| def __init__( | ||
| self, | ||
| search_strategy: SearchStrategy, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice separation. Could they be "combined"? Does user need to know nitty gritty details? |
||
| root_finding_strategy: RootFindingStrategy, | ||
| boundary: Boundary, | ||
| target_pressure: float, | ||
| shaft: Shaft, | ||
| ): | ||
| self._boundary = boundary | ||
| self._target_pressure = target_pressure | ||
| self._shaft = shaft | ||
| self._search_strategy = search_strategy | ||
| self._root_finding_strategy = root_finding_strategy | ||
|
|
||
| def solve( | ||
| self, | ||
|
|
@@ -31,32 +37,34 @@ def get_outlet_stream(speed: float) -> FluidStream: | |
| shaft.set_speed(speed) | ||
| return process_system.propagate_stream(inlet_stream) | ||
|
|
||
| maximum_speed = self._boundary.max | ||
| try: | ||
| maximum_speed_outlet_stream = get_outlet_stream(speed=maximum_speed) | ||
| maximum_speed_outlet_stream = get_outlet_stream(speed=self._boundary.max) | ||
| except ProcessError as e: | ||
| logger.debug(f"No solution found for maximum speed: {maximum_speed}", exc_info=e) | ||
| logger.debug(f"No solution found for maximum speed: {self._boundary.max}", exc_info=e) | ||
| return None | ||
|
|
||
| minimum_speed = self._boundary.min | ||
| try: | ||
| minimum_speed = self._boundary.min | ||
| minimum_speed_outlet_stream = get_outlet_stream(speed=minimum_speed) | ||
| except ProcessError as e: | ||
| logger.debug(f"No solution found for minimum speed: {minimum_speed}", exc_info=e) | ||
| logger.debug(f"No solution found for minimum speed: {self._boundary.min}", exc_info=e) | ||
|
|
||
| # rate is above maximum rate for minimum speed. Find the lowest minimum speed which gives a valid result | ||
| def bool_speed_func(x): | ||
| try: | ||
| get_outlet_stream(speed=x) | ||
| return True | ||
| return False, True | ||
| except RateTooHighError: | ||
| return True, False | ||
| except RateTooLowError: | ||
| return False, False | ||
| except ProcessError as e: | ||
| logger.debug(f"No solution found for speed: {x}", exc_info=e) | ||
| return False | ||
|
|
||
| minimum_speed = -maximize_x_given_boolean_condition_function( | ||
| x_min=-maximum_speed, | ||
| x_max=-minimum_speed, | ||
| bool_func=bool_speed_func, | ||
| minimum_speed = self._search_strategy.search( | ||
| boundary=self._boundary, | ||
| func=bool_speed_func, | ||
| ) | ||
| minimum_speed_outlet_stream = get_outlet_stream(speed=minimum_speed) | ||
|
|
||
|
|
@@ -72,9 +80,8 @@ def root_speed_func(x: float) -> float: | |
| out = get_outlet_stream(speed=x) | ||
| return out.pressure_bara - self._target_pressure | ||
|
|
||
| speed = find_root( | ||
| lower_bound=minimum_speed, | ||
| upper_bound=maximum_speed, | ||
| speed = self._root_finding_strategy.find_root( | ||
| boundary=Boundary(min=minimum_speed, max=self._boundary.max), | ||
| func=root_speed_func, | ||
| ) | ||
| return get_outlet_stream(speed=speed) | ||
|
|
@@ -84,5 +91,5 @@ def root_speed_func(x: float) -> float: | |
| return minimum_speed_outlet_stream | ||
|
|
||
| # Solution 3, target discharge pressure is too high | ||
| shaft.set_speed(maximum_speed) | ||
| shaft.set_speed(self._boundary.max) | ||
| return maximum_speed_outlet_stream | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, possibly let the solver return a "solution" instead (or NO_SOLUTION if none, but I guess in that case we want to throw an exception? Might be that we ahve several strategies that we try, and we dont know that the search for a solution is exhaustive, until we reach this point?)
RecirculationSolverSolution(SUCCESS, RecirculationLoopConfiguration(recirculation_rate=nn))?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in that case, it is important that we work on immutable objects, or immutable methods, where we just "test a solution"; which is different from the actual process, that does manipulate the actual stream?