Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions src/libecalc/domain/process/compressor/core/train/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,12 @@ def remove_rate(self, outlet_stream_stage: FluidStream) -> FluidStream:
stream=outlet_stream_stage,
)

def evaluate(
def get_compressor_inlet_stream(
self,
inlet_stream_stage: FluidStream,
rates_out_of_splitter: list[float] | None = None,
streams_in_to_mixer: list[FluidStream] | None = None,
) -> CompressorTrainStageResultSingleTimeStep:
"""Evaluates a compressor train stage given the conditions and rate of the inlet stream, and the speed
of the shaft driving the compressor if given.

Args:
inlet_stream_stage (FluidStream): The conditions of the inlet fluid stream. If there are several inlet streams,
the first one is the stage inlet stream, the others enter the stage at the Mixer.
rates_out_of_splitter (list[float] | None, optional): Additional rates to the Splitter if defined.
streams_in_to_mixer (list[FluidStream] | None, optional): Additional streams to the Mixer if defined.

Returns:
CompressorTrainStageResultSingleTimeStep: The result of the evaluation for the compressor stage
"""
):
# First the stream passes through the Splitter (if defined)
if self.splitter is not None:
self.splitter.rates_out_of_splitter = rates_out_of_splitter
Expand Down Expand Up @@ -156,7 +144,32 @@ def evaluate(
else:
inlet_stream_after_liquid_remover = inlet_stream_after_temperature_setter

inlet_stream_compressor = inlet_stream_after_liquid_remover
return inlet_stream_after_liquid_remover

def evaluate(
self,
inlet_stream_stage: FluidStream,
rates_out_of_splitter: list[float] | None = None,
streams_in_to_mixer: list[FluidStream] | None = None,
) -> CompressorTrainStageResultSingleTimeStep:
"""Evaluates a compressor train stage given the conditions and rate of the inlet stream, and the speed
of the shaft driving the compressor if given.

Args:
inlet_stream_stage (FluidStream): The conditions of the inlet fluid stream. If there are several inlet streams,
the first one is the stage inlet stream, the others enter the stage at the Mixer.
rates_out_of_splitter (list[float] | None, optional): Additional rates to the Splitter if defined.
streams_in_to_mixer (list[FluidStream] | None, optional): Additional streams to the Mixer if defined.

Returns:
CompressorTrainStageResultSingleTimeStep: The result of the evaluation for the compressor stage
"""

inlet_stream_compressor = self.get_compressor_inlet_stream(
inlet_stream_stage=inlet_stream_stage,
rates_out_of_splitter=rates_out_of_splitter,
streams_in_to_mixer=streams_in_to_mixer,
)

# Then additional rate is added by the RateModifier (if defined),
inlet_stream_compressor_including_asv = self.add_rate(
Expand All @@ -166,7 +179,7 @@ def evaluate(
# Compressor
self.compressor.validate_speed()
self.compressor.set_rate_before_asv(
rate_before_asv_m3_per_h=inlet_stream_after_liquid_remover.volumetric_rate_m3_per_hour
rate_before_asv_m3_per_h=inlet_stream_compressor.volumetric_rate_m3_per_hour
)

outlet_stream_compressor_including_asv = self.compress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
if self._pressure_change > 0.0:
pressure_bara = inlet_stream.pressure_bara - self._pressure_change
if pressure_bara < 0.0:
raise OutsideCapacityError("Unable to produce an outlet stream, trying to choke to negative pressure.")
raise OutsideCapacityError("Trying to choke to negative pressure.")
else:
# Delta pressure = 0, i.e. don't do anything
return inlet_stream
Expand Down
130 changes: 130 additions & 0 deletions src/libecalc/domain/process/process_solver/common_asv_solver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import abc
from collections.abc import Callable

from libecalc.domain.process.compressor.core.train.utils.common import EPSILON
from libecalc.domain.process.entities.process_units.recirculation_loop import RecirculationLoop
from libecalc.domain.process.entities.shaft import Shaft
from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
from libecalc.domain.process.process_solver.search_strategies import BinarySearchStrategy, ScipyRootFindingStrategy
from libecalc.domain.process.process_solver.solver import Solution
from libecalc.domain.process.process_solver.solvers.recirculation_solver import (
RecirculationConfiguration,
RecirculationSolver,
)
from libecalc.domain.process.process_solver.solvers.speed_solver import SpeedConfiguration, SpeedSolver
from libecalc.domain.process.process_system.process_error import RateTooLowError
from libecalc.domain.process.process_system.process_system import ProcessSystem
from libecalc.domain.process.process_system.process_unit import ProcessUnit
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream


class CompressorStageProcessUnit(ProcessUnit):
@abc.abstractmethod
def get_speed_boundary(self) -> Boundary: ...

@abc.abstractmethod
def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
"""
Maximum standard rate ignoring speed
"""
...


class CommonASVSolver:
def __init__(
self,
shaft: Shaft,
compressors: list[CompressorStageProcessUnit],
fluid_service: FluidService,
) -> None:
self._shaft = shaft
self._compressors = compressors
self._fluid_service = fluid_service
self._root_finding_strategy = ScipyRootFindingStrategy()
self._recirculation_loop = RecirculationLoop(
inner_process=ProcessSystem(
process_units=self._compressors,
),
fluid_service=self._fluid_service,
)

def get_recirculation_loop(self) -> RecirculationLoop:
return self._recirculation_loop

def get_initial_speed_boundary(self) -> Boundary:
speed_boundaries = [compressor.get_speed_boundary() for compressor in self._compressors]
max_speed = max(speed_boundary.max for speed_boundary in speed_boundaries)
min_speed = min(speed_boundary.min for speed_boundary in speed_boundaries)
return Boundary(
min=min_speed,
max=max_speed,
)

def get_maximum_recirculation_rate(self, inlet_stream: FluidStream) -> float:
first_compressor = self._compressors[0]
max_rate = first_compressor.get_maximum_standard_rate(inlet_stream=inlet_stream)
return max(0.0, max_rate - inlet_stream.standard_rate_sm3_per_day)

def get_initial_recirculation_rate_boundary(
self, inlet_stream: FluidStream, minimum_recirculation_rate: float = EPSILON
) -> Boundary:
return Boundary(
min=minimum_recirculation_rate,
max=self.get_maximum_recirculation_rate(inlet_stream=inlet_stream) * (1 - EPSILON),
)

def get_recirculation_solver(
self,
boundary: Boundary,
target_pressure: FloatConstraint | None = None,
) -> RecirculationSolver:
return RecirculationSolver(
root_finding_strategy=self._root_finding_strategy,
search_strategy=BinarySearchStrategy(tolerance=10e-3),
recirculation_rate_boundary=boundary,
target_pressure=target_pressure,
)

def get_recirculation_func(self, inlet_stream: FluidStream) -> Callable[[RecirculationConfiguration], FluidStream]:
def recirculation_func(configuration: RecirculationConfiguration) -> FluidStream:
self._recirculation_loop.set_recirculation_rate(configuration.recirculation_rate)
return self._recirculation_loop.propagate_stream(inlet_stream=inlet_stream)

return recirculation_func

def find_common_asv_solution(
self, pressure_constraint: FloatConstraint, inlet_stream: FluidStream
) -> tuple[Solution[SpeedConfiguration], Solution[RecirculationConfiguration]]:
speed_solver = SpeedSolver(
search_strategy=BinarySearchStrategy(),
root_finding_strategy=self._root_finding_strategy,
boundary=self.get_initial_speed_boundary(),
target_pressure=pressure_constraint.value,
)
recirculation_solver_to_capacity = self.get_recirculation_solver(
boundary=self.get_initial_recirculation_rate_boundary(inlet_stream=inlet_stream)
)
recirculation_func = self.get_recirculation_func(inlet_stream=inlet_stream)

def speed_func(configuration: SpeedConfiguration) -> FluidStream:
Comment thread
frodehk marked this conversation as resolved.
Comment thread
frodehk marked this conversation as resolved.
try:
self._shaft.set_speed(configuration.speed)
self._recirculation_loop.set_recirculation_rate(0)
return self._recirculation_loop.propagate_stream(inlet_stream=inlet_stream)
except RateTooLowError:
solution = recirculation_solver_to_capacity.solve(recirculation_func)
self._recirculation_loop.set_recirculation_rate(solution.configuration.recirculation_rate)
return self._recirculation_loop.propagate_stream(inlet_stream=inlet_stream)

speed_solution = speed_solver.solve(speed_func)
self._shaft.set_speed(speed_solution.configuration.speed)
recirculation_solver_with_target_pressure = self.get_recirculation_solver(
boundary=self.get_initial_recirculation_rate_boundary(
inlet_stream=inlet_stream,
),
target_pressure=pressure_constraint,
)
recirculation_solution = recirculation_solver_with_target_pressure.solve(recirculation_func)
# Return solution with all configurations
return speed_solution, recirculation_solution
58 changes: 58 additions & 0 deletions src/libecalc/domain/process/process_solver/float_constraint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from math import isclose

from libecalc.domain.process.compressor.core.train.utils.common import PRESSURE_CALCULATION_TOLERANCE


class FloatConstraint:
def __init__(self, value, abs_tol: float = PRESSURE_CALCULATION_TOLERANCE):
self.value = float(value)
self.abs_tol = abs_tol

def _is_close(self, other):
try:
other_val = float(other)
except (TypeError, ValueError):
return NotImplemented

return isclose(self.value, other_val, rel_tol=0, abs_tol=self.abs_tol)

def __eq__(self, other):
return self._is_close(other)

def __ne__(self, other):
return not self._is_close(other)

def __lt__(self, other):
try:
other_val = float(other)
except (TypeError, ValueError):
return NotImplemented
# a < b if a is not close to b and a < b
return not self._is_close(other) and self.value < other_val

def __le__(self, other):
try:
float(other)
except (TypeError, ValueError):
return NotImplemented
# a <= b if a < b or a approx equal to b
return self.__lt__(other) or self._is_close(other)

def __gt__(self, other):
try:
other_val = float(other)
except (TypeError, ValueError):
return NotImplemented
# a > b if a is not close to b and a > b
return not self._is_close(other) and self.value > other_val

def __ge__(self, other):
try:
float(other)
except (TypeError, ValueError):
return NotImplemented
# a >= b if a > b or a approx equal to b
return self.__gt__(other) or self._is_close(other)

def __repr__(self):
return f"FloatConstraint({self.value}, rel_tol={self.rel_tol}, abs_tol={self.abs_tol})"
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Literal

from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy, SearchStrategy
from libecalc.domain.process.process_solver.solver import Solution, Solver
from libecalc.domain.process.process_system.process_error import RateTooHighError, RateTooLowError
Expand All @@ -13,14 +14,17 @@
class RecirculationConfiguration:
recirculation_rate: float

def __post_init__(self):
self.recirculation_rate = float(self.recirculation_rate)


class RecirculationSolver(Solver):
def __init__(
self,
search_strategy: SearchStrategy,
root_finding_strategy: RootFindingStrategy,
recirculation_rate_boundary: Boundary,
target_pressure: float | None = None,
target_pressure: FloatConstraint | None = None,
):
self._recirculation_rate_boundary = recirculation_rate_boundary
self._target_pressure = target_pressure
Expand Down Expand Up @@ -74,14 +78,20 @@ def bool_func(x: float, mode: Literal["minimize", "maximize"]) -> tuple[bool, bo
minimum_outlet_stream = func(RecirculationConfiguration(recirculation_rate=minimum_rate))
if minimum_outlet_stream.pressure_bara <= target_pressure:
# Highest possible pressure is too low
return Solution(success=False, configuration=RecirculationConfiguration(recirculation_rate=minimum_rate))
return Solution(
success=minimum_outlet_stream.pressure_bara == target_pressure,
configuration=RecirculationConfiguration(recirculation_rate=minimum_rate),
)
maximum_outlet_stream = func(RecirculationConfiguration(recirculation_rate=maximum_rate))
if maximum_outlet_stream.pressure_bara >= target_pressure:
# Lowest possible pressure is too high
return Solution(success=False, configuration=RecirculationConfiguration(recirculation_rate=maximum_rate))
return Solution(
success=maximum_outlet_stream.pressure_bara == self._target_pressure,
configuration=RecirculationConfiguration(recirculation_rate=maximum_rate),
)

recirculation_rate = self._root_finding_strategy.find_root(
boundary=Boundary(min=minimum_rate, max=maximum_rate),
func=lambda x: func(RecirculationConfiguration(recirculation_rate=x)).pressure_bara - target_pressure,
func=lambda x: func(RecirculationConfiguration(recirculation_rate=x)).pressure_bara - target_pressure.value,
)
return Solution(success=True, configuration=RecirculationConfiguration(recirculation_rate=recirculation_rate))
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def get_outlet_stream(speed: float) -> FluidStream:
configuration=max_speed_configuration,
)

if maximum_speed_outlet_stream.pressure_bara < self._target_pressure:
return Solution(success=False, configuration=SpeedConfiguration(self._boundary.max))

try:
minimum_speed_configuration = SpeedConfiguration(speed=self._boundary.min)
minimum_speed_outlet_stream = func(minimum_speed_configuration)
Expand All @@ -65,26 +68,26 @@ def bool_speed_func(x: float):
)
minimum_speed_configuration = SpeedConfiguration(speed=minimum_speed_within_capacity)
minimum_speed_outlet_stream = func(minimum_speed_configuration)
if (

if minimum_speed_outlet_stream.pressure_bara > self._target_pressure:
# Solution 2, target pressure is too low
return Solution(success=False, configuration=minimum_speed_configuration)

assert (
minimum_speed_outlet_stream.pressure_bara
<= self._target_pressure
<= maximum_speed_outlet_stream.pressure_bara
):
# Solution 1, iterate on speed until target discharge pressure is found
def root_speed_func(x: float) -> float:
# We should be able to produce an outlet stream since we adjust minimum speed above,
# or exit if max speed is not enough
out = get_outlet_stream(speed=x)
return out.pressure_bara - self._target_pressure

speed = self._root_finding_strategy.find_root(
boundary=Boundary(min=minimum_speed_configuration.speed, max=self._boundary.max),
func=root_speed_func,
)
return Solution(success=True, configuration=SpeedConfiguration(speed=speed))
elif self._target_pressure < minimum_speed_outlet_stream.pressure_bara:
# Solution 2, target pressure is too low
return Solution(success=False, configuration=SpeedConfiguration(minimum_speed_configuration.speed))
)

# Solution 1, iterate on speed until target discharge pressure is found
def root_speed_func(x: float) -> float:
# We should be able to produce an outlet stream since we adjust minimum speed above,
# or exit if max speed is not enough
out = get_outlet_stream(speed=x)
return out.pressure_bara - self._target_pressure

# Solution 3, target discharge pressure is too high
return Solution(success=False, configuration=SpeedConfiguration(self._boundary.max))
speed = self._root_finding_strategy.find_root(
boundary=Boundary(min=minimum_speed_configuration.speed, max=self._boundary.max),
func=root_speed_func,
)
return Solution(success=True, configuration=SpeedConfiguration(speed=speed))
Loading