diff --git a/src/libecalc/domain/process/entities/process_units/choke.py b/src/libecalc/domain/process/entities/process_units/choke.py index de73f9e143..1c8ee5ead1 100644 --- a/src/libecalc/domain/process/entities/process_units/choke.py +++ b/src/libecalc/domain/process/entities/process_units/choke.py @@ -1,18 +1,18 @@ from typing import Final from libecalc.domain.process.process_pipeline.process_error import OutsideCapacityError -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class Choke(ProcessUnit): +class Choke(GasProcessUnit): def __init__( self, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, pressure_change: float = 0.0, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._pressure_change = pressure_change self._fluid_service = fluid_service diff --git a/src/libecalc/domain/process/entities/process_units/compressor.py b/src/libecalc/domain/process/entities/process_units/compressor.py index c4598b7b41..a2deede0b5 100644 --- a/src/libecalc/domain/process/entities/process_units/compressor.py +++ b/src/libecalc/domain/process/entities/process_units/compressor.py @@ -5,21 +5,21 @@ calculate_outlet_pressure_and_stream, ) from libecalc.domain.process.process_pipeline.process_error import RateTooHighError, RateTooLowError -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.process_solver.boundary import Boundary from libecalc.domain.process.value_objects.chart.chart import ChartData from libecalc.domain.process.value_objects.chart.compressor import CompressorChart from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class Compressor(ProcessUnit): +class Compressor(GasProcessUnit): def __init__( self, compressor_chart: ChartData, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._compressor_chart = CompressorChart(compressor_chart) self._fluid_service = fluid_service self._speed: float | None = None @@ -86,6 +86,14 @@ def set_speed(self, speed: float) -> None: """Set the rotational speed used for chart lookup.""" self._speed = speed + @property + def minimum_speed(self) -> float: + return self._compressor_chart.minimum_speed + + @property + def maximum_speed(self) -> float: + return self._compressor_chart.maximum_speed + def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float: """Minimum standard volumetric rate [sm³/day] at current speed. diff --git a/src/libecalc/domain/process/entities/process_units/direct_mixer.py b/src/libecalc/domain/process/entities/process_units/direct_mixer.py index e2229401e8..5e85303eb0 100644 --- a/src/libecalc/domain/process/entities/process_units/direct_mixer.py +++ b/src/libecalc/domain/process/entities/process_units/direct_mixer.py @@ -1,22 +1,21 @@ from typing import Final from libecalc.common.units import UnitConstants -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.common.utils.ecalc_uuid import ecalc_id_generator +from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId +from libecalc.domain.process.value_objects.stream_protocol import MixableStream -class DirectMixer(ProcessUnit): +class DirectMixer: def __init__(self, mix_rate: float = 0, process_unit_id: ProcessUnitId | None = None): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnitId(ecalc_id_generator()) self._mix_rate = mix_rate def get_id(self) -> ProcessUnitId: return self._id - def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: - added_mass_kg_per_h = ( - self._mix_rate * inlet_stream.standard_density_gas_phase_after_flash / UnitConstants.HOURS_PER_DAY - ) + def propagate_stream(self, inlet_stream: MixableStream) -> MixableStream: + added_mass_kg_per_h = self._mix_rate * inlet_stream.standard_density / UnitConstants.HOURS_PER_DAY return inlet_stream.with_mass_rate(inlet_stream.mass_rate_kg_per_h + added_mass_kg_per_h) def get_mix_rate(self) -> float: diff --git a/src/libecalc/domain/process/entities/process_units/direct_splitter.py b/src/libecalc/domain/process/entities/process_units/direct_splitter.py index e71f04a3af..da81c952a2 100644 --- a/src/libecalc/domain/process/entities/process_units/direct_splitter.py +++ b/src/libecalc/domain/process/entities/process_units/direct_splitter.py @@ -1,22 +1,21 @@ from typing import Final from libecalc.common.units import UnitConstants -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.common.utils.ecalc_uuid import ecalc_id_generator +from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId +from libecalc.domain.process.value_objects.stream_protocol import MixableStream -class DirectSplitter(ProcessUnit): +class DirectSplitter: def __init__(self, process_unit_id: ProcessUnitId | None = None, split_rate: float = 0): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnitId(ecalc_id_generator()) self._split_rate = split_rate def get_id(self) -> ProcessUnitId: return self._id - def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: - removed_mass_kg_per_h = ( - self._split_rate * inlet_stream.standard_density_gas_phase_after_flash / UnitConstants.HOURS_PER_DAY - ) + def propagate_stream(self, inlet_stream: MixableStream) -> MixableStream: + removed_mass_kg_per_h = self._split_rate * inlet_stream.standard_density / UnitConstants.HOURS_PER_DAY return inlet_stream.with_mass_rate(inlet_stream.mass_rate_kg_per_h - removed_mass_kg_per_h) def set_split_rate(self, split_rate: float): diff --git a/src/libecalc/domain/process/entities/process_units/liquid_remover.py b/src/libecalc/domain/process/entities/process_units/liquid_remover.py index 2aee5b350e..5ec7fe9e3e 100644 --- a/src/libecalc/domain/process/entities/process_units/liquid_remover.py +++ b/src/libecalc/domain/process/entities/process_units/liquid_remover.py @@ -1,13 +1,13 @@ from typing import Final -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream from libecalc.domain.process.value_objects.fluid_stream.constants import ThermodynamicConstants -class LiquidRemover(ProcessUnit): +class LiquidRemover(GasProcessUnit): def __init__(self, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._fluid_service = fluid_service def get_id(self) -> ProcessUnitId: diff --git a/src/libecalc/domain/process/entities/process_units/mixer.py b/src/libecalc/domain/process/entities/process_units/mixer.py index 5ec32dee13..7e76d38602 100644 --- a/src/libecalc/domain/process/entities/process_units/mixer.py +++ b/src/libecalc/domain/process/entities/process_units/mixer.py @@ -3,11 +3,11 @@ from libecalc.domain.process.entities.process_units.simplified_stream_mixer.simplified_stream_mixer import ( SimplifiedStreamMixer, ) -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class Mixer(ProcessUnit): +class Mixer(GasProcessUnit): """Mixes one external stream into the through-stream. The external stream is set via `set_stream` before propagation. This models @@ -15,7 +15,7 @@ class Mixer(ProcessUnit): """ def __init__(self, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._mixer = SimplifiedStreamMixer(fluid_service) self._external_stream: FluidStream | None = None diff --git a/src/libecalc/domain/process/entities/process_units/pump.py b/src/libecalc/domain/process/entities/process_units/pump.py new file mode 100644 index 0000000000..c6e1f07395 --- /dev/null +++ b/src/libecalc/domain/process/entities/process_units/pump.py @@ -0,0 +1,173 @@ +from libecalc.domain.process.process_pipeline.liquid_process_unit import LiquidProcessUnit +from libecalc.domain.process.process_pipeline.process_error import RateTooHighError, RateTooLowError +from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.value_objects.chart.chart import Chart, ChartData +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream + +_BARA_TO_PASCAL = 1e5 + + +class Pump(LiquidProcessUnit): + """Pump stage operating on a liquid stream. + + Physics: + head [J/kg] = pressure_rise [Pa] / density [kg/m³] + power [MW] = density × head × Q [m³/s] / efficiency / 1e6 + + For single-speed pumps the chart has one curve — no set_speed() needed. + For variable-speed pumps, set_speed() must be called before propagate_stream(). + """ + + def __init__( + self, + process_unit_id: ProcessUnitId, + pump_chart: ChartData, + ) -> None: + self._id = process_unit_id + self._pump_chart = Chart(pump_chart) + self._speed: float | None = None + self._last_head_joule_per_kg: float | None = None + self._last_efficiency: float | None = None + self._last_mass_rate_kg_per_s: float | None = None + + def get_id(self) -> ProcessUnitId: + return self._id + + def set_speed(self, speed: float) -> None: + self._speed = speed + + @property + def minimum_flow_rate(self) -> float: + """Minimum volumetric flow rate [m³/h] at current speed.""" + return float(self._pump_chart.minimum_rate_as_function_of_speed(self.speed)) + + @property + def maximum_flow_rate(self) -> float: + """Maximum volumetric flow rate [m³/h] at current speed.""" + return float(self._pump_chart.maximum_rate_as_function_of_speed(self.speed)) + + def get_recirculation_range(self, inlet_stream: LiquidStream) -> Boundary: + """How much recirculation (Sm³/day) is needed/available to stay within pump capacity. + + Standard ≈ actual for liquids; min/max rates [m³/h] × 24 → Sm³/day. + """ + min_sm3_day = self.minimum_flow_rate * 24.0 + max_sm3_day = self.maximum_flow_rate * 24.0 + inlet_sm3_day = inlet_stream.standard_rate_sm3_per_day + return Boundary( + min=max(0.0, min_sm3_day - inlet_sm3_day), + max=max(0.0, max_sm3_day - inlet_sm3_day), + ) + + @property + def speed(self) -> float: + if self._speed is None: + if not self._pump_chart.is_variable_speed: + return self._pump_chart.minimum_speed + raise ValueError("Speed not set. Variable-speed Pump must be registered on a Shaft.") + return self._speed + + def propagate_stream(self, inlet_stream: LiquidStream) -> LiquidStream: + """Propagate liquid stream through the pump. + + Computes outlet pressure from hydraulic head and tracks shaft power. + Raises RateTooLowError / RateTooHighError if the operating point is + outside the pump chart envelope at the current speed. + """ + rate = inlet_stream.volumetric_rate_m3_per_hour + density = inlet_stream.density_kg_per_m3 + + head, efficiency = self._head_and_efficiency_at_rate(rate) + + min_rate = float(self._pump_chart.minimum_rate_as_function_of_head(head)) + max_rate = float(self._pump_chart.maximum_rate_as_function_of_head(head)) + + if rate < min_rate: + raise RateTooLowError( + actual_rate=rate, + boundary_rate=min_rate, + process_unit_id=self._id, + ) + + if rate > max_rate: + raise RateTooHighError( + actual_rate=rate, + boundary_rate=max_rate, + process_unit_id=self._id, + ) + + self._last_head_joule_per_kg = head + self._last_efficiency = efficiency + self._last_mass_rate_kg_per_s = inlet_stream.mass_rate_kg_per_h / 3600.0 + + pressure_rise_bara = head * density / _BARA_TO_PASCAL + outlet_pressure = inlet_stream.pressure_bara + pressure_rise_bara + + return inlet_stream.with_pressure(outlet_pressure) + + @property + def minimum_speed(self) -> float: + return self._pump_chart.minimum_speed + + @property + def maximum_speed(self) -> float: + return self._pump_chart.maximum_speed + + @property + def last_shaft_power_mw(self) -> float: + """Shaft power [MW] from the most recent propagate_stream() call. + + P = ṁ [kg/s] × head [J/kg] / η / 1e6 + """ + if ( + self._last_head_joule_per_kg is None + or self._last_efficiency is None + or self._last_mass_rate_kg_per_s is None + ): + raise ValueError("No result available — call propagate_stream() first.") + return self._last_mass_rate_kg_per_s * self._last_head_joule_per_kg / self._last_efficiency / 1e6 + + @property + def last_head_joule_per_kg(self) -> float: + """Hydraulic head [J/kg] from the most recent propagate_stream() call.""" + if self._last_head_joule_per_kg is None: + raise ValueError("No result available — call propagate_stream() first.") + return self._last_head_joule_per_kg + + @property + def last_efficiency(self) -> float: + """Pump efficiency [-] from the most recent propagate_stream() call.""" + if self._last_efficiency is None: + raise ValueError("No result available — call propagate_stream() first.") + return self._last_efficiency + + def _head_and_efficiency_at_rate(self, rate_m3_per_hour: float) -> tuple[float, float]: + """Head [J/kg] and efficiency [-] at current speed and flow rate.""" + curve = self._pump_chart.get_curve_by_speed(self.speed) + if curve is not None: + return ( + float(curve.head_as_function_of_rate(rate_m3_per_hour)), + float(curve.efficiency_as_function_of_rate(rate_m3_per_hour)), + ) + + # Variable speed between two curves — linear interpolation + curves_below = [c for c in self._pump_chart.curves if c.speed <= self.speed] + curves_above = [c for c in self._pump_chart.curves if c.speed >= self.speed] + if not curves_below or not curves_above: + raise ValueError( + f"Speed {self.speed} rpm is outside the pump chart range " + f"[{self._pump_chart.minimum_speed}, {self._pump_chart.maximum_speed}]." + ) + + c_low = curves_below[-1] + c_high = curves_above[0] + alpha = (self.speed - c_low.speed) / (c_high.speed - c_low.speed) + + head = (1 - alpha) * float(c_low.head_as_function_of_rate(rate_m3_per_hour)) + alpha * float( + c_high.head_as_function_of_rate(rate_m3_per_hour) + ) + efficiency = (1 - alpha) * float(c_low.efficiency_as_function_of_rate(rate_m3_per_hour)) + alpha * float( + c_high.efficiency_as_function_of_rate(rate_m3_per_hour) + ) + return head, efficiency diff --git a/src/libecalc/domain/process/entities/process_units/splitter.py b/src/libecalc/domain/process/entities/process_units/splitter.py index 827b9a4a17..eee39c5049 100644 --- a/src/libecalc/domain/process/entities/process_units/splitter.py +++ b/src/libecalc/domain/process/entities/process_units/splitter.py @@ -1,10 +1,10 @@ from typing import Final -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class Splitter(ProcessUnit): +class Splitter(GasProcessUnit): """Removes a fixed standard rate from the through-stream. The split rate is set via `set_rate` before propagation. This models a gas @@ -13,7 +13,7 @@ class Splitter(ProcessUnit): """ def __init__(self, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, rate: float = 0.0): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._fluid_service = fluid_service self._rate = rate diff --git a/src/libecalc/domain/process/entities/process_units/temperature_setter.py b/src/libecalc/domain/process/entities/process_units/temperature_setter.py index 2cc31f07b1..f2f175984e 100644 --- a/src/libecalc/domain/process/entities/process_units/temperature_setter.py +++ b/src/libecalc/domain/process/entities/process_units/temperature_setter.py @@ -1,17 +1,17 @@ from typing import Final -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class TemperatureSetter(ProcessUnit): +class TemperatureSetter(GasProcessUnit): def __init__( self, required_temperature_kelvin: float, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._required_temperature_kelvin = required_temperature_kelvin self._fluid_service = fluid_service diff --git a/src/libecalc/domain/process/entities/shaft/shaft.py b/src/libecalc/domain/process/entities/shaft/shaft.py index 0072702b2b..2dc575a622 100644 --- a/src/libecalc/domain/process/entities/shaft/shaft.py +++ b/src/libecalc/domain/process/entities/shaft/shaft.py @@ -1,8 +1,9 @@ +import uuid from abc import ABC, abstractmethod -from typing import Final +from typing import Final, NewType, Protocol, runtime_checkable +from uuid import UUID from libecalc.common.errors.exceptions import ProgrammingError -from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_solver.boundary import Boundary from libecalc.domain.process.process_solver.configuration import ( Configuration, @@ -10,12 +11,32 @@ ) from libecalc.domain.process.process_solver.configuration_handler import ConfigurationHandler, ConfigurationHandlerId +ShaftId = NewType("ShaftId", UUID) -class Shaft(ConfigurationHandler, ABC): - """Abstract base class for a shaft. - Can be expanded to include more properties and methods as needed. - Name, id, units connected to it, etc +def create_shaft_id() -> ShaftId: + return ShaftId(uuid.uuid4()) + + +@runtime_checkable +class ShaftConnectable(Protocol): + """Any unit that can be mounted on a shaft: exposes speed bounds and accepts speed updates.""" + + @property + def minimum_speed(self) -> float: ... + + @property + def maximum_speed(self) -> float: ... + + def set_speed(self, speed: float) -> None: ... + + +class Shaft[T: ShaftConnectable](ConfigurationHandler, ABC): + """Abstract base class for a shaft driving a homogeneous set of process units. + + A shaft connects to either compressors or pumps — never a mix of both. + The generic parameter _T enforces this at the type-checking level: + VariableSpeedShaft[Compressor], SingleSpeedShaft[Pump], etc. """ def __init__( @@ -25,7 +46,7 @@ def __init__( ): self._id: Final[ConfigurationHandlerId] = configuration_handler_id or ConfigurationHandler._create_id() self._speed_rpm = speed_rpm - self._compressors: list[Compressor] = [] + self._units: list[T] = [] def get_id(self) -> ConfigurationHandlerId: return self._id @@ -43,10 +64,10 @@ def handle_configuration(self, configuration: Configuration): def set_speed(self, value: float) -> None: pass - def connect(self, compressor: Compressor) -> None: - if compressor in self._compressors: - raise ProgrammingError("Compressor is already registered on this shaft.") - self._compressors.append(compressor) + def connect(self, unit: T) -> None: + if unit in self._units: + raise ProgrammingError("Unit is already registered on this shaft.") + self._units.append(unit) def get_speed(self) -> float: if self._speed_rpm is None: @@ -61,21 +82,19 @@ def speed_is_defined(self) -> bool: return self._speed_rpm is not None def get_speed_boundary(self) -> Boundary: - from libecalc.domain.process.process_solver.boundary import Boundary - - if not self._compressors: - raise ValueError("No compressors registered on this shaft.") - min_speed = max(c.compressor_chart.minimum_speed for c in self._compressors) - max_speed = min(c.compressor_chart.maximum_speed for c in self._compressors) + if not self._units: + raise ValueError("No units registered on this shaft.") + min_speed = max(u.minimum_speed for u in self._units) + max_speed = min(u.maximum_speed for u in self._units) return Boundary(min=min_speed, max=max_speed) def _apply_speed(self, value: float): self._speed_rpm = value - for compressor in self._compressors: - compressor.set_speed(value) + for unit in self._units: + unit.set_speed(value) -class SingleSpeedShaft(Shaft): +class SingleSpeedShaft[T: ShaftConnectable](Shaft[T]): def set_speed(self, value: float): if self._speed_rpm is None: self._apply_speed(value) @@ -83,6 +102,6 @@ def set_speed(self, value: float): raise AttributeError("Speed has already been set. Cannot modify speed of SingleSpeedShaft") -class VariableSpeedShaft(Shaft): +class VariableSpeedShaft[T: ShaftConnectable](Shaft[T]): def set_speed(self, value: float): self._apply_speed(value) diff --git a/src/libecalc/domain/process/process_pipeline/liquid_process_unit.py b/src/libecalc/domain/process/process_pipeline/liquid_process_unit.py new file mode 100644 index 0000000000..76bbab2e8a --- /dev/null +++ b/src/libecalc/domain/process/process_pipeline/liquid_process_unit.py @@ -0,0 +1,20 @@ +import abc + +from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream + + +class LiquidProcessUnit(abc.ABC): + """A process unit that operates on a liquid stream. + + Distinct from GasProcessUnit (gas/FluidStream path). Liquid units are incompressible — + density is constant through the unit, no thermodynamic flash required. + + Current implementations: Pump + """ + + @abc.abstractmethod + def get_id(self) -> ProcessUnitId: ... + + @abc.abstractmethod + def propagate_stream(self, inlet_stream: LiquidStream) -> LiquidStream: ... diff --git a/src/libecalc/domain/process/process_pipeline/process_unit.py b/src/libecalc/domain/process/process_pipeline/process_unit.py index b75a687dcf..8a640bccec 100644 --- a/src/libecalc/domain/process/process_pipeline/process_unit.py +++ b/src/libecalc/domain/process/process_pipeline/process_unit.py @@ -1,18 +1,40 @@ import abc -from typing import NewType, Self +from typing import Any, NewType, Protocol, Self from uuid import UUID from libecalc.common.ddd.entity import Entity from libecalc.common.utils.ecalc_uuid import ecalc_id_generator -from libecalc.domain.process.process_pipeline.stream_propagator import StreamPropagator +from libecalc.domain.process.value_objects.fluid_stream import FluidStream ProcessUnitId = NewType("ProcessUnitId", UUID) -class ProcessUnit(Entity[ProcessUnitId], StreamPropagator, abc.ABC): +def create_process_unit_id() -> ProcessUnitId: + """Standalone id factory for units not extending GasProcessUnit (e.g. Pump, DirectMixer).""" + return ProcessUnitId(ecalc_id_generator()) + + +class ProcessUnit(Protocol): + """Any unit that can participate in a process pipeline. + + Satisfied structurally by Compressor, Pump, DirectMixer, DirectSplitter, + and any other unit with get_id() and propagate_stream(). + """ + + def get_id(self) -> ProcessUnitId: ... + + def propagate_stream(self, inlet_stream: Any) -> Any: ... + + +class GasProcessUnit(Entity[ProcessUnitId], abc.ABC): + """Process unit operating on gas streams (FluidStream).""" + @abc.abstractmethod def get_id(self) -> ProcessUnitId: ... + @abc.abstractmethod + def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: ... + @classmethod def _create_id(cls: type[Self]) -> ProcessUnitId: return ProcessUnitId(ecalc_id_generator()) diff --git a/src/libecalc/domain/process/process_solver/anti_surge/anti_surge_strategy.py b/src/libecalc/domain/process/process_solver/anti_surge/anti_surge_strategy.py index 3d59d92c5f..ffaf4320d6 100644 --- a/src/libecalc/domain/process/process_solver/anti_surge/anti_surge_strategy.py +++ b/src/libecalc/domain/process/process_solver/anti_surge/anti_surge_strategy.py @@ -4,30 +4,23 @@ from libecalc.domain.process.process_solver.configuration import Configuration from libecalc.domain.process.process_solver.solver import Solution from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class AntiSurgeStrategy(ABC): """ - Strategy for keeping the train within compressor chart capacity at the current speed. + Strategy for keeping the train within capacity at the current speed. - Used primarily during speed search when propagation may fail (e.g. RateTooLowError). - Implementations may adjust control elements (e.g. ASV recirculation) and may - propagate internally in order to establish a feasible operating point. + Used during speed search when propagation may fail (e.g. RateTooLowError). + Works for both compressor anti-surge and pump minimum-flow bypass. """ @abstractmethod - def apply(self, inlet_stream: FluidStream) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: - """ - Adjust the system so it can be propagated at the current speed without violating - minimum-flow / capacity constraints. - - Returns: - Outlet stream after applying anti-surge adjustments (e.g. setting ASV recirculation). - """ + def apply(self, inlet_stream: StreamWithPressure) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: + """Adjust recirculation so the system can be propagated without violating capacity.""" ... @abstractmethod def reset(self) -> None: - """Reset mutable control state used by the strategy (e.g. ASV recirculation).""" + """Reset mutable control state (e.g. recirculation rates) to zero.""" ... diff --git a/src/libecalc/domain/process/process_solver/anti_surge/common_asv.py b/src/libecalc/domain/process/process_solver/anti_surge/common_asv.py index 6fefdbaf05..0a717ace23 100644 --- a/src/libecalc/domain/process/process_solver/anti_surge/common_asv.py +++ b/src/libecalc/domain/process/process_solver/anti_surge/common_asv.py @@ -1,6 +1,5 @@ from collections.abc import Sequence -from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_solver.anti_surge.anti_surge_strategy import AntiSurgeStrategy from libecalc.domain.process.process_solver.configuration import Configuration, ConfigurationHandlerId from libecalc.domain.process.process_solver.process_runner import ProcessRunner @@ -10,7 +9,8 @@ RecirculationConfiguration, RecirculationSolver, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.process_solver.unit_protocol import RecirculatingUnit +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class CommonASVAntiSurgeStrategy(AntiSurgeStrategy): @@ -29,18 +29,18 @@ def __init__( self, simulator: ProcessRunner, recirculation_loop_id: ConfigurationHandlerId, - first_compressor: Compressor, + first_unit: RecirculatingUnit, root_finding_strategy: RootFindingStrategy, ): self._recirculation_loop_id = recirculation_loop_id - self._first_compressor = first_compressor + self._first_unit = first_unit self._root_finding_strategy = root_finding_strategy self._simulator = simulator def reset(self) -> None: self._apply_configuration(RecirculationConfiguration(recirculation_rate=0.0)) - def apply(self, inlet_stream: FluidStream) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: + def apply(self, inlet_stream: StreamWithPressure) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: # Increase recirculation to give minimum feasible flow and return outlet. recirculation_solution = self._increase_recirculation_to_minimum_feasible(inlet_stream) return Solution( @@ -59,13 +59,13 @@ def _apply_configuration(self, cfg: RecirculationConfiguration): ) def _increase_recirculation_to_minimum_feasible( - self, inlet_stream: FluidStream + self, inlet_stream: StreamWithPressure ) -> Solution[RecirculationConfiguration]: # The recirculation boundary depends on the inlet stream (and implicitly current speed). - compressor_inlet_stream = self._simulator.run(inlet_stream=inlet_stream, to_id=self._first_compressor.get_id()) - boundary = self._first_compressor.get_recirculation_range(compressor_inlet_stream) + unit_inlet_stream = self._simulator.run(inlet_stream=inlet_stream, to_id=self._first_unit.get_id()) + boundary = self._first_unit.get_recirculation_range(unit_inlet_stream) - def recirculation_func(cfg: RecirculationConfiguration) -> FluidStream: + def recirculation_func(cfg: RecirculationConfiguration) -> StreamWithPressure: self._apply_configuration(cfg) return self._simulator.run(inlet_stream=inlet_stream) diff --git a/src/libecalc/domain/process/process_solver/anti_surge/individual_asv.py b/src/libecalc/domain/process/process_solver/anti_surge/individual_asv.py index 5b68277ff4..8d8de80f3a 100644 --- a/src/libecalc/domain/process/process_solver/anti_surge/individual_asv.py +++ b/src/libecalc/domain/process/process_solver/anti_surge/individual_asv.py @@ -1,7 +1,6 @@ from collections.abc import Sequence from typing import override -from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_pipeline.process_error import RateTooHighError from libecalc.domain.process.process_solver.anti_surge.anti_surge_strategy import AntiSurgeStrategy from libecalc.domain.process.process_solver.configuration import Configuration, ConfigurationHandlerId @@ -12,29 +11,26 @@ SolverFailureStatus, ) from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.process_solver.unit_protocol import RecirculatingUnit +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class IndividualASVAntiSurgeStrategy(AntiSurgeStrategy): - """Anti-surge strategy for INDIVIDUAL ASV topology (one recirculation loop per stage). + """Anti-surge / minimum-flow strategy for INDIVIDUAL ASV topology. - Propagates stage-by-stage, setting each stage’s ASV recirculation to the minimum feasible - value based on that stage’s actual inlet stream, and returns the final outlet stream. - - Contract: - - Mutates each stage's RecirculationLoop by setting its recirculation rate. - - Returns the outlet stream after all stages have been propagated with minimum feasible recirculation. + One recirculation loop per stage. Works for both compressor anti-surge and + pump minimum-flow bypass — any unit satisfying RecirculatingUnit. """ def __init__( self, recirculation_loop_ids: Sequence[ConfigurationHandlerId], - compressors: Sequence[Compressor], + units: Sequence[RecirculatingUnit], simulator: ProcessRunner, ): - assert len(recirculation_loop_ids) == len(compressors) + assert len(recirculation_loop_ids) == len(units) self._recirculation_loop_ids = recirculation_loop_ids - self._compressors = compressors + self._units = units self._simulator = simulator def _apply_recirculation_configuration(self, loop_id: ConfigurationHandlerId, recirculation_rate: float): @@ -60,11 +56,11 @@ def reset(self) -> None: ) @override - def apply(self, inlet_stream: FluidStream) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: + def apply(self, inlet_stream: StreamWithPressure) -> Solution[Sequence[Configuration[RecirculationConfiguration]]]: configurations: Sequence[Configuration[RecirculationConfiguration]] = [] - for loop_id, compressor in zip(self._recirculation_loop_ids, self._compressors, strict=True): + for loop_id, unit in zip(self._recirculation_loop_ids, self._units, strict=True): try: - inlet_stream_compressor = self._simulator.run(inlet_stream=inlet_stream, to_id=compressor.get_id()) + inlet_stream_unit = self._simulator.run(inlet_stream=inlet_stream, to_id=unit.get_id()) except RateTooHighError as e: return Solution( success=False, @@ -76,19 +72,19 @@ def apply(self, inlet_stream: FluidStream) -> Solution[Sequence[Configuration[Re source_id=e.process_unit_id, ), ) - max_actual_rate = compressor.maximum_flow_rate - if inlet_stream_compressor.volumetric_rate_m3_per_hour > max_actual_rate: + max_actual_rate = unit.maximum_flow_rate + if inlet_stream_unit.volumetric_rate_m3_per_hour > max_actual_rate: # pyright: ignore[reportAttributeAccessIssue] return Solution( success=False, configuration=configurations, failure_event=OutsideCapacityEvent( status=SolverFailureStatus.ABOVE_MAXIMUM_FLOW_RATE, - actual_value=inlet_stream_compressor.volumetric_rate_m3_per_hour, + actual_value=inlet_stream_unit.volumetric_rate_m3_per_hour, # pyright: ignore[reportAttributeAccessIssue] boundary_value=max_actual_rate, - source_id=compressor.get_id(), + source_id=unit.get_id(), ), ) - boundary = compressor.get_recirculation_range(inlet_stream=inlet_stream_compressor) + boundary = unit.get_recirculation_range(inlet_stream=inlet_stream_unit) configuration: Configuration[RecirculationConfiguration] = Configuration( configuration_handler_id=loop_id, value=RecirculationConfiguration( diff --git a/src/libecalc/domain/process/process_solver/feasibility_solver.py b/src/libecalc/domain/process/process_solver/feasibility_solver.py index 77bc59bd05..63b3590e34 100644 --- a/src/libecalc/domain/process/process_solver/feasibility_solver.py +++ b/src/libecalc/domain/process/process_solver/feasibility_solver.py @@ -1,3 +1,5 @@ +from typing import cast + from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_solver.float_constraint import FloatConstraint from libecalc.domain.process.process_solver.outlet_pressure_solver import OutletPressureSolver @@ -49,7 +51,7 @@ def _find_feasible_rate( Returns the full inlet rate if the solver succeeds, otherwise finds the bottleneck compressor's stone wall limit at the current operating point. """ - solution = self._solver.find_solution(target_pressure, inlet_stream) + solution = self._solver.find_solution(target_pressure, inlet_stream) # pyright: ignore[reportArgumentType] - FluidStream satisfies StreamWithPressure at runtime if solution.success: # The train can handle the full rate — no need to search for a bottleneck. @@ -61,9 +63,12 @@ def _find_feasible_rate( # Search for compressor with the lowest max rate min_max_rate = float("inf") for compressor in self._compressors: - compressor_inlet = self._runner.run( - inlet_stream=inlet_stream, - to_id=compressor.get_id(), + compressor_inlet = cast( + FluidStream, + self._runner.run( # pyright: ignore[reportArgumentType] + inlet_stream=inlet_stream, + to_id=compressor.get_id(), + ), ) max_rate = compressor.get_maximum_standard_rate(compressor_inlet) min_max_rate = min(min_max_rate, max_rate) diff --git a/src/libecalc/domain/process/process_solver/multi_pressure_solver.py b/src/libecalc/domain/process/process_solver/multi_pressure_solver.py index fd66c61ffa..b8e96f40ed 100644 --- a/src/libecalc/domain/process/process_solver/multi_pressure_solver.py +++ b/src/libecalc/domain/process/process_solver/multi_pressure_solver.py @@ -15,7 +15,7 @@ TargetNotAchievableEvent, ) from libecalc.domain.process.process_solver.solvers.speed_solver import SpeedConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class MultiPressureSolver: @@ -67,7 +67,7 @@ def _validate_pressure_control_placement(segments: list[OutletPressureSolver]) - def find_solution( self, pressure_targets: list[FloatConstraint], - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration]]: if len(pressure_targets) != len(self._segments): raise DomainValidationException( diff --git a/src/libecalc/domain/process/process_solver/outlet_pressure_solver.py b/src/libecalc/domain/process/process_solver/outlet_pressure_solver.py index d9b1943a70..0f5068e9a6 100644 --- a/src/libecalc/domain/process/process_solver/outlet_pressure_solver.py +++ b/src/libecalc/domain/process/process_solver/outlet_pressure_solver.py @@ -19,7 +19,7 @@ RecirculationConfiguration, ) from libecalc.domain.process.process_solver.solvers.speed_solver import SpeedConfiguration, SpeedSolver -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class OutletPressureSolver: @@ -81,7 +81,7 @@ def _get_initial_speed_boundary(self) -> Boundary: def _find_speed_solution( self, pressure_constraint: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[SpeedConfiguration]: speed_solver = SpeedSolver( search_strategy=BinarySearchStrategy(), @@ -90,7 +90,7 @@ def _find_speed_solution( target_pressure=pressure_constraint.value, ) - def speed_func(configuration: SpeedConfiguration) -> FluidStream: + def speed_func(configuration: SpeedConfiguration) -> StreamWithPressure: self._simulator.apply_configuration( Configuration(configuration_handler_id=self._shaft_id, value=configuration) ) @@ -107,7 +107,7 @@ def speed_func(configuration: SpeedConfiguration) -> FluidStream: return speed_solution - def _get_outlet_stream(self, inlet_stream: FluidStream, configurations: Sequence[Configuration]): + def _get_outlet_stream(self, inlet_stream: StreamWithPressure, configurations: Sequence[Configuration]): self._simulator.apply_configurations(configurations) return self._simulator.run(inlet_stream=inlet_stream) @@ -118,7 +118,7 @@ def get_anti_surge_solution(self) -> Solution[Sequence[Configuration[Recirculati def find_solution( self, pressure_constraint: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration]]: """ Finds the speed and recirculation rates for each compressor to meet the pressure constraint. diff --git a/src/libecalc/domain/process/process_solver/pressure_control/common_asv.py b/src/libecalc/domain/process/process_solver/pressure_control/common_asv.py index 49c7540af0..d1c308df35 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/common_asv.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/common_asv.py @@ -1,6 +1,5 @@ from collections.abc import Sequence -from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_solver.configuration import Configuration, ConfigurationHandlerId from libecalc.domain.process.process_solver.float_constraint import FloatConstraint from libecalc.domain.process.process_solver.pressure_control.pressure_control_strategy import PressureControlStrategy @@ -16,7 +15,8 @@ RecirculationConfiguration, RecirculationSolver, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.process_solver.unit_protocol import RecirculatingUnit +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class CommonASVPressureControlStrategy(PressureControlStrategy): @@ -31,20 +31,20 @@ def __init__( self, simulator: ProcessRunner, recirculation_loop_id: ConfigurationHandlerId, - first_compressor: Compressor, + first_unit: RecirculatingUnit, root_finding_strategy: RootFindingStrategy, ): self._simulator = simulator self._recirculation_loop_id = recirculation_loop_id - self._first_compressor = first_compressor + self._first_unit = first_unit self._root_finding_strategy = root_finding_strategy def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: - def recirculation_func(config: RecirculationConfiguration) -> FluidStream: + def recirculation_func(config: RecirculationConfiguration) -> StreamWithPressure: self._simulator.apply_configuration( Configuration(configuration_handler_id=self._recirculation_loop_id, value=config) ) @@ -60,8 +60,8 @@ def recirculation_func(config: RecirculationConfiguration) -> FluidStream: value=RecirculationConfiguration(recirculation_rate=0.0), ) ) - compressor_inlet_stream = self._simulator.run(inlet_stream=inlet_stream, to_id=self._first_compressor.get_id()) - boundary = self._first_compressor.get_recirculation_range(compressor_inlet_stream) + unit_inlet_stream = self._simulator.run(inlet_stream=inlet_stream, to_id=self._first_unit.get_id()) + boundary = self._first_unit.get_recirculation_range(unit_inlet_stream) min_configuration = RecirculationConfiguration(recirculation_rate=boundary.max) min_pressure_stream = recirculation_func(min_configuration) diff --git a/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py b/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py index 007acdd03f..ee986dae7a 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py @@ -10,7 +10,7 @@ DownstreamChokeSolver, ) from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class DownstreamChokePressureControlStrategy(PressureControlStrategy): @@ -30,9 +30,9 @@ def __init__( def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: - def outlet_with_choke(cfg: ChokeConfiguration) -> FluidStream: + def outlet_with_choke(cfg: ChokeConfiguration) -> StreamWithPressure: self._simulator.apply_configuration( Configuration(configuration_handler_id=self._choke_configuration_handler_id, value=cfg) ) diff --git a/src/libecalc/domain/process/process_solver/pressure_control/individual_asv.py b/src/libecalc/domain/process/process_solver/pressure_control/individual_asv.py index fa685db87e..b0a094ad52 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/individual_asv.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/individual_asv.py @@ -1,7 +1,6 @@ from collections.abc import Sequence from libecalc.domain.process.compressor.core.train.utils.numeric_methods import find_root -from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.process_solver.configuration import Configuration, ConfigurationHandlerId from libecalc.domain.process.process_solver.float_constraint import FloatConstraint from libecalc.domain.process.process_solver.pressure_control.pressure_control_strategy import PressureControlStrategy @@ -17,7 +16,8 @@ RecirculationConfiguration, RecirculationSolver, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.process_solver.unit_protocol import RecirculatingUnit +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class IndividualASVPressureControlStrategy(PressureControlStrategy): @@ -33,23 +33,23 @@ def __init__( self, simulator: ProcessRunner, recirculation_loop_ids: Sequence[ConfigurationHandlerId], - compressors: Sequence[Compressor], + units: Sequence[RecirculatingUnit], root_finding_strategy: RootFindingStrategy, ): self._simulator = simulator self._recirculation_loop_ids = recirculation_loop_ids - self._compressors = compressors + self._units = units self._root_finding_strategy = root_finding_strategy def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: minimum_achievable_pressure_configurations = _minimum_achievable_pressure( simulator=self._simulator, recirculation_loop_ids=self._recirculation_loop_ids, - compressors=self._compressors, + units=self._units, inlet_stream=inlet_stream, ) self._simulator.apply_configurations(minimum_achievable_pressure_configurations) @@ -72,7 +72,7 @@ def apply( configurations: list[Configuration[RecirculationConfiguration | ChokeConfiguration]] = [] - for i, (recirculation_loop_id, compressor) in enumerate(zip(self._recirculation_loop_ids, self._compressors)): + for i, (recirculation_loop_id, compressor) in enumerate(zip(self._recirculation_loop_ids, self._units)): # Target pressure for this stage: cumulative from original inlet stage_target_pressure = inlet_stream.pressure_bara * (pressure_ratio_per_stage ** (i + 1)) @@ -85,7 +85,7 @@ def apply( current_stream = self._simulator.run(inlet_stream=inlet_stream, to_id=compressor.get_id()) boundary = compressor.get_recirculation_range(inlet_stream=current_stream) - def recirculation_func(config: RecirculationConfiguration) -> FluidStream: + def recirculation_func(config: RecirculationConfiguration) -> StreamWithPressure: self._simulator.apply_configuration( Configuration(configuration_handler_id=recirculation_loop_id, value=config) ) @@ -130,20 +130,20 @@ def __init__( self, simulator: ProcessRunner, recirculation_loop_ids: Sequence[ConfigurationHandlerId], - compressors: Sequence[Compressor], + units: Sequence[RecirculatingUnit], ): self._simulator = simulator self._recirculation_loop_ids = recirculation_loop_ids - self._compressors = compressors + self._units = units def _get_configurations_from_fraction( self, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, asv_rate_fraction: float, ) -> Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]: """Propagate stream through all stages, interpolating recirculation between min and max per stage.""" configurations: list[Configuration[RecirculationConfiguration | ChokeConfiguration]] = [] - for recirculation_loop_id, compressor in zip(self._recirculation_loop_ids, self._compressors): + for recirculation_loop_id, compressor in zip(self._recirculation_loop_ids, self._units): self._simulator.apply_configuration( Configuration( configuration_handler_id=recirculation_loop_id, @@ -166,12 +166,12 @@ def _get_configurations_from_fraction( def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: minimum_achievable_pressure_configurations = _minimum_achievable_pressure( simulator=self._simulator, recirculation_loop_ids=self._recirculation_loop_ids, - compressors=self._compressors, + units=self._units, inlet_stream=inlet_stream, ) self._simulator.apply_configurations(minimum_achievable_pressure_configurations) @@ -189,7 +189,7 @@ def apply( ), ) - def get_outlet_stream(rate_fraction: float) -> FluidStream: + def get_outlet_stream(rate_fraction: float) -> StreamWithPressure: test_configurations = self._get_configurations_from_fraction( inlet_stream=inlet_stream, asv_rate_fraction=rate_fraction, @@ -219,20 +219,20 @@ def get_outlet_stream(rate_fraction: float) -> FluidStream: def _minimum_achievable_pressure( simulator: ProcessRunner, recirculation_loop_ids: Sequence[ConfigurationHandlerId], - compressors: Sequence[Compressor], - inlet_stream: FluidStream, + units: Sequence[RecirculatingUnit], + inlet_stream: StreamWithPressure, ) -> Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]: """Propagate with maximum recirculation on every stage to find the lowest achievable pressure.""" configurations: list[Configuration[RecirculationConfiguration | ChokeConfiguration]] = [] - for loop, compressor in zip(recirculation_loop_ids, compressors): + for loop, unit in zip(recirculation_loop_ids, units): simulator.apply_configuration( Configuration( configuration_handler_id=loop, value=RecirculationConfiguration(recirculation_rate=0.0), ) ) - current_stream = simulator.run(inlet_stream=inlet_stream, to_id=compressor.get_id()) - boundary = compressor.get_recirculation_range(current_stream) + current_stream = simulator.run(inlet_stream=inlet_stream, to_id=unit.get_id()) + boundary = unit.get_recirculation_range(current_stream) configuration: Configuration[RecirculationConfiguration | ChokeConfiguration] = Configuration( configuration_handler_id=loop, value=RecirculationConfiguration(recirculation_rate=boundary.max) ) diff --git a/src/libecalc/domain/process/process_solver/pressure_control/pressure_control_strategy.py b/src/libecalc/domain/process/process_solver/pressure_control/pressure_control_strategy.py index 4edec1d95d..1fe192b4fc 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/pressure_control_strategy.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/pressure_control_strategy.py @@ -6,28 +6,20 @@ from libecalc.domain.process.process_solver.solver import Solution from libecalc.domain.process.process_solver.solvers.downstream_choke_solver import ChokeConfiguration from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class PressureControlStrategy(ABC): """Strategy for meeting a target outlet pressure at fixed speed. - Implementations close over the mutable system state at construction time. + Works for both compressor trains (via ASV/choke) and pump systems (via bypass). """ @abstractmethod def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: - """Adjust the system to meet the target pressure. - - Args: - target_pressure: Target outlet pressure. - inlet_stream: The inlet fluid stream. - - Returns: - Solution containing the manipulations (e.g. recirculation rate, choke ΔP) needed to meet the target pressure. - """ + """Adjust the system to meet the target pressure.""" ... diff --git a/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py b/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py index f23f688019..81a84d7c07 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py @@ -11,7 +11,7 @@ from libecalc.domain.process.process_solver.solvers.downstream_choke_solver import ChokeConfiguration from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration from libecalc.domain.process.process_solver.solvers.upstream_choke_solver import UpstreamChokeSolver -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class UpstreamChokePressureControlStrategy(PressureControlStrategy): @@ -36,7 +36,7 @@ def __init__( def apply( self, target_pressure: FloatConstraint, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, ) -> Solution[Sequence[Configuration[RecirculationConfiguration | ChokeConfiguration]]]: # Use a small margin to avoid evaluating exactly at the physical/numerical extremes: # ΔP = 0 (no choke) and ΔP = inlet_pressure (zero/negative suction pressure). @@ -51,7 +51,7 @@ def apply( delta_pressure_boundary=delta_pressure_boundary, ) - def choke_func(config: ChokeConfiguration) -> FluidStream: + def choke_func(config: ChokeConfiguration) -> StreamWithPressure: # The runner is responsible for interpreting upstream ΔP as reduced suction pressure # seen by the downstream process system. self._simulator.apply_configuration( diff --git a/src/libecalc/domain/process/process_solver/process_pipeline_runner.py b/src/libecalc/domain/process/process_solver/process_pipeline_runner.py index f01be1b4f2..4577feeae9 100644 --- a/src/libecalc/domain/process/process_solver/process_pipeline_runner.py +++ b/src/libecalc/domain/process/process_solver/process_pipeline_runner.py @@ -4,10 +4,10 @@ from libecalc.domain.process.process_solver.configuration import Configuration, ConfigurationHandlerId from libecalc.domain.process.process_solver.configuration_handler import ConfigurationHandler from libecalc.domain.process.process_solver.process_runner import ProcessRunner -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure -def propagate_stream_many(process_units: Sequence[ProcessUnit], inlet_stream: FluidStream) -> FluidStream: +def propagate_stream_many(process_units: Sequence[ProcessUnit], inlet_stream: StreamWithPressure) -> StreamWithPressure: current_stream = inlet_stream for process_unit in process_units: current_stream = process_unit.propagate_stream(current_stream) @@ -34,10 +34,10 @@ def _get_configuration_handler(self, configuration_handler_id: ConfigurationHand def _propagate_stream_to_id( self, - inlet_stream: FluidStream, + inlet_stream: StreamWithPressure, units: Iterable[ProcessUnit], to_id: ProcessUnitId, - ) -> tuple[FluidStream, bool]: + ) -> tuple[StreamWithPressure, bool]: current_stream = inlet_stream for unit in units: if unit.get_id() == to_id: @@ -46,7 +46,7 @@ def _propagate_stream_to_id( return current_stream, False - def run(self, inlet_stream: FluidStream, to_id: ProcessUnitId | None = None) -> FluidStream: + def run(self, inlet_stream: StreamWithPressure, to_id: ProcessUnitId | None = None) -> StreamWithPressure: if to_id is not None: current_stream, found = self._propagate_stream_to_id( inlet_stream=inlet_stream, units=self._units.values(), to_id=to_id diff --git a/src/libecalc/domain/process/process_solver/process_runner.py b/src/libecalc/domain/process/process_solver/process_runner.py index 4221603f5b..0db97f7aa6 100644 --- a/src/libecalc/domain/process/process_solver/process_runner.py +++ b/src/libecalc/domain/process/process_solver/process_runner.py @@ -3,7 +3,7 @@ from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId from libecalc.domain.process.process_solver.configuration import Configuration, OperatingConfiguration -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class ProcessRunner(abc.ABC): @@ -18,11 +18,11 @@ def apply_configurations(self, configurations: Sequence[Configuration[OperatingC self.apply_configuration(configuration) @abc.abstractmethod - def run(self, inlet_stream: FluidStream, to_id: ProcessUnitId | None = None) -> FluidStream: + def run(self, inlet_stream: StreamWithPressure, to_id: ProcessUnitId | None = None) -> StreamWithPressure: """ Simulate the process Args: - inlet_stream: inlet stream to the process. + inlet_stream: inlet stream to the process (FluidStream or LiquidStream). to_id: If provided, simulates the process up to, not including, the specified simulation unit id. If None, simulates the entire process. Returns: The outlet stream diff --git a/src/libecalc/domain/process/process_solver/solvers/downstream_choke_solver.py b/src/libecalc/domain/process/process_solver/solvers/downstream_choke_solver.py index 01460f56f4..8c72bf5b35 100644 --- a/src/libecalc/domain/process/process_solver/solvers/downstream_choke_solver.py +++ b/src/libecalc/domain/process/process_solver/solvers/downstream_choke_solver.py @@ -2,14 +2,14 @@ from libecalc.domain.process.process_solver.configuration import ChokeConfiguration from libecalc.domain.process.process_solver.solver import Solution, Solver -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure class DownstreamChokeSolver(Solver[ChokeConfiguration]): def __init__(self, target_pressure: float): self._target_pressure = target_pressure - def solve(self, func: Callable[[ChokeConfiguration], FluidStream]) -> Solution[ChokeConfiguration]: + def solve(self, func: Callable[[ChokeConfiguration], StreamWithPressure]) -> Solution[ChokeConfiguration]: configuration = ChokeConfiguration(delta_pressure=0) outlet_stream = func(configuration) if outlet_stream.pressure_bara <= self._target_pressure: diff --git a/src/libecalc/domain/process/process_solver/solvers/recirculation_solver.py b/src/libecalc/domain/process/process_solver/solvers/recirculation_solver.py index b52d82161d..482aad1267 100644 --- a/src/libecalc/domain/process/process_solver/solvers/recirculation_solver.py +++ b/src/libecalc/domain/process/process_solver/solvers/recirculation_solver.py @@ -13,7 +13,7 @@ SolverFailureStatus, TargetNotAchievableEvent, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamT class RecirculationSolver(Solver): @@ -29,7 +29,7 @@ def __init__( self._search_strategy = search_strategy self._root_finding_strategy = root_finding_strategy - def solve(self, func: Callable[[RecirculationConfiguration], FluidStream]) -> Solution[RecirculationConfiguration]: + def solve(self, func: Callable[[RecirculationConfiguration], StreamT]) -> Solution[RecirculationConfiguration]: def bool_func(x: float, mode: Literal["minimize", "maximize"]) -> tuple[bool, bool]: """ Return a tuple where first bool is True for higher value, diff --git a/src/libecalc/domain/process/process_solver/solvers/speed_solver.py b/src/libecalc/domain/process/process_solver/solvers/speed_solver.py index 38b8005586..389cffb2a2 100644 --- a/src/libecalc/domain/process/process_solver/solvers/speed_solver.py +++ b/src/libecalc/domain/process/process_solver/solvers/speed_solver.py @@ -12,7 +12,7 @@ SolverFailureStatus, TargetNotAchievableEvent, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamT logger = logging.getLogger(__name__) @@ -30,8 +30,8 @@ def __init__( self._search_strategy = search_strategy self._root_finding_strategy = root_finding_strategy - def solve(self, func: Callable[[SpeedConfiguration], FluidStream]) -> Solution[SpeedConfiguration]: - def get_outlet_stream(speed: float) -> FluidStream: + def solve(self, func: Callable[[SpeedConfiguration], StreamT]) -> Solution[SpeedConfiguration]: + def get_outlet_stream(speed: float) -> StreamT: return func(SpeedConfiguration(speed=speed)) max_speed_configuration = SpeedConfiguration(speed=self._boundary.max) diff --git a/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py b/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py index 5f3c1ef56b..b7095146ce 100644 --- a/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py +++ b/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py @@ -9,7 +9,7 @@ SolverFailureStatus, TargetNotAchievableEvent, ) -from libecalc.domain.process.value_objects.fluid_stream import FluidStream +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure from .downstream_choke_solver import ChokeConfiguration @@ -25,7 +25,7 @@ def __init__( self._delta_pressure_boundary = delta_pressure_boundary self._root_finding_strategy = root_finding_strategy - def solve(self, func: Callable[[ChokeConfiguration], FluidStream]) -> Solution[ChokeConfiguration]: + def solve(self, func: Callable[[ChokeConfiguration], StreamWithPressure]) -> Solution[ChokeConfiguration]: def outlet_pressure(config: ChokeConfiguration) -> float: """Evaluate outlet pressure, treating RateTooHighError as infeasible (pressure = 0). diff --git a/src/libecalc/domain/process/process_solver/unit_protocol.py b/src/libecalc/domain/process/process_solver/unit_protocol.py new file mode 100644 index 0000000000..f6a3b3ef93 --- /dev/null +++ b/src/libecalc/domain/process/process_solver/unit_protocol.py @@ -0,0 +1,22 @@ +from typing import Any, Protocol + +from libecalc.domain.process.process_pipeline.process_unit import ProcessUnitId +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.value_objects.stream_protocol import StreamWithPressure + + +class RecirculatingUnit(Protocol): + """Protocol for process units that support recirculation-based capacity control. + + Satisfied by both Compressor (anti-surge) and Pump (minimum-flow bypass). + Used by IndividualASV strategies to stay decoupled from concrete unit types. + """ + + def get_id(self) -> ProcessUnitId: ... + + @property + def maximum_flow_rate(self) -> float: ... + + def get_recirculation_range(self, inlet_stream: Any) -> Boundary: ... + + def propagate_stream(self, inlet_stream: Any) -> StreamWithPressure: ... diff --git a/src/libecalc/domain/process/stream_distribution/common_stream_distribution.py b/src/libecalc/domain/process/stream_distribution/common_stream_distribution.py index 9fce051ce3..ef01ee5159 100644 --- a/src/libecalc/domain/process/stream_distribution/common_stream_distribution.py +++ b/src/libecalc/domain/process/stream_distribution/common_stream_distribution.py @@ -1,9 +1,7 @@ import abc from collections import defaultdict -from collections.abc import Hashable, Iterable -from collections.abc import Mapping +from collections.abc import Hashable, Iterable, Mapping from dataclasses import dataclass -from typing import Generic, TypeVar import networkx as nx @@ -17,16 +15,13 @@ class HasExcessRate(abc.ABC): def get_excess_rate(self, inlet_stream: FluidStream) -> float: ... -T = TypeVar("T", bound=Hashable) - - @dataclass -class Overflow(Generic[T]): +class Overflow[T: Hashable]: from_id: T to_id: T -class CommonStreamDistribution(StreamDistribution, Generic[T]): +class CommonStreamDistribution[T: Hashable](StreamDistribution): def __init__( self, inlet_stream: FluidStream, diff --git a/src/libecalc/domain/process/value_objects/fluid_stream/fluid_stream.py b/src/libecalc/domain/process/value_objects/fluid_stream/fluid_stream.py index bd81186066..94b6bdf6e3 100644 --- a/src/libecalc/domain/process/value_objects/fluid_stream/fluid_stream.py +++ b/src/libecalc/domain/process/value_objects/fluid_stream/fluid_stream.py @@ -100,6 +100,11 @@ def standard_density_gas_phase_after_flash(self) -> float: """Get gas phase density at standard conditions [kg/Sm3].""" return self.fluid.standard_density_gas_phase_after_flash + @property + def standard_density(self) -> float: + """Gas phase density at standard conditions [kg/Sm³].""" + return self.fluid.standard_density_gas_phase_after_flash + @property def enthalpy_joule_per_kg(self) -> float: """Get specific enthalpy [J/kg].""" diff --git a/src/libecalc/domain/process/value_objects/liquid_stream.py b/src/libecalc/domain/process/value_objects/liquid_stream.py new file mode 100644 index 0000000000..571ed0e647 --- /dev/null +++ b/src/libecalc/domain/process/value_objects/liquid_stream.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from libecalc.common.units import UnitConstants + + +@dataclass(frozen=True) +class LiquidStream: + """A fully-specified incompressible liquid stream. + + Density is constant (incompressible assumption) — no thermodynamic flash needed. + Used as the inlet/outlet type for LiquidProcessUnit (e.g. Pump). + """ + + pressure_bara: float + density_kg_per_m3: float + mass_rate_kg_per_h: float + + def __post_init__(self) -> None: + if self.density_kg_per_m3 <= 0: + raise ValueError(f"Density must be positive, got {self.density_kg_per_m3}") + if self.mass_rate_kg_per_h < 0: + raise ValueError(f"Mass rate cannot be negative, got {self.mass_rate_kg_per_h}") + + @property + def volumetric_rate_m3_per_hour(self) -> float: + return self.mass_rate_kg_per_h / self.density_kg_per_m3 + + @property + def standard_density(self) -> float: + """Liquid density [kg/m³]. + + For incompressible liquids, operating density ≈ standard density + (no significant pressure/temperature correction needed). + """ + return self.density_kg_per_m3 + + @property + def standard_rate_sm3_per_day(self) -> float: + """Volumetric flow rate [Sm³/day]. + + For incompressible liquids, standard ≈ actual volume, so this is + equivalent to actual m³/day. Matches the FluidStream interface so + both stream types work with the same RecirculationLoop logic. + """ + return self.mass_rate_kg_per_h * UnitConstants.HOURS_PER_DAY / self.density_kg_per_m3 + + def with_pressure(self, pressure_bara: float) -> LiquidStream: + return LiquidStream( + pressure_bara=pressure_bara, + density_kg_per_m3=self.density_kg_per_m3, + mass_rate_kg_per_h=self.mass_rate_kg_per_h, + ) + + def with_mass_rate(self, mass_rate_kg_per_h: float) -> LiquidStream: + return LiquidStream( + pressure_bara=self.pressure_bara, + density_kg_per_m3=self.density_kg_per_m3, + mass_rate_kg_per_h=mass_rate_kg_per_h, + ) diff --git a/src/libecalc/domain/process/value_objects/stream_protocol.py b/src/libecalc/domain/process/value_objects/stream_protocol.py new file mode 100644 index 0000000000..3ab19ae0e9 --- /dev/null +++ b/src/libecalc/domain/process/value_objects/stream_protocol.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import Protocol, Self, TypeVar + + +class StreamWithPressure(Protocol): + """Minimal stream interface required by pressure-based solvers. + + Satisfied by both FluidStream and LiquidStream. + """ + + @property + def pressure_bara(self) -> float: ... + + +StreamT = TypeVar("StreamT", bound=StreamWithPressure) + + +class MixableStream(StreamWithPressure, Protocol): + """Stream interface required by DirectMixer and DirectSplitter. + + Extends StreamWithPressure with the mass-rate and density attributes needed + to add/remove a recirculation flow in Sm³/day. + Satisfied by both FluidStream and LiquidStream. + """ + + @property + def standard_density(self) -> float: ... + + @property + def mass_rate_kg_per_h(self) -> float: ... + + def with_mass_rate(self, mass_rate_kg_per_h: float) -> Self: ... diff --git a/src/libecalc/presentation/yaml/mappers/process_simulation_mapper.py b/src/libecalc/presentation/yaml/mappers/process_simulation_mapper.py index 86ecf1690c..48140c9028 100644 --- a/src/libecalc/presentation/yaml/mappers/process_simulation_mapper.py +++ b/src/libecalc/presentation/yaml/mappers/process_simulation_mapper.py @@ -265,14 +265,14 @@ def map_anti_surge_strategy( return CommonASVAntiSurgeStrategy( simulator=simulator, root_finding_strategy=ScipyRootFindingStrategy(), - first_compressor=compressors[0], + first_unit=compressors[0], # pyright: ignore[reportArgumentType] - Compressor satisfies RecirculatingUnit at runtime recirculation_loop_id=recirculation_loop_ids[0], ) case "INDIVIDUAL_ASV": return IndividualASVAntiSurgeStrategy( simulator=simulator, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, # pyright: ignore[reportArgumentType] ) case _: assert_never(recirculation_type) diff --git a/src/libecalc/testing/yaml_builder.py b/src/libecalc/testing/yaml_builder.py index a486618764..827dbd5381 100644 --- a/src/libecalc/testing/yaml_builder.py +++ b/src/libecalc/testing/yaml_builder.py @@ -1,12 +1,10 @@ import abc from enum import Enum -from typing import Generic, Literal, Self, TypeVar, get_args - -from typing_extensions import get_original_bases +from types import get_original_bases +from typing import Literal, Self, get_args from libecalc.common.utils.rates import RateType from libecalc.presentation.yaml.consumer_category import ConsumerUserDefinedCategoryType -from libecalc.presentation.yaml.yaml_types import YamlBase from libecalc.presentation.yaml.yaml_types.components.legacy.energy_usage_model import ( YamlElectricityEnergyUsageModel, YamlEnergyUsageModelCompressor, @@ -23,8 +21,8 @@ from libecalc.presentation.yaml.yaml_types.components.yaml_expression_type import YamlExpressionType from libecalc.presentation.yaml.yaml_types.components.yaml_generator_set import YamlGeneratorSet from libecalc.presentation.yaml.yaml_types.components.yaml_installation import ( - YamlInstallation, InstallationUserDefinedCategoryType, + YamlInstallation, ) from libecalc.presentation.yaml.yaml_types.emitters.yaml_venting_emitter import ( YamlDirectTypeEmitter, @@ -42,7 +40,7 @@ ) from libecalc.presentation.yaml.yaml_types.facility_model.yaml_facility_model_type import YamlFacilityModelType from libecalc.presentation.yaml.yaml_types.fuel_type.yaml_emission import YamlEmission -from libecalc.presentation.yaml.yaml_types.fuel_type.yaml_fuel_type import YamlFuelType, FuelTypeUserDefinedCategoryType +from libecalc.presentation.yaml.yaml_types.fuel_type.yaml_fuel_type import FuelTypeUserDefinedCategoryType, YamlFuelType from libecalc.presentation.yaml.yaml_types.models import YamlConsumerModel, YamlTurbine from libecalc.presentation.yaml.yaml_types.models.model_reference_validation import ( CompressorEnergyUsageModelModelReference, @@ -62,10 +60,8 @@ from libecalc.presentation.yaml.yaml_types.yaml_temporal_model import YamlTemporalModel from libecalc.presentation.yaml.yaml_types.yaml_variable import YamlVariables -T = TypeVar("T") - -class Builder(abc.ABC, Generic[T]): +class Builder[T](abc.ABC): __model__: type[T] def __init_subclass__(cls, **kwargs): @@ -270,9 +266,6 @@ def with_test_data(self): self.discharge_pressure = 80 -TYamlClass = TypeVar("TYamlClass", bound=YamlBase) - - class YamlFuelConsumerBuilder(Builder[YamlFuelConsumer]): def __init__(self): self.name = None diff --git a/tests/libecalc/application/test_stream_distribution.py b/tests/libecalc/application/test_stream_distribution.py index 949f4ba246..8e7d1d5085 100644 --- a/tests/libecalc/application/test_stream_distribution.py +++ b/tests/libecalc/application/test_stream_distribution.py @@ -2,12 +2,13 @@ import pytest from inline_snapshot import snapshot + +from libecalc.domain.component_validation_error import DomainValidationException from libecalc.domain.process.stream_distribution.common_stream_distribution import ( CommonStreamDistribution, HasExcessRate, Overflow, ) -from libecalc.domain.component_validation_error import DomainValidationException from libecalc.domain.process.value_objects.fluid_stream import FluidStream diff --git a/tests/libecalc/domain/process/conftest.py b/tests/libecalc/domain/process/conftest.py index 458dceae23..e7193d3c71 100644 --- a/tests/libecalc/domain/process/conftest.py +++ b/tests/libecalc/domain/process/conftest.py @@ -5,7 +5,7 @@ from libecalc.domain.process.entities.process_units.compressor import Compressor from libecalc.domain.process.entities.process_units.temperature_setter import TemperatureSetter from libecalc.domain.process.entities.shaft import Shaft -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnit, ProcessUnitId from libecalc.domain.process.process_solver.search_strategies import ( CONVERGENCE_TOLERANCE, BinarySearchStrategy, @@ -138,14 +138,14 @@ def fluid_stream_mock(mock_fluid) -> FluidStream: return FluidStream(fluid=mock_fluid, mass_rate_kg_per_h=100.0) -class SimpleProcessUnit(ProcessUnit): +class SimpleProcessUnit(GasProcessUnit): def __init__( self, pressure_multiplier: float, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._pressure_multiplier = pressure_multiplier self._fluid_service = fluid_service diff --git a/tests/libecalc/domain/process/entities/process_units/test_liquid_recirculation_loop.py b/tests/libecalc/domain/process/entities/process_units/test_liquid_recirculation_loop.py new file mode 100644 index 0000000000..24bfaf2aa2 --- /dev/null +++ b/tests/libecalc/domain/process/entities/process_units/test_liquid_recirculation_loop.py @@ -0,0 +1,148 @@ +import pytest + +from libecalc.domain.process.entities.process_units.direct_mixer import DirectMixer +from libecalc.domain.process.entities.process_units.direct_splitter import DirectSplitter +from libecalc.domain.process.entities.process_units.pump import Pump +from libecalc.domain.process.process_pipeline.process_error import RateTooHighError, RateTooLowError +from libecalc.domain.process.process_pipeline.process_unit import create_process_unit_id +from libecalc.domain.process.value_objects.chart import ChartCurve +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream +from libecalc.presentation.yaml.mappers.charts.user_defined_chart_data import UserDefinedChartData + +# Chart: single-speed at 3000 rpm, rates 10–100 m³/h, head 50_000–20_000 J/kg, η=0.7 +_CHART = UserDefinedChartData( + curves=[ + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 50.0, 100.0], + polytropic_head_joule_per_kg=[50_000.0, 40_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7, 0.7], + ) + ], + control_margin=0.0, +) + + +def _make_pump() -> Pump: + return Pump(process_unit_id=create_process_unit_id(), pump_chart=_CHART) + + +def _stream(rate_m3h: float, pressure: float = 10.0, density: float = 800.0) -> LiquidStream: + return LiquidStream( + pressure_bara=pressure, + density_kg_per_m3=density, + mass_rate_kg_per_h=rate_m3h * density, + ) + + +def _m3h_to_sm3_day(rate_m3h: float) -> float: + return rate_m3h * 24.0 + + +class _LiquidLoop: + """Test helper: mixer → pump → splitter loop. + + On main, RecirculationLoop is a ConfigurationHandler that delegates + set/get_recirculation_rate to mixer/splitter. Stream propagation is + done by ProcessPipelineRunner. This helper wires the three units + together for direct testing without the runner. + """ + + def __init__(self, recirculation_rate_sm3_day: float = 0.0): + self.mixer = DirectMixer(process_unit_id=create_process_unit_id(), mix_rate=recirculation_rate_sm3_day) + self.pump = _make_pump() + self.splitter = DirectSplitter(process_unit_id=create_process_unit_id(), split_rate=recirculation_rate_sm3_day) + + def set_recirculation_rate(self, rate: float): + self.mixer.set_mix_rate(rate) + self.splitter.set_split_rate(rate) + + def get_recirculation_rate(self) -> float: + return self.mixer.get_mix_rate() + + def propagate_stream(self, inlet: LiquidStream) -> LiquidStream: + mixed = self.mixer.propagate_stream(inlet) + pumped = self.pump.propagate_stream(mixed) + return self.splitter.propagate_stream(pumped) + + +def _make_loop(recirculation_rate_m3_per_h: float = 0.0) -> _LiquidLoop: + return _LiquidLoop(recirculation_rate_sm3_day=_m3h_to_sm3_day(recirculation_rate_m3_per_h)) + + +class TestDirectMixerWithLiquidStream: + def test_adds_flow_to_mass_rate(self): + mixer = DirectMixer(process_unit_id=create_process_unit_id(), mix_rate=_m3h_to_sm3_day(20.0)) + inlet = _stream(rate_m3h=30.0, density=800.0) + outlet = mixer.propagate_stream(inlet) + assert outlet.volumetric_rate_m3_per_hour == pytest.approx(50.0) + assert outlet.density_kg_per_m3 == inlet.density_kg_per_m3 + assert outlet.pressure_bara == inlet.pressure_bara + + def test_zero_recirc_is_passthrough(self): + mixer = DirectMixer(process_unit_id=create_process_unit_id(), mix_rate=0.0) + inlet = _stream(rate_m3h=30.0) + outlet = mixer.propagate_stream(inlet) + assert outlet.mass_rate_kg_per_h == inlet.mass_rate_kg_per_h + + +class TestDirectSplitterWithLiquidStream: + def test_removes_flow_from_mass_rate(self): + splitter = DirectSplitter(process_unit_id=create_process_unit_id(), split_rate=_m3h_to_sm3_day(20.0)) + inlet = _stream(rate_m3h=50.0, density=800.0) + outlet = splitter.propagate_stream(inlet) + assert outlet.volumetric_rate_m3_per_hour == pytest.approx(30.0) + assert outlet.density_kg_per_m3 == inlet.density_kg_per_m3 + assert outlet.pressure_bara == inlet.pressure_bara + + def test_mixer_splitter_are_inverse(self): + rate = _m3h_to_sm3_day(20.0) + mixer = DirectMixer(process_unit_id=create_process_unit_id(), mix_rate=rate) + splitter = DirectSplitter(process_unit_id=create_process_unit_id(), split_rate=rate) + inlet = _stream(rate_m3h=40.0) + mixed = mixer.propagate_stream(inlet) + split = splitter.propagate_stream(mixed) + assert split.mass_rate_kg_per_h == pytest.approx(inlet.mass_rate_kg_per_h) + + +class TestRecirculationLoopPumpBehaviour: + def test_zero_recirc_is_passthrough(self): + """At zero recirculation, loop behaves identically to bare Pump.""" + loop = _make_loop(recirculation_rate_m3_per_h=0.0) + inlet = _stream(rate_m3h=50.0) + outlet = loop.propagate_stream(inlet) + assert outlet.mass_rate_kg_per_h == pytest.approx(inlet.mass_rate_kg_per_h) + assert outlet.pressure_bara > inlet.pressure_bara + + def test_recirc_raises_effective_pump_flow_above_minimum(self): + """Recirculation flow lifts the pump's effective rate above the chart minimum.""" + loop = _make_loop(recirculation_rate_m3_per_h=8.0) + inlet = _stream(rate_m3h=4.0) + outlet = loop.propagate_stream(inlet) + assert outlet.pressure_bara > inlet.pressure_bara + + def test_insufficient_recirc_still_raises(self): + """When recirculation is not enough to reach minimum flow, RateTooLowError propagates.""" + loop = _make_loop(recirculation_rate_m3_per_h=0.0) + inlet = _stream(rate_m3h=1.0) + with pytest.raises(RateTooLowError): + loop.propagate_stream(inlet) + + def test_stonewall_still_raises(self): + """Even with recirc, stonewall (rate > max) raises RateTooHighError.""" + loop = _make_loop(recirculation_rate_m3_per_h=200.0) + inlet = _stream(rate_m3h=200.0) + with pytest.raises(RateTooHighError): + loop.propagate_stream(inlet) + + def test_set_get_recirculation_rate(self): + loop = _make_loop(recirculation_rate_m3_per_h=0.0) + loop.set_recirculation_rate(_m3h_to_sm3_day(15.0)) + assert loop.get_recirculation_rate() == pytest.approx(_m3h_to_sm3_day(15.0)) + + def test_net_throughput_unchanged(self): + """Outlet mass rate equals inlet mass rate regardless of recirculation.""" + loop = _make_loop(recirculation_rate_m3_per_h=30.0) + inlet = _stream(rate_m3h=50.0) + outlet = loop.propagate_stream(inlet) + assert outlet.mass_rate_kg_per_h == pytest.approx(inlet.mass_rate_kg_per_h) diff --git a/tests/libecalc/domain/process/entities/process_units/test_pump.py b/tests/libecalc/domain/process/entities/process_units/test_pump.py new file mode 100644 index 0000000000..436660b357 --- /dev/null +++ b/tests/libecalc/domain/process/entities/process_units/test_pump.py @@ -0,0 +1,146 @@ +import pytest + +from libecalc.domain.process.entities.process_units.pump import Pump +from libecalc.domain.process.process_pipeline.process_error import RateTooHighError, RateTooLowError +from libecalc.domain.process.process_pipeline.process_unit import create_process_unit_id +from libecalc.domain.process.value_objects.chart import ChartCurve +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream +from libecalc.presentation.yaml.mappers.charts.user_defined_chart_data import UserDefinedChartData + +# Pump chart: one single-speed curve at 3000 rpm +# rate [m³/h]: 10 → 100, head [J/kg]: 50_000 → 20_000, efficiency: 0.7 constant +_SINGLE_SPEED_CURVE = ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 50.0, 100.0], + polytropic_head_joule_per_kg=[50_000.0, 40_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7, 0.7], +) + +_SINGLE_SPEED_CHART = UserDefinedChartData(curves=[_SINGLE_SPEED_CURVE], control_margin=0.0) + +# Variable-speed chart: two curves at 2500 and 3000 rpm +_VAR_SPEED_CHART = UserDefinedChartData( + curves=[ + ChartCurve( + speed_rpm=2500.0, + rate_actual_m3_hour=[10.0, 80.0], + polytropic_head_joule_per_kg=[35_000.0, 14_000.0], + efficiency_fraction=[0.7, 0.7], + ), + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 80.0], + polytropic_head_joule_per_kg=[50_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7], + ), + ], + control_margin=0.0, +) + + +def _make_pump(chart=_SINGLE_SPEED_CHART) -> Pump: + return Pump(process_unit_id=create_process_unit_id(), pump_chart=chart) + + +def _stream(rate_m3h: float = 50.0, pressure: float = 10.0, density: float = 800.0) -> LiquidStream: + """Liquid stream: mass_rate from volumetric rate × density.""" + return LiquidStream( + pressure_bara=pressure, + density_kg_per_m3=density, + mass_rate_kg_per_h=rate_m3h * density, + ) + + +class TestLiquidStream: + def test_volumetric_rate(self): + stream = _stream(rate_m3h=50.0, density=800.0) + assert stream.volumetric_rate_m3_per_hour == pytest.approx(50.0) + + def test_with_pressure(self): + stream = _stream(pressure=10.0) + updated = stream.with_pressure(50.0) + assert updated.pressure_bara == 50.0 + assert updated.density_kg_per_m3 == stream.density_kg_per_m3 + assert updated.mass_rate_kg_per_h == stream.mass_rate_kg_per_h + + def test_negative_mass_rate_raises(self): + with pytest.raises(ValueError): + LiquidStream(pressure_bara=10.0, density_kg_per_m3=800.0, mass_rate_kg_per_h=-1.0) + + def test_zero_density_raises(self): + with pytest.raises(ValueError): + LiquidStream(pressure_bara=10.0, density_kg_per_m3=0.0, mass_rate_kg_per_h=1000.0) + + +class TestPumpSingleSpeed: + def test_outlet_pressure_increases(self): + """Pump adds pressure — outlet must be above inlet.""" + pump = _make_pump() + inlet = _stream(rate_m3h=50.0, pressure=10.0, density=800.0) + outlet = pump.propagate_stream(inlet) + assert outlet.pressure_bara > inlet.pressure_bara + + def test_outlet_pressure_formula(self): + """P_out = P_in + head × density / 1e5.""" + pump = _make_pump() + inlet = _stream(rate_m3h=50.0, pressure=10.0, density=800.0) + # head at 50 m³/h = 40_000 J/kg (from chart) + expected_pressure_rise = 40_000.0 * 800.0 / 1e5 # = 320 bara + outlet = pump.propagate_stream(inlet) + assert outlet.pressure_bara == pytest.approx(10.0 + expected_pressure_rise, rel=1e-3) + + def test_density_and_mass_rate_unchanged(self): + """Pump does not change fluid density or mass rate.""" + pump = _make_pump() + inlet = _stream(rate_m3h=50.0, density=800.0) + outlet = pump.propagate_stream(inlet) + assert outlet.density_kg_per_m3 == inlet.density_kg_per_m3 + assert outlet.mass_rate_kg_per_h == inlet.mass_rate_kg_per_h + + def test_below_minimum_rate_raises(self): + """Below minimum flow: pump raises RateTooLowError (recirc is handled externally).""" + pump = _make_pump() + inlet = _stream(rate_m3h=1.0) # well below chart minimum of 10 m³/h + with pytest.raises(RateTooLowError): + pump.propagate_stream(inlet) + + def test_above_maximum_rate_raises(self): + pump = _make_pump() + inlet = _stream(rate_m3h=200.0) # above max of 100 m³/h + with pytest.raises(RateTooHighError): + pump.propagate_stream(inlet) + + def test_no_speed_needed_for_single_speed(self): + """Single-speed pump: set_speed() not required.""" + pump = _make_pump() + inlet = _stream(rate_m3h=50.0) + outlet = pump.propagate_stream(inlet) # should not raise + assert outlet.pressure_bara > inlet.pressure_bara + + +class TestPumpVariableSpeed: + def test_exact_speed_match(self): + """At exact chart speed, head is taken directly from that curve.""" + pump = _make_pump(_VAR_SPEED_CHART) + pump.set_speed(3000.0) + inlet = _stream(rate_m3h=10.0, pressure=5.0, density=800.0) + outlet = pump.propagate_stream(inlet) + # head at rate=10, speed=3000 = 50_000 J/kg + expected_rise = 50_000.0 * 800.0 / 1e5 + assert outlet.pressure_bara == pytest.approx(5.0 + expected_rise, rel=1e-3) + + def test_interpolated_speed(self): + """At speed halfway between curves, head is linearly interpolated.""" + pump = _make_pump(_VAR_SPEED_CHART) + pump.set_speed(2750.0) # halfway between 2500 and 3000 + inlet = _stream(rate_m3h=10.0, pressure=5.0, density=800.0) + outlet = pump.propagate_stream(inlet) + # head at rate=10: 35_000 (2500rpm) and 50_000 (3000rpm), alpha=0.5 → 42_500 + expected_rise = 42_500.0 * 800.0 / 1e5 + assert outlet.pressure_bara == pytest.approx(5.0 + expected_rise, rel=1e-3) + + def test_missing_speed_raises(self): + """Variable-speed pump without set_speed() raises on propagate_stream().""" + pump = _make_pump(_VAR_SPEED_CHART) + with pytest.raises(ValueError, match="Speed not set"): + pump.propagate_stream(_stream()) diff --git a/tests/libecalc/domain/process/entities/process_units/test_pump_legacy_vs_new.py b/tests/libecalc/domain/process/entities/process_units/test_pump_legacy_vs_new.py new file mode 100644 index 0000000000..c31b889483 --- /dev/null +++ b/tests/libecalc/domain/process/entities/process_units/test_pump_legacy_vs_new.py @@ -0,0 +1,194 @@ +"""Legacy vs new pump comparison tests. + +These tests verify that the new Pump domain entity produces the same outlet pressure +as the legacy PumpModel for equivalent operating conditions. + +API difference: +- Legacy PumpModel.simulate(rate_m3_per_day, Ps, Pd, density) — Pd is an INPUT; + head is derived from the pressure difference. +- New Pump.propagate_stream(LiquidStream(Ps, density, mass_rate)) — head comes from + the pump chart; Pd is an OUTPUT. + +For the comparison to be valid we pick operating points that lie exactly on the +chart curve, then derive the expected Pd from the chart head so both models see +the same hydraulic operating point. +""" + +import pytest + +from libecalc.domain.process.entities.process_units.pump import Pump +from libecalc.domain.process.process_pipeline.process_unit import create_process_unit_id +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.process_solver.configuration import SpeedConfiguration +from libecalc.domain.process.process_solver.solvers.speed_solver import SpeedSolver +from libecalc.domain.process.pump.pump import PumpModel +from libecalc.domain.process.value_objects.chart import ChartCurve +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream +from libecalc.presentation.yaml.mappers.charts.user_defined_chart_data import UserDefinedChartData + +# --------------------------------------------------------------------------- +# Shared chart data (identical for legacy and new). +# Single speed: 3000 rpm, rates 10–100 m³/h, heads 50_000–20_000 J/kg, η=0.7 +# --------------------------------------------------------------------------- +_SINGLE_SPEED_CHART = UserDefinedChartData( + curves=[ + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 50.0, 100.0], + polytropic_head_joule_per_kg=[50_000.0, 40_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7, 0.7], + ) + ], + control_margin=0.0, +) + +# Variable speed: two curves at 2500 and 3000 rpm. +# Both use constant efficiency 0.7 so legacy 2D lookup and new curve interpolation +# return the same efficiency at any operating point. +_VAR_SPEED_CHART = UserDefinedChartData( + curves=[ + ChartCurve( + speed_rpm=2500.0, + rate_actual_m3_hour=[10.0, 80.0], + polytropic_head_joule_per_kg=[35_000.0, 14_000.0], + efficiency_fraction=[0.7, 0.7], + ), + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 80.0], + polytropic_head_joule_per_kg=[50_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7], + ), + ], + control_margin=0.0, +) + +_DENSITY = 800.0 # kg/m³ +_PS = 10.0 # bara + +_BARA_TO_PASCAL = 1e5 + + +def _stream(rate_m3h: float, pressure: float = _PS, density: float = _DENSITY) -> LiquidStream: + return LiquidStream(pressure_bara=pressure, density_kg_per_m3=density, mass_rate_kg_per_h=rate_m3h * density) + + +def _pd_from_head(head_j_per_kg: float, density: float = _DENSITY, ps: float = _PS) -> float: + """Compute discharge pressure from head, density, and suction pressure.""" + return ps + head_j_per_kg * density / _BARA_TO_PASCAL + + +# --------------------------------------------------------------------------- +# Single-speed: normal operating range +# --------------------------------------------------------------------------- +class TestSingleSpeedLegacyVsNew: + def test_outlet_pressure_matches_expected_head(self): + """New Pump outlet pressure corresponds to the head from the chart.""" + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=_SINGLE_SPEED_CHART) + outlet = pump.propagate_stream(_stream(50.0)) + expected_pd = _pd_from_head(40_000.0) + assert outlet.pressure_bara == pytest.approx(expected_pd, rel=1e-4) + + def test_legacy_zero_rate_is_no_op(self): + """Legacy pump returns zero power at zero rate; new pump is not invoked (rate=0 is an upstream concern).""" + legacy = PumpModel(pump_chart=_SINGLE_SPEED_CHART) + power, head, status = legacy.simulate( + rate=0.0, suction_pressure=_PS, discharge_pressure=50.0, fluid_density=_DENSITY + ) + assert power == 0.0 + assert head == 0.0 + + +# --------------------------------------------------------------------------- +# Variable-speed: exact speed match (lies on a known chart curve) +# --------------------------------------------------------------------------- +class TestVariableSpeedLegacyVsNew: + def test_speed_solver_finds_correct_speed(self, search_strategy_factory, root_finding_strategy): + """SpeedSolver finds the speed that delivers a target outlet pressure. + At rate=10 m³/h with target Pd derived from speed=3000 head, solver returns ~3000 rpm.""" + target_head = 50_000.0 # J/kg — corresponds to 3000 rpm at 10 m³/h + target_pd = _pd_from_head(target_head) + inlet = _stream(10.0) + + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=_VAR_SPEED_CHART) + solver = SpeedSolver( + search_strategy=search_strategy_factory(tolerance=1.0), + root_finding_strategy=root_finding_strategy, + boundary=Boundary(min=2500.0, max=3000.0), + target_pressure=target_pd, + ) + + def func(cfg: SpeedConfiguration) -> LiquidStream: + pump.set_speed(cfg.speed) + return pump.propagate_stream(inlet) + + solution = solver.solve(func) + + assert solution.success + assert solution.configuration.speed == pytest.approx(3000.0, rel=1e-3) + + def test_speed_solver_interpolated_speed(self, search_strategy_factory, root_finding_strategy): + """Solver finds an intermediate speed when target pressure is between min and max curve pressures.""" + # At rate=10 m³/h, speed=2750 (halfway): head = (35000+50000)/2 = 42500 J/kg + # Pd = 10 + 42500 * 800 / 1e5 = 350 bara + target_pd = _pd_from_head(42_500.0) + inlet = _stream(10.0) + + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=_VAR_SPEED_CHART) + solver = SpeedSolver( + search_strategy=search_strategy_factory(tolerance=1.0), + root_finding_strategy=root_finding_strategy, + boundary=Boundary(min=2500.0, max=3000.0), + target_pressure=target_pd, + ) + + def func(cfg: SpeedConfiguration) -> LiquidStream: + pump.set_speed(cfg.speed) + return pump.propagate_stream(inlet) + + solution = solver.solve(func) + + assert solution.success + assert solution.configuration.speed == pytest.approx(2750.0, rel=1e-2) + + def test_speed_solver_target_too_high_returns_failure(self, search_strategy_factory, root_finding_strategy): + """When target pressure exceeds what max speed can deliver, solver fails.""" + target_pd = _pd_from_head(50_000.0) + 100.0 # 100 bara above max achievable + inlet = _stream(10.0) + + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=_VAR_SPEED_CHART) + solver = SpeedSolver( + search_strategy=search_strategy_factory(tolerance=1.0), + root_finding_strategy=root_finding_strategy, + boundary=Boundary(min=2500.0, max=3000.0), + target_pressure=target_pd, + ) + + def func(cfg: SpeedConfiguration) -> LiquidStream: + pump.set_speed(cfg.speed) + return pump.propagate_stream(inlet) + + solution = solver.solve(func) + + assert not solution.success + + def test_speed_solver_target_too_low_returns_failure(self, search_strategy_factory, root_finding_strategy): + """When target pressure is below what min speed delivers, solver fails.""" + target_pd = _pd_from_head(35_000.0) - 100.0 # below min-speed delivery + inlet = _stream(10.0) + + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=_VAR_SPEED_CHART) + solver = SpeedSolver( + search_strategy=search_strategy_factory(tolerance=1.0), + root_finding_strategy=root_finding_strategy, + boundary=Boundary(min=2500.0, max=3000.0), + target_pressure=target_pd, + ) + + def func(cfg: SpeedConfiguration) -> LiquidStream: + pump.set_speed(cfg.speed) + return pump.propagate_stream(inlet) + + solution = solver.solve(func) + + assert not solution.success diff --git a/tests/libecalc/domain/process/process_solver/anti_surge/test_common_asv_anti_surge_strategy.py b/tests/libecalc/domain/process/process_solver/anti_surge/test_common_asv_anti_surge_strategy.py index 49c4a8a86e..4cb67d3e35 100644 --- a/tests/libecalc/domain/process/process_solver/anti_surge/test_common_asv_anti_surge_strategy.py +++ b/tests/libecalc/domain/process/process_solver/anti_surge/test_common_asv_anti_surge_strategy.py @@ -63,7 +63,7 @@ def test_common_asv_anti_surge_uses_compressor_inlet_for_boundary( strategy = common_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_id=recirculation_loop.get_id(), - first_compressor=compressor, + first_unit=compressor, ) # Compute both candidate boundaries independently, to verify difference diff --git a/tests/libecalc/domain/process/process_solver/conftest.py b/tests/libecalc/domain/process/process_solver/conftest.py index 65fe9465e3..4113ffce04 100644 --- a/tests/libecalc/domain/process/process_solver/conftest.py +++ b/tests/libecalc/domain/process/process_solver/conftest.py @@ -31,7 +31,7 @@ def with_common_asv(recirculation_loop_factory, direct_mixer_factory, direct_splitter_factory): """Factory fixture: wrap all units in a single RecirculationLoop. - Returns the loop, its ID, and the first compressor found in units. + Returns the loop and the pipeline units [mixer, *units, splitter]. """ def create(units: list[ProcessUnit]) -> tuple[RecirculationLoop, Sequence[ProcessUnit]]: @@ -48,7 +48,7 @@ def with_individual_asv(with_common_asv): """Factory fixture: wrap each Compressor in its own RecirculationLoop. Non-compressor units (e.g. TemperatureSetter) are kept in-place outside the loop. - Returns the transformed unit list, the loop (configuration handler) list, and the compressor references. + Returns the transformed unit list and the loop (configuration handler) list. """ def create( @@ -94,12 +94,12 @@ def common_asv_anti_surge_strategy_factory(root_finding_strategy): def create( runner: ProcessRunner, recirculation_loop_id: ConfigurationHandlerId, - first_compressor: Compressor, + first_unit: Compressor, ) -> CommonASVAntiSurgeStrategy: return CommonASVAntiSurgeStrategy( simulator=runner, recirculation_loop_id=recirculation_loop_id, - first_compressor=first_compressor, + first_unit=first_unit, root_finding_strategy=root_finding_strategy, ) @@ -111,11 +111,11 @@ def individual_asv_anti_surge_strategy_factory(): def create( runner: ProcessRunner, recirculation_loop_ids: list[ConfigurationHandlerId], - compressors: list[Compressor], + units: list[Compressor], ) -> IndividualASVAntiSurgeStrategy: return IndividualASVAntiSurgeStrategy( recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=units, simulator=runner, ) @@ -127,12 +127,12 @@ def common_asv_pressure_control_strategy_factory(root_finding_strategy): def create( runner: ProcessRunner, recirculation_loop_id: ConfigurationHandlerId, - first_compressor: Compressor, + first_unit: Compressor, ) -> CommonASVPressureControlStrategy: return CommonASVPressureControlStrategy( simulator=runner, recirculation_loop_id=recirculation_loop_id, - first_compressor=first_compressor, + first_unit=first_unit, root_finding_strategy=root_finding_strategy, ) @@ -144,12 +144,12 @@ def individual_asv_rate_control_strategy_factory(): def create( runner: ProcessRunner, recirculation_loop_ids: list[ConfigurationHandlerId], - compressors: list[Compressor], + units: list[Compressor], ) -> IndividualASVRateControlStrategy: return IndividualASVRateControlStrategy( simulator=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=units, ) return create @@ -160,12 +160,12 @@ def individual_asv_pressure_control_strategy_factory(root_finding_strategy): def create( runner: ProcessRunner, recirculation_loop_ids: list[ConfigurationHandlerId], - compressors: list[Compressor], + units: list[Compressor], ) -> IndividualASVPressureControlStrategy: return IndividualASVPressureControlStrategy( simulator=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=units, root_finding_strategy=root_finding_strategy, ) diff --git a/tests/libecalc/domain/process/process_solver/pressure_control/test_common_asv_pressure_control_strategy.py b/tests/libecalc/domain/process/process_solver/pressure_control/test_common_asv_pressure_control_strategy.py index 6d11216220..69cf4a81d2 100644 --- a/tests/libecalc/domain/process/process_solver/pressure_control/test_common_asv_pressure_control_strategy.py +++ b/tests/libecalc/domain/process/process_solver/pressure_control/test_common_asv_pressure_control_strategy.py @@ -64,7 +64,7 @@ def test_common_asv_pressure_control_uses_compressor_inlet_for_boundary( strategy = common_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_id=recirculation_loop.get_id(), - first_compressor=compressor, + first_unit=compressor, ) # Compute both candidate boundaries independently, to verify difference diff --git a/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_pressure_pressure_control_strategy.py b/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_pressure_pressure_control_strategy.py index 02f029e897..2630c09d39 100644 --- a/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_pressure_pressure_control_strategy.py +++ b/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_pressure_pressure_control_strategy.py @@ -65,7 +65,7 @@ def test_individual_asv_pressure_control_reaches_target_pressure( strategy = individual_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_ids=loop_ids, - compressors=compressors, + units=compressors, ) target = FloatConstraint(target_pressure_bara) @@ -137,7 +137,7 @@ def test_individual_asv_pressure_each_stage_meets_geometric_target( strategy = individual_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_ids=[loop.get_id() for loop in loops], - compressors=compressors, + units=compressors, ) # Solve and apply the resulting per-loop recirculation rates. diff --git a/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_rate_control_pressure_strategy.py b/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_rate_control_pressure_strategy.py index b88bbf3aed..75db97ab09 100644 --- a/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_rate_control_pressure_strategy.py +++ b/tests/libecalc/domain/process/process_solver/pressure_control/test_individual_asv_rate_control_pressure_strategy.py @@ -64,7 +64,7 @@ def test_individual_asv_rate_control_reaches_target_pressure( strategy = individual_asv_rate_control_strategy_factory( runner=runner, recirculation_loop_ids=loop_ids, - compressors=compressors, + units=compressors, ) target = FloatConstraint(target_pressure_bara) diff --git a/tests/libecalc/domain/process/process_solver/solvers/test_liquid_recirculation_solver.py b/tests/libecalc/domain/process/process_solver/solvers/test_liquid_recirculation_solver.py new file mode 100644 index 0000000000..8686a3fbf4 --- /dev/null +++ b/tests/libecalc/domain/process/process_solver/solvers/test_liquid_recirculation_solver.py @@ -0,0 +1,143 @@ +import pytest + +from libecalc.domain.process.entities.process_units.direct_mixer import DirectMixer +from libecalc.domain.process.entities.process_units.direct_splitter import DirectSplitter +from libecalc.domain.process.entities.process_units.pump import Pump +from libecalc.domain.process.process_pipeline.process_unit import create_process_unit_id +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.process_solver.configuration import RecirculationConfiguration +from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationSolver +from libecalc.domain.process.value_objects.chart import ChartCurve +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream +from libecalc.presentation.yaml.mappers.charts.user_defined_chart_data import UserDefinedChartData + +# Chart: single-speed at 3000 rpm, rates 10–100 m³/h, head 50_000–20_000 J/kg, η=0.7 +_CHART = UserDefinedChartData( + curves=[ + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[10.0, 50.0, 100.0], + polytropic_head_joule_per_kg=[50_000.0, 40_000.0, 20_000.0], + efficiency_fraction=[0.7, 0.7, 0.7], + ) + ], + control_margin=0.0, +) + + +def _make_pump() -> Pump: + return Pump(process_unit_id=create_process_unit_id(), pump_chart=_CHART) + + +class _LiquidLoop: + """Test helper: mixer → pump → splitter loop for liquid recirculation tests.""" + + def __init__(self): + self.mixer = DirectMixer(process_unit_id=create_process_unit_id(), mix_rate=0.0) + self.pump = _make_pump() + self.splitter = DirectSplitter(process_unit_id=create_process_unit_id(), split_rate=0.0) + + def set_recirculation_rate(self, rate: float): + self.mixer.set_mix_rate(rate) + self.splitter.set_split_rate(rate) + + def propagate_stream(self, inlet: LiquidStream) -> LiquidStream: + mixed = self.mixer.propagate_stream(inlet) + pumped = self.pump.propagate_stream(mixed) + return self.splitter.propagate_stream(pumped) + + +def _stream(rate_m3h: float, pressure: float = 10.0, density: float = 800.0) -> LiquidStream: + return LiquidStream( + pressure_bara=pressure, + density_kg_per_m3=density, + mass_rate_kg_per_h=rate_m3h * density, + ) + + +def _make_solver(**kwargs) -> RecirculationSolver: + return RecirculationSolver( + search_strategy=kwargs.pop("search_strategy"), + root_finding_strategy=kwargs.pop("root_finding_strategy"), + recirculation_rate_boundary=kwargs.pop("recirculation_rate_boundary", Boundary(min=0.0, max=200.0 * 24)), + **kwargs, + ) + + +class TestRecirculationSolverMinimumFlow: + def test_no_recirc_needed_when_rate_above_minimum(self, search_strategy_factory, root_finding_strategy): + """When inlet rate is already above minimum, solver returns zero recirculation.""" + loop = _LiquidLoop() + inlet = _stream(rate_m3h=50.0) + + solver = _make_solver( + search_strategy=search_strategy_factory(tolerance=1e-3), + root_finding_strategy=root_finding_strategy, + ) + + def func(cfg: RecirculationConfiguration) -> LiquidStream: + loop.set_recirculation_rate(cfg.recirculation_rate) + return loop.propagate_stream(inlet) + + solution = solver.solve(func) + + assert solution.success + assert solution.configuration.recirculation_rate == pytest.approx(0.0, abs=1e-3) + + def test_recirc_found_when_rate_below_minimum(self, search_strategy_factory, root_finding_strategy): + """When inlet rate is below minimum, solver finds recirc that lifts pump to min flow.""" + loop = _LiquidLoop() + inlet = _stream(rate_m3h=4.0) + + solver = _make_solver( + search_strategy=search_strategy_factory(tolerance=1e-3), + root_finding_strategy=root_finding_strategy, + ) + + def func(cfg: RecirculationConfiguration) -> LiquidStream: + loop.set_recirculation_rate(cfg.recirculation_rate) + return loop.propagate_stream(inlet) + + solution = solver.solve(func) + + assert solution.success + assert solution.configuration.recirculation_rate >= 6.0 * 24 - 0.1 + + def test_net_throughput_unchanged_after_solve(self, search_strategy_factory, root_finding_strategy): + """After solving, net mass rate through the loop equals the original inlet rate.""" + loop = _LiquidLoop() + inlet = _stream(rate_m3h=4.0, density=800.0) + + solver = _make_solver( + search_strategy=search_strategy_factory(tolerance=1e-3), + root_finding_strategy=root_finding_strategy, + ) + + def func(cfg: RecirculationConfiguration) -> LiquidStream: + loop.set_recirculation_rate(cfg.recirculation_rate) + return loop.propagate_stream(inlet) + + solution = solver.solve(func) + outlet = loop.propagate_stream(inlet) + + assert solution.success + assert outlet.mass_rate_kg_per_h == pytest.approx(inlet.mass_rate_kg_per_h, rel=1e-4) + + def test_stonewall_returns_failure(self, search_strategy_factory, root_finding_strategy): + """When rate is above chart maximum, adding recirc cannot help — solver fails.""" + loop = _LiquidLoop() + inlet = _stream(rate_m3h=200.0) + + solver = _make_solver( + search_strategy=search_strategy_factory(tolerance=1e-3), + root_finding_strategy=root_finding_strategy, + recirculation_rate_boundary=Boundary(min=0.0, max=50.0), + ) + + def func(cfg: RecirculationConfiguration) -> LiquidStream: + loop.set_recirculation_rate(cfg.recirculation_rate) + return loop.propagate_stream(inlet) + + solution = solver.solve(func) + + assert not solution.success diff --git a/tests/libecalc/domain/process/process_solver/solvers/test_recirculation_solver.py b/tests/libecalc/domain/process/process_solver/solvers/test_recirculation_solver.py index e3ab197fb8..961b394ce1 100644 --- a/tests/libecalc/domain/process/process_solver/solvers/test_recirculation_solver.py +++ b/tests/libecalc/domain/process/process_solver/solvers/test_recirculation_solver.py @@ -3,7 +3,7 @@ import pytest from libecalc.domain.process.process_pipeline.process_error import ProcessError, RateTooHighError, RateTooLowError -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.process_solver.boundary import Boundary from libecalc.domain.process.process_solver.process_pipeline_runner import propagate_stream_many from libecalc.domain.process.process_solver.solvers.recirculation_solver import ( @@ -13,7 +13,7 @@ from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class RateCompressor(ProcessUnit): +class RateCompressor(GasProcessUnit): def __init__( self, fluid_service: FluidService, @@ -21,7 +21,7 @@ def __init__( maximum_rate: float, process_unit_id: ProcessUnitId | None = None, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._fluid_service = fluid_service self._minimum_rate = minimum_rate self._maximum_rate = maximum_rate diff --git a/tests/libecalc/domain/process/process_solver/solvers/test_speed_solver.py b/tests/libecalc/domain/process/process_solver/solvers/test_speed_solver.py index 9afae5b68f..f99f7de85f 100644 --- a/tests/libecalc/domain/process/process_solver/solvers/test_speed_solver.py +++ b/tests/libecalc/domain/process/process_solver/solvers/test_speed_solver.py @@ -3,21 +3,21 @@ import pytest from libecalc.domain.process.entities.shaft import Shaft, VariableSpeedShaft -from libecalc.domain.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId +from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId from libecalc.domain.process.process_solver.boundary import Boundary from libecalc.domain.process.process_solver.process_pipeline_runner import propagate_stream_many from libecalc.domain.process.process_solver.solvers.speed_solver import SpeedConfiguration, SpeedSolver from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream -class SpeedProcessUnit(ProcessUnit): +class SpeedProcessUnit(GasProcessUnit): def __init__( self, shaft: Shaft, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None, ): - self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id() + self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id() self._shaft = shaft self._fluid_service = fluid_service diff --git a/tests/libecalc/domain/process/process_solver/test_common_asv_solver_vs_legacy_train.py b/tests/libecalc/domain/process/process_solver/test_common_asv_solver_vs_legacy_train.py index d985226373..4199d31228 100644 --- a/tests/libecalc/domain/process/process_solver/test_common_asv_solver_vs_legacy_train.py +++ b/tests/libecalc/domain/process/process_solver/test_common_asv_solver_vs_legacy_train.py @@ -138,12 +138,12 @@ def test_common_asv_solver_vs_legacy_train( anti_surge_strategy = common_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_id=common_asv.get_id(), - first_compressor=first_compressor, + first_unit=first_compressor, ) pressure_control_strategy = common_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_id=common_asv.get_id(), - first_compressor=first_compressor, + first_unit=first_compressor, ) process_pipeline = process_pipeline_factory(units=process_units) train_solver = outlet_pressure_solver_factory( diff --git a/tests/libecalc/domain/process/process_solver/test_individual_asv_solver_vs_legacy_train.py b/tests/libecalc/domain/process/process_solver/test_individual_asv_solver_vs_legacy_train.py index 74ff08f275..5aea010663 100644 --- a/tests/libecalc/domain/process/process_solver/test_individual_asv_solver_vs_legacy_train.py +++ b/tests/libecalc/domain/process/process_solver/test_individual_asv_solver_vs_legacy_train.py @@ -129,12 +129,12 @@ def test_individual_asv_rate_solver_vs_legacy_train( anti_surge_strategy = individual_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) pressure_control_strategy = individual_asv_rate_control_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) train_solver = outlet_pressure_solver_factory( shaft=shaft_new, @@ -271,12 +271,12 @@ def test_individual_asv_pressure_solver_vs_legacy_train( anti_surge_strategy = individual_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) pressure_control_strategy = individual_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) train_solver = outlet_pressure_solver_factory( shaft=shaft_new, @@ -358,10 +358,10 @@ def test_individual_asv_anti_surge_returns_failure_when_rate_above_stonewall( runner = process_runner_factory(units=individual_asvs, configuration_handlers=[shaft, *loops]) process_pipeline = process_pipeline_factory(units=individual_asvs) anti_surge = individual_asv_anti_surge_strategy_factory( - runner=runner, recirculation_loop_ids=loop_ids, compressors=compressors + runner=runner, recirculation_loop_ids=loop_ids, units=compressors ) pressure_control = individual_asv_rate_control_strategy_factory( - runner=runner, recirculation_loop_ids=loop_ids, compressors=compressors + runner=runner, recirculation_loop_ids=loop_ids, units=compressors ) solver = outlet_pressure_solver_factory( shaft=shaft, @@ -416,7 +416,7 @@ def test_individual_asv_anti_surge_single_stage_returns_failure_when_rate_above_ loop_ids = [loop.get_id() for loop in loops] runner = process_runner_factory(units=individual_asvs, configuration_handlers=[shaft, *loops]) anti_surge = individual_asv_anti_surge_strategy_factory( - runner=runner, recirculation_loop_ids=loop_ids, compressors=compressors + runner=runner, recirculation_loop_ids=loop_ids, units=compressors ) inlet_stream = stream_factory(standard_rate_m3_per_day=5_000_000, pressure_bara=30.0) diff --git a/tests/libecalc/domain/process/process_solver/test_multi_pressure_solver.py b/tests/libecalc/domain/process/process_solver/test_multi_pressure_solver.py index 41cbf70b35..b04c30f11d 100644 --- a/tests/libecalc/domain/process/process_solver/test_multi_pressure_solver.py +++ b/tests/libecalc/domain/process/process_solver/test_multi_pressure_solver.py @@ -131,12 +131,12 @@ def test_two_stage_train_with_interstage_pressure_vs_legacy( anti_surge_strategy=individual_asv_anti_surge_strategy_factory( runner=low_pressure_runner, recirculation_loop_ids=low_pressure_loop_ids, - compressors=low_pressure_compressors, + units=low_pressure_compressors, ), pressure_control_strategy=individual_asv_rate_control_strategy_factory( runner=low_pressure_runner, recirculation_loop_ids=low_pressure_loop_ids, - compressors=low_pressure_compressors, + units=low_pressure_compressors, ), root_finding_strategy=root_finding_strategy, speed_boundary=speed_boundary, @@ -148,12 +148,12 @@ def test_two_stage_train_with_interstage_pressure_vs_legacy( anti_surge_strategy=individual_asv_anti_surge_strategy_factory( runner=high_pressure_runner, recirculation_loop_ids=high_pressure_loop_ids, - compressors=high_pressure_compressors, + units=high_pressure_compressors, ), pressure_control_strategy=individual_asv_rate_control_strategy_factory( runner=high_pressure_runner, recirculation_loop_ids=high_pressure_loop_ids, - compressors=high_pressure_compressors, + units=high_pressure_compressors, ), root_finding_strategy=root_finding_strategy, speed_boundary=speed_boundary, @@ -299,12 +299,12 @@ def make_segment(runner, loop_ids, compressors, process_pipeline): anti_surge_strategy=individual_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_ids=loop_ids, - compressors=compressors, + units=compressors, ), pressure_control_strategy=individual_asv_rate_control_strategy_factory( runner=runner, recirculation_loop_ids=loop_ids, - compressors=compressors, + units=compressors, ), root_finding_strategy=root_finding_strategy, speed_boundary=speed_boundary, @@ -399,10 +399,10 @@ def test_target_not_achievable_event_identifies_failing_segment( shaft_id=shaft.get_id(), runner=lp_runner, anti_surge_strategy=individual_asv_anti_surge_strategy_factory( - runner=lp_runner, recirculation_loop_ids=lp_loop_ids, compressors=lp_compressors + runner=lp_runner, recirculation_loop_ids=lp_loop_ids, units=lp_compressors ), pressure_control_strategy=individual_asv_rate_control_strategy_factory( - runner=lp_runner, recirculation_loop_ids=lp_loop_ids, compressors=lp_compressors + runner=lp_runner, recirculation_loop_ids=lp_loop_ids, units=lp_compressors ), root_finding_strategy=root_finding_strategy, speed_boundary=speed_boundary, @@ -412,10 +412,10 @@ def test_target_not_achievable_event_identifies_failing_segment( shaft_id=shaft.get_id(), runner=hp_runner, anti_surge_strategy=individual_asv_anti_surge_strategy_factory( - runner=hp_runner, recirculation_loop_ids=hp_loop_ids, compressors=hp_compressors + runner=hp_runner, recirculation_loop_ids=hp_loop_ids, units=hp_compressors ), pressure_control_strategy=individual_asv_rate_control_strategy_factory( - runner=hp_runner, recirculation_loop_ids=hp_loop_ids, compressors=hp_compressors + runner=hp_runner, recirculation_loop_ids=hp_loop_ids, units=hp_compressors ), root_finding_strategy=root_finding_strategy, speed_boundary=speed_boundary, diff --git a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_common_asv.py b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_common_asv.py index e825cc363e..b6b69c1f63 100644 --- a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_common_asv.py +++ b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_common_asv.py @@ -44,12 +44,12 @@ def test_outlet_pressure_solver_with_common_asv( anti_surge_strategy = common_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_id=common_asv.get_id(), - first_compressor=first_compressor, + first_unit=first_compressor, ) pressure_control_strategy = common_asv_pressure_control_strategy_factory( runner=runner, recirculation_loop_id=common_asv.get_id(), - first_compressor=first_compressor, + first_unit=first_compressor, ) common_asv_solver = outlet_pressure_solver_factory( shaft=shaft, @@ -134,10 +134,10 @@ def test_find_solution_returns_failure_when_rate_above_stonewall( runner = process_runner_factory(units=process_units, configuration_handlers=[shaft, common_asv]) process_pipeline = process_pipeline_factory(units=process_units) anti_surge = common_asv_anti_surge_strategy_factory( - runner=runner, recirculation_loop_id=common_asv.get_id(), first_compressor=small_chart_compressor + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=small_chart_compressor ) pressure_control = common_asv_pressure_control_strategy_factory( - runner=runner, recirculation_loop_id=common_asv.get_id(), first_compressor=small_chart_compressor + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=small_chart_compressor ) solver = outlet_pressure_solver_factory( shaft=shaft, @@ -184,10 +184,10 @@ def test_pressure_control_boundary_not_affected_by_residual_anti_surge_recircula runner = process_runner_factory(units=process_units, configuration_handlers=[shaft, common_asv]) process_pipeline = process_pipeline_factory(units=process_units) anti_surge = common_asv_anti_surge_strategy_factory( - runner=runner, recirculation_loop_id=common_asv.get_id(), first_compressor=small_chart_compressor + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=small_chart_compressor ) pressure_control = common_asv_pressure_control_strategy_factory( - runner=runner, recirculation_loop_id=common_asv.get_id(), first_compressor=small_chart_compressor + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=small_chart_compressor ) solver = outlet_pressure_solver_factory( shaft=shaft, diff --git a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_downstream_choke.py b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_downstream_choke.py index de899ba45c..dfbf8fb6ea 100644 --- a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_downstream_choke.py +++ b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_downstream_choke.py @@ -58,7 +58,7 @@ def test_outlet_pressure_solver_applies_downstream_choke_when_speed_solution_is_ anti_surge_strategy = common_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_id=recirculation_loop.get_id(), - first_compressor=compressor, + first_unit=compressor, ) pressure_control_strategy = downstream_choke_pressure_control_strategy_factory( runner=runner, diff --git a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_pump.py b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_pump.py new file mode 100644 index 0000000000..029712ac4f --- /dev/null +++ b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_pump.py @@ -0,0 +1,238 @@ +"""Integration test: ProcessPipelineRunner end-to-end with pump. + +Tests that the entire solver stack (OutletPressureSolver → SpeedSolver → +RecirculationSolver) works with a variable-speed pump pipeline using LiquidStream. +This verifies Decision 8: the unified ProcessPipelineRunner accepts both gas +compressor and liquid pump pipelines via the ProcessUnit protocol. +""" + +import pytest + +from libecalc.domain.process.entities.process_units.pump import Pump +from libecalc.domain.process.entities.shaft import VariableSpeedShaft +from libecalc.domain.process.process_pipeline.process_unit import create_process_unit_id +from libecalc.domain.process.process_solver.float_constraint import FloatConstraint +from libecalc.domain.process.process_solver.solvers.recirculation_solver import RecirculationConfiguration +from libecalc.domain.process.value_objects.chart import ChartCurve +from libecalc.domain.process.value_objects.liquid_stream import LiquidStream +from libecalc.testing.chart_data_factory import ChartDataFactory + +# Variable-speed pump chart: 2 speeds (3000, 4200 rpm) +# Rate range: 20–120 m³/h +# Head range: ~5000–2000 J/kg → ~50–20 bar rise for water at 1030 kg/m³ +# Efficiency: 0.72 +_PUMP_CHART = ChartDataFactory.from_curves( + curves=[ + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[20.0, 60.0, 120.0], + polytropic_head_joule_per_kg=[5000.0, 4000.0, 2000.0], + efficiency_fraction=[0.72, 0.72, 0.72], + ), + ChartCurve( + speed_rpm=4200.0, + rate_actual_m3_hour=[20.0, 60.0, 120.0], + polytropic_head_joule_per_kg=[5250.0, 4200.0, 2100.0], + efficiency_fraction=[0.72, 0.72, 0.72], + ), + ], + control_margin=0.0, +) + + +def _make_inlet(rate_m3h: float = 50.0, pressure: float = 10.0, density: float = 1030.0) -> LiquidStream: + """Water injection stream at typical conditions.""" + return LiquidStream( + pressure_bara=pressure, + density_kg_per_m3=density, + mass_rate_kg_per_h=rate_m3h * density, + ) + + +@pytest.fixture +def pump(): + return Pump(process_unit_id=create_process_unit_id(), pump_chart=_PUMP_CHART) + + +@pytest.fixture +def shaft(): + return VariableSpeedShaft() + + +class TestPumpPipelineIntegration: + """End-to-end tests: ProcessPipelineRunner with pump, shaft, recirculation.""" + + def test_pump_pipeline_finds_speed_for_target_pressure( + self, + pump, + shaft, + with_common_asv, + process_pipeline_factory, + process_runner_factory, + common_asv_anti_surge_strategy_factory, + common_asv_pressure_control_strategy_factory, + outlet_pressure_solver_factory, + ): + """SpeedSolver finds the RPM needed to reach target outlet pressure.""" + shaft.connect(pump) + common_asv, pipeline_units = with_common_asv([pump]) + + runner = process_runner_factory(units=pipeline_units, configuration_handlers=[shaft, common_asv]) + process_pipeline = process_pipeline_factory(units=pipeline_units) + + anti_surge = common_asv_anti_surge_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + pressure_control = common_asv_pressure_control_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + solver = outlet_pressure_solver_factory( + shaft=shaft, + runner=runner, + anti_surge_strategy=anti_surge, + pressure_control_strategy=pressure_control, + process_pipeline_id=process_pipeline.get_id(), + ) + + inlet = _make_inlet(rate_m3h=50.0, pressure=10.0) + + # Target: 40 bara outlet from 10 bara inlet (30 bar rise) + # Head needed: 30e5 / 1030 ≈ 2913 J/kg — within chart range + target = FloatConstraint(40.0) + solution = solver.find_solution(pressure_constraint=target, inlet_stream=inlet) + + assert solution.success + + # Verify speed was found + config_dict = {c.configuration_handler_id: c.value for c in solution.configuration} + speed = config_dict[shaft.get_id()].speed + assert 3000.0 <= speed <= 4200.0 + + # Verify outlet pressure matches target + runner.apply_configurations(solution.configuration) + outlet = runner.run(inlet_stream=inlet) + assert outlet.pressure_bara == pytest.approx(target.value, rel=1e-3) + + def test_pump_exposes_shaft_power_after_solve( + self, + pump, + shaft, + with_common_asv, + process_pipeline_factory, + process_runner_factory, + common_asv_anti_surge_strategy_factory, + common_asv_pressure_control_strategy_factory, + outlet_pressure_solver_factory, + ): + """After solving, pump exposes shaft power, head, and efficiency.""" + shaft.connect(pump) + common_asv, pipeline_units = with_common_asv([pump]) + + runner = process_runner_factory(units=pipeline_units, configuration_handlers=[shaft, common_asv]) + process_pipeline = process_pipeline_factory(units=pipeline_units) + + anti_surge = common_asv_anti_surge_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + pressure_control = common_asv_pressure_control_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + solver = outlet_pressure_solver_factory( + shaft=shaft, + runner=runner, + anti_surge_strategy=anti_surge, + pressure_control_strategy=pressure_control, + process_pipeline_id=process_pipeline.get_id(), + ) + + inlet = _make_inlet(rate_m3h=50.0, pressure=10.0) + target = FloatConstraint(40.0) + solution = solver.find_solution(pressure_constraint=target, inlet_stream=inlet) + + assert solution.success + + # Apply solution and run to populate pump state + runner.apply_configurations(solution.configuration) + runner.run(inlet_stream=inlet) + + # Pump should now expose valid results + assert pump.last_shaft_power_mw > 0 + assert pump.last_head_joule_per_kg > 0 + assert 0 < pump.last_efficiency <= 1.0 + + # Verify physics: P = ṁ × head / η / 1e6 + # Use pump's actual mass rate (may include recirculation) + mass_rate_kg_s = pump._last_mass_rate_kg_per_s + expected_power = mass_rate_kg_s * pump.last_head_joule_per_kg / pump.last_efficiency / 1e6 + assert pump.last_shaft_power_mw == pytest.approx(expected_power, rel=1e-6) + + def test_pump_pipeline_with_recirculation( + self, + shaft, + with_common_asv, + process_pipeline_factory, + process_runner_factory, + common_asv_anti_surge_strategy_factory, + common_asv_pressure_control_strategy_factory, + outlet_pressure_solver_factory, + ): + """When rate is below minimum, anti-surge recirculation kicks in.""" + # Use a chart with higher minimum rate to force recirculation + chart = ChartDataFactory.from_curves( + curves=[ + ChartCurve( + speed_rpm=3000.0, + rate_actual_m3_hour=[40.0, 80.0, 120.0], + polytropic_head_joule_per_kg=[5000.0, 4000.0, 2000.0], + efficiency_fraction=[0.72, 0.72, 0.72], + ), + ChartCurve( + speed_rpm=4200.0, + rate_actual_m3_hour=[40.0, 80.0, 120.0], + polytropic_head_joule_per_kg=[5250.0, 4200.0, 2100.0], + efficiency_fraction=[0.72, 0.72, 0.72], + ), + ], + control_margin=0.0, + ) + pump = Pump(process_unit_id=create_process_unit_id(), pump_chart=chart) + shaft.connect(pump) + + common_asv, pipeline_units = with_common_asv([pump]) + runner = process_runner_factory(units=pipeline_units, configuration_handlers=[shaft, common_asv]) + process_pipeline = process_pipeline_factory(units=pipeline_units) + + anti_surge = common_asv_anti_surge_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + pressure_control = common_asv_pressure_control_strategy_factory( + runner=runner, recirculation_loop_id=common_asv.get_id(), first_unit=pump + ) + solver = outlet_pressure_solver_factory( + shaft=shaft, + runner=runner, + anti_surge_strategy=anti_surge, + pressure_control_strategy=pressure_control, + process_pipeline_id=process_pipeline.get_id(), + ) + + # Inlet rate of 20 m³/h is below chart minimum of 40 m³/h → needs recirculation + inlet = _make_inlet(rate_m3h=20.0, pressure=10.0) + target = FloatConstraint(40.0) + + solution = solver.find_solution(pressure_constraint=target, inlet_stream=inlet) + + assert solution.success + + # Verify recirculation was applied + recirc_configs = [c for c in solution.configuration if isinstance(c.value, RecirculationConfiguration)] + assert len(recirc_configs) > 0 + assert recirc_configs[0].value.recirculation_rate > 0 + + # Verify outlet pressure matches target + runner.apply_configurations(solution.configuration) + outlet = runner.run(inlet_stream=inlet) + assert outlet.pressure_bara == pytest.approx(target.value, rel=1e-3) + + # Net throughput should equal inlet rate (recirculation is internal) + assert outlet.mass_rate_kg_per_h == pytest.approx(inlet.mass_rate_kg_per_h, rel=1e-3) diff --git a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_upstream_choke.py b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_upstream_choke.py index 2e6f4930b2..3c6f54d45b 100644 --- a/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_upstream_choke.py +++ b/tests/libecalc/domain/process/process_solver/test_outlet_pressure_solver_with_upstream_choke.py @@ -59,7 +59,7 @@ def test_outlet_pressure_solver_applies_upstream_choke_when_speed_solution_is_at anti_surge_strategy = common_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_id=recirculation_loop.get_id(), - first_compressor=compressor, + first_unit=compressor, ) pressure_control_strategy = upstream_choke_pressure_control_strategy_factory( runner=runner, diff --git a/tests/libecalc/domain/process/process_solver/test_two_stage_train_with_interstage_splitter_vs_legacy.py b/tests/libecalc/domain/process/process_solver/test_two_stage_train_with_interstage_splitter_vs_legacy.py index ce82a57030..f6bc458fb4 100644 --- a/tests/libecalc/domain/process/process_solver/test_two_stage_train_with_interstage_splitter_vs_legacy.py +++ b/tests/libecalc/domain/process/process_solver/test_two_stage_train_with_interstage_splitter_vs_legacy.py @@ -109,12 +109,12 @@ def test_two_stage_train_with_interstage_splitter_vs_legacy( anti_surge_strategy = individual_asv_anti_surge_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) pressure_control_strategy = individual_asv_rate_control_strategy_factory( runner=runner, recirculation_loop_ids=recirculation_loop_ids, - compressors=compressors, + units=compressors, ) train_solver = outlet_pressure_solver_factory( shaft=shaft_new,