-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcompressor.py
More file actions
150 lines (129 loc) · 6.61 KB
/
compressor.py
File metadata and controls
150 lines (129 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from typing import Final
from libecalc.domain.process.compressor.core.train.utils.common import (
RECIRCULATION_BOUNDARY_TOLERANCE,
calculate_outlet_pressure_and_stream,
)
from libecalc.domain.process.process_pipeline.process_error import RateTooHighError, RateTooLowError
from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId
from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.value_objects.chart.chart import ChartData
from libecalc.domain.process.value_objects.chart.compressor import CompressorChart
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream
class Compressor(GasProcessUnit):
    """Gas process unit that compresses a stream according to a compressor chart.

    The chart data is wrapped in a ``CompressorChart`` which provides head,
    efficiency and the rate limits as functions of speed. The speed is not
    known at construction time; it has to be supplied through ``set_speed``
    before any speed-dependent member is used (the error raised otherwise
    indicates this is expected to happen when the compressor is registered
    on a shaft).
    """

    def __init__(
        self,
        compressor_chart: ChartData,
        fluid_service: FluidService,
        process_unit_id: ProcessUnitId | None = None,
    ):
        """Create a compressor.

        Args:
            compressor_chart: Chart data, wrapped in a ``CompressorChart``.
            fluid_service: Service used for stream calculations and for
                converting mass rate to standard rate.
            process_unit_id: Optional identifier; a new one is created via
                ``GasProcessUnit._create_id()`` when omitted.
        """
        self._id: Final[ProcessUnitId] = process_unit_id or GasProcessUnit._create_id()
        self._compressor_chart = CompressorChart(compressor_chart)
        self._fluid_service = fluid_service
        # Unset until set_speed() is called; the ``speed`` property raises while it is None.
        self._speed: float | None = None

    def get_id(self) -> ProcessUnitId:
        """Return the identifier of this process unit."""
        return self._id

    def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
        """Compress ``inlet_stream`` at the current speed and return the outlet stream.

        Args:
            inlet_stream: Stream entering the compressor.

        Returns:
            The stream produced by ``calculate_outlet_pressure_and_stream`` for the
            polytropic head and efficiency read from the chart.

        Raises:
            RateTooLowError: The actual inlet rate is below the minimum rate at the current speed.
            RateTooHighError: The actual inlet rate is above the maximum rate at the current speed.
            ValueError: The speed has not been set.
        """
        actual_rate = inlet_stream.volumetric_rate_m3_per_hour

        # Read each boundary once; the lower boundary is checked before the upper one is looked up.
        minimum_rate = self.minimum_flow_rate
        if actual_rate < minimum_rate:
            raise RateTooLowError(
                actual_rate=actual_rate,
                boundary_rate=minimum_rate,
                process_unit_id=self._id,
            )

        maximum_rate = self.maximum_flow_rate
        if actual_rate > maximum_rate:
            raise RateTooHighError(
                actual_rate=actual_rate,
                boundary_rate=maximum_rate,
                process_unit_id=self._id,
            )

        chart_curve_at_given_speed = self.compressor_chart.get_curve_by_speed(speed=self.speed)
        if chart_curve_at_given_speed is not None:
            # The chart has a curve for exactly this speed: evaluate it directly.
            polytropic_head = float(chart_curve_at_given_speed.head_as_function_of_rate(actual_rate))
            polytropic_efficiency = float(chart_curve_at_given_speed.efficiency_as_function_of_rate(actual_rate))
        else:
            # No curve for this speed: let the chart compute the point between the given speeds.
            (
                polytropic_head,
                polytropic_efficiency,
            ) = self.compressor_chart.calculate_head_and_efficiency_for_internal_point_between_given_speeds(
                speed=self.speed,
                rate=actual_rate,
            )

        return calculate_outlet_pressure_and_stream(
            polytropic_efficiency=polytropic_efficiency,
            polytropic_head_joule_per_kg=polytropic_head,
            inlet_stream=inlet_stream,
            fluid_service=self._fluid_service,
        )

    @property
    def compressor_chart(self) -> CompressorChart:
        """The compressor chart used for all lookups."""
        return self._compressor_chart

    @property
    def speed(self) -> float:
        """Current rotational speed.

        Raises:
            ValueError: The speed has not been set yet.
        """
        if self._speed is None:
            raise ValueError("Speed not set. Compressor must be registered on a Shaft.")
        return self._speed

    @property
    def minimum_flow_rate(self) -> float:
        """Minimum flow rate in m3/h at the current speed.

        Raises:
            ValueError: The speed has not been set (there is no fallback to another speed).
        """
        return self.compressor_chart.minimum_rate_as_function_of_speed(self.speed)

    @property
    def maximum_flow_rate(self) -> float:
        """Maximum flow rate in m3/h at the current speed.

        Raises:
            ValueError: The speed has not been set (there is no fallback to another speed).
        """
        return self.compressor_chart.maximum_rate_as_function_of_speed(self.speed)

    def set_speed(self, speed: float) -> None:
        """Set the rotational speed used for chart lookup."""
        self._speed = speed

    @property
    def minimum_speed(self) -> float:
        """Minimum speed reported by the compressor chart."""
        return self._compressor_chart.minimum_speed

    @property
    def maximum_speed(self) -> float:
        """Maximum speed reported by the compressor chart."""
        return self._compressor_chart.maximum_speed

    def _actual_rate_to_standard_rate(self, actual_rate_m3_per_hour: float, inlet_stream: FluidStream) -> float:
        """Convert an actual volumetric rate at inlet conditions to a standard rate.

        The actual rate is turned into a mass rate using the inlet stream density,
        and the fluid service converts that mass rate to a standard rate.
        """
        mass_rate_kg_per_h = actual_rate_m3_per_hour * inlet_stream.density
        return self._fluid_service.mass_rate_to_standard_rate(
            fluid_model=inlet_stream.fluid_model,
            mass_rate_kg_per_h=mass_rate_kg_per_h,
        )

    def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
        """Minimum standard volumetric rate [sm³/day] at current speed.

        The inlet_stream should already be the stream entering the compressor
        (i.e., after any upstream conditioning units such as TemperatureSetter
        and LiquidRemover have processed it).

        Note: the conversion between actual and standard rate is subject to numerical
        inaccuracy, which could later trigger a spurious RateTooLow error. The minimum
        rate is therefore nudged slightly upwards so that a rate at the boundary stays
        inside capacity.

        Raises:
            ValueError: The speed has not been set.
        """
        minimum_standard_rate = self._actual_rate_to_standard_rate(self.minimum_flow_rate, inlet_stream)
        return minimum_standard_rate + RECIRCULATION_BOUNDARY_TOLERANCE

    def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
        """Maximum standard volumetric rate [sm³/day] at current speed.

        Note: the conversion between actual and standard rate is subject to numerical
        inaccuracy, which could later trigger a spurious RateTooHigh error. The maximum
        rate is therefore nudged slightly downwards so that a rate at the boundary stays
        inside capacity.

        Raises:
            ValueError: The speed has not been set.
        """
        maximum_standard_rate = self._actual_rate_to_standard_rate(self.maximum_flow_rate, inlet_stream)
        return maximum_standard_rate - RECIRCULATION_BOUNDARY_TOLERANCE

    def get_recirculation_range(self, inlet_stream: FluidStream) -> Boundary:
        """How much recirculation (sm³/day) can be added while keeping the compressor within capacity.

        Returns:
            Boundary where:
                min = additional rate needed to reach the minimum operating point (surge limit)
                max = additional rate available before the maximum operating point is exceeded

            Both values are clamped at zero, so neither is ever negative.
        """
        min_rate = self.get_minimum_standard_rate(inlet_stream)
        max_rate = self.get_maximum_standard_rate(inlet_stream)
        return Boundary(
            min=max(0.0, min_rate - inlet_stream.standard_rate_sm3_per_day),
            max=max(0.0, max_rate - inlet_stream.standard_rate_sm3_per_day),
        )