-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcompressor.py
More file actions
164 lines (145 loc) · 7.42 KB
/
compressor.py
File metadata and controls
164 lines (145 loc) · 7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from typing import Final
from libecalc.domain.process.compressor.core.exceptions import CompressorThermodynamicCalculationError
from libecalc.domain.process.compressor.core.train.utils.common import (
RECIRCULATION_BOUNDARY_TOLERANCE,
calculate_outlet_pressure_and_stream,
)
from libecalc.domain.process.value_objects.chart.chart import ChartData
from libecalc.domain.process.value_objects.chart.compressor import CompressorChart
from libecalc.process.fluid_stream.fluid_service import FluidService
from libecalc.process.fluid_stream.fluid_stream import FluidStream
from libecalc.process.process_pipeline.process_error import (
CompressorOperatingPoint,
OutletFluidNotAchievableError,
RateTooHighError,
RateTooLowError,
)
from libecalc.process.process_pipeline.process_unit import ProcessUnit, ProcessUnitId
from libecalc.process.process_solver.boundary import Boundary
class Compressor(ProcessUnit):
    """Process unit that propagates a fluid stream through a compressor chart.

    The rotational speed is not given at construction time; it must be
    assigned with ``set_speed`` before any speed-dependent member
    (``speed``, the flow-rate limits, ``propagate_stream`` and the
    standard-rate helpers) is used. Until then those members raise
    ``ValueError``.
    """

    def __init__(
        self,
        compressor_chart: ChartData,
        fluid_service: FluidService,
        process_unit_id: ProcessUnitId | None = None,
    ):
        """Create a compressor.

        Args:
            compressor_chart: Chart data; wrapped in a ``CompressorChart``.
            fluid_service: Service used for the outlet calculation and for
                mass-rate to standard-rate conversion.
            process_unit_id: Identifier for this unit. When omitted, a new
                one is created with ``ProcessUnit._create_id()``.
        """
        self._id: Final[ProcessUnitId] = process_unit_id or ProcessUnit._create_id()
        self._compressor_chart = CompressorChart(compressor_chart)
        self._fluid_service = fluid_service
        # Unset until set_speed() is called.
        self._speed: float | None = None

    def get_id(self) -> ProcessUnitId:
        """Return the identifier of this process unit."""
        return self._id

    def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
        """Compute the outlet stream for ``inlet_stream`` at the current speed.

        Args:
            inlet_stream: Stream entering the compressor.

        Returns:
            The stream produced by ``calculate_outlet_pressure_and_stream``
            for the head and efficiency read from the chart.

        Raises:
            ValueError: If the speed has not been set.
            RateTooLowError: If the inlet actual rate is below the minimum
                flow rate at the current speed.
            RateTooHighError: If the inlet actual rate is above the maximum
                flow rate at the current speed.
            OutletFluidNotAchievableError: If the outlet thermodynamic
                calculation raises ``CompressorThermodynamicCalculationError``.
        """
        actual_rate = inlet_stream.volumetric_rate_m3_per_hour

        # Evaluate each chart boundary once and reuse it for both the check
        # and the error payload.
        minimum_rate = self.minimum_flow_rate
        if actual_rate < minimum_rate:
            raise RateTooLowError(
                actual_rate=actual_rate,
                boundary_rate=minimum_rate,
                process_unit_id=self._id,
            )

        maximum_rate = self.maximum_flow_rate
        if actual_rate > maximum_rate:
            raise RateTooHighError(
                actual_rate=actual_rate,
                boundary_rate=maximum_rate,
                process_unit_id=self._id,
            )

        speed = self.speed
        chart = self.compressor_chart
        chart_curve_at_given_speed = chart.get_curve_by_speed(speed=speed)
        if chart_curve_at_given_speed is not None:
            # The chart has a curve for exactly this speed: read it directly.
            polytropic_head = float(chart_curve_at_given_speed.head_as_function_of_rate(actual_rate))
            polytropic_efficiency = float(chart_curve_at_given_speed.efficiency_as_function_of_rate(actual_rate))
        else:
            # No curve at this speed: let the chart derive the operating point
            # from the neighbouring speed curves.
            (
                polytropic_head,
                polytropic_efficiency,
            ) = chart.calculate_head_and_efficiency_for_internal_point_between_given_speeds(
                speed=speed,
                rate=actual_rate,
            )

        try:
            return calculate_outlet_pressure_and_stream(
                polytropic_efficiency=polytropic_efficiency,
                polytropic_head_joule_per_kg=polytropic_head,
                inlet_stream=inlet_stream,
                fluid_service=self._fluid_service,
            )
        except CompressorThermodynamicCalculationError as exc:
            # The compressor outlet thermodynamics could not produce a usable state
            # (invalid Campbell pressure guess, PH flash failure, or invalid PH result).
            raise OutletFluidNotAchievableError(
                process_unit_id=self._id,
                unachievable_operating_point=CompressorOperatingPoint(
                    inlet_pressure_bara=inlet_stream.pressure_bara,
                    inlet_temperature_kelvin=inlet_stream.temperature_kelvin,
                    actual_rate_m3_per_hour=actual_rate,
                    polytropic_head_joule_per_kg=polytropic_head,
                    polytropic_efficiency=polytropic_efficiency,
                    speed=speed,
                ),
            ) from exc

    @property
    def compressor_chart(self) -> CompressorChart:
        """The ``CompressorChart`` built from the chart data given at construction."""
        return self._compressor_chart

    @property
    def speed(self) -> float:
        """Current rotational speed.

        Raises:
            ValueError: If no speed has been set yet.
        """
        if self._speed is None:
            raise ValueError("Speed not set. Compressor must be registered on a Shaft.")
        return self._speed

    @property
    def minimum_flow_rate(self) -> float:
        """Minimum flow rate in m3/h at the current speed.

        Raises:
            ValueError: If the speed has not been set (there is no fallback
                to a default speed).
        """
        return self.compressor_chart.minimum_rate_as_function_of_speed(self.speed)

    @property
    def maximum_flow_rate(self) -> float:
        """Maximum flow rate in m3/h at the current speed.

        Raises:
            ValueError: If the speed has not been set (there is no fallback
                to a default speed).
        """
        return self.compressor_chart.maximum_rate_as_function_of_speed(self.speed)

    def set_speed(self, speed: float) -> None:
        """Set the rotational speed used for chart lookup."""
        self._speed = speed

    def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
        """Minimum standard volumetric rate [sm³/day] at current speed.

        The inlet_stream should already be the stream entering the compressor
        (i.e., after any upstream conditioning units such as TemperatureSetter
        and LiquidRemover have processed it).

        Note: since there is conversion between actual and standard rate, and
        numerical accuracy issues may later provide a RateTooHigh or RateTooLow
        error when it should not be there, we give the minimum rate a slight
        nudge upwards to make sure we stay inside capacity when we are supposed
        to stay inside capacity.

        Raises:
            ValueError: If the speed has not been set.
        """
        density = inlet_stream.density
        # Actual rate [m3/h] times inlet density gives the mass rate [kg/h].
        min_mass_rate_kg_per_h = self.minimum_flow_rate * density
        min_standard_rate = self._fluid_service.mass_rate_to_standard_rate(
            fluid_model=inlet_stream.fluid_model,
            mass_rate_kg_per_h=min_mass_rate_kg_per_h,
        )
        return min_standard_rate + RECIRCULATION_BOUNDARY_TOLERANCE

    def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
        """Maximum standard volumetric rate [sm³/day] at current speed.

        Note: since there is conversion between actual and standard rate, and
        numerical accuracy issues may later provide a RateTooHigh or RateTooLow
        error when it should not be there, we give the maximum rate a slight
        nudge downwards to make sure we stay inside capacity when we are
        supposed to stay inside capacity.

        Raises:
            ValueError: If the speed has not been set.
        """
        density = inlet_stream.density
        # Actual rate [m3/h] times inlet density gives the mass rate [kg/h].
        max_mass_rate_kg_per_h = self.maximum_flow_rate * density
        max_standard_rate = self._fluid_service.mass_rate_to_standard_rate(
            fluid_model=inlet_stream.fluid_model,
            mass_rate_kg_per_h=max_mass_rate_kg_per_h,
        )
        return max_standard_rate - RECIRCULATION_BOUNDARY_TOLERANCE

    def get_recirculation_range(self, inlet_stream: FluidStream) -> Boundary:
        """How much recirculation (sm³/day) can be added while keeping the compressor within capacity.

        Returns:
            Boundary where:
                min = additional rate needed to reach the minimum operating point (surge limit)
                max = additional rate available before the maximum operating point is exceeded

            Both values are clamped at zero, so they are never negative.
        """
        min_rate = self.get_minimum_standard_rate(inlet_stream)
        max_rate = self.get_maximum_standard_rate(inlet_stream)
        current_rate = inlet_stream.standard_rate_sm3_per_day
        return Boundary(
            min=max(0.0, min_rate - current_rate),
            max=max(0.0, max_rate - current_rate),
        )