-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathspeed_solver.py
More file actions
230 lines (206 loc) · 9.58 KB
/
speed_solver.py
File metadata and controls
230 lines (206 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import logging
from collections.abc import Callable
from libecalc.process.fluid_stream.fluid_stream import FluidStream
from libecalc.process.process_pipeline.process_error import (
OutletFluidNotAchievableError,
RateTooHighError,
RateTooLowError,
)
from libecalc.process.process_solver.boundary import Boundary
from libecalc.process.process_solver.configuration import SpeedConfiguration
from libecalc.process.process_solver.search_strategies import (
ACCEPT_AND_GO_HIGHER,
ACCEPT_AND_GO_LOWER,
REJECT_AND_GO_HIGHER,
REJECT_AND_GO_LOWER,
BinarySearchResult,
RootFindingStrategy,
SearchStrategy,
)
from libecalc.process.process_solver.solver import (
OutletFluidNotAchievableEvent,
OutsideCapacityEvent,
Solution,
Solver,
SolverFailureStatus,
TargetNotAchievableEvent,
)
logger = logging.getLogger(__name__)
class SpeedSolver(Solver[SpeedConfiguration]):
def __init__(
self,
search_strategy: SearchStrategy,
root_finding_strategy: RootFindingStrategy,
boundary: Boundary,
target_pressure: float,
):
self._boundary = boundary
self._target_pressure = target_pressure
self._search_strategy = search_strategy
self._root_finding_strategy = root_finding_strategy
def solve(self, func: Callable[[SpeedConfiguration], FluidStream]) -> Solution[SpeedConfiguration]:
def get_outlet_stream(speed: float) -> FluidStream:
return func(SpeedConfiguration(speed=speed))
max_speed_configuration = SpeedConfiguration(speed=self._boundary.max)
try:
maximum_speed_outlet_stream = func(max_speed_configuration)
except RateTooHighError as e:
logger.debug(f"No solution found for maximum speed: {max_speed_configuration}", exc_info=e)
return Solution(
success=False,
configuration=max_speed_configuration,
failure_event=OutsideCapacityEvent(
status=SolverFailureStatus.ABOVE_MAXIMUM_FLOW_RATE,
actual_value=e.actual_rate,
boundary_value=e.boundary_rate,
source_id=e.process_unit_id,
),
)
except RateTooLowError as e:
logger.debug(f"No solution found for maximum speed: {max_speed_configuration}", exc_info=e)
return Solution(
success=False,
configuration=max_speed_configuration,
failure_event=OutsideCapacityEvent(
status=SolverFailureStatus.BELOW_MINIMUM_FLOW_RATE,
actual_value=e.actual_rate,
boundary_value=e.boundary_rate,
source_id=e.process_unit_id,
),
)
except OutletFluidNotAchievableError:
result = self._search_highest_eos_feasible_speed(get_outlet_stream)
if isinstance(result, Solution):
return result
max_speed_configuration = result
maximum_speed_outlet_stream = get_outlet_stream(speed=result.speed)
if maximum_speed_outlet_stream.pressure_bara < self._target_pressure:
return Solution(
success=False,
configuration=SpeedConfiguration(self._boundary.max),
failure_event=TargetNotAchievableEvent(
status=SolverFailureStatus.MAXIMUM_ACHIEVABLE_DISCHARGE_PRESSURE_BELOW_TARGET,
achievable_value=maximum_speed_outlet_stream.pressure_bara,
target_value=self._target_pressure,
),
)
try:
minimum_speed_configuration = SpeedConfiguration(speed=self._boundary.min)
minimum_speed_outlet_stream = func(minimum_speed_configuration)
except RateTooHighError as e:
logger.debug(f"No solution found for minimum speed: {self._boundary.min}", exc_info=e)
# rate is above maximum rate for minimum speed. Find the lowest minimum speed which gives a valid result
def bool_speed_func(x: float):
try:
get_outlet_stream(speed=x)
return False, True
except RateTooHighError:
return True, False
except RateTooLowError:
return False, False
minimum_speed_within_capacity = self._search_strategy.search(
boundary=self._boundary,
func=bool_speed_func,
)
minimum_speed_configuration = SpeedConfiguration(speed=minimum_speed_within_capacity)
minimum_speed_outlet_stream = func(minimum_speed_configuration)
except OutletFluidNotAchievableError:
result = self._search_lowest_eos_feasible_speed(get_outlet_stream)
if isinstance(result, Solution):
return result
minimum_speed_configuration = result
minimum_speed_outlet_stream = get_outlet_stream(speed=result.speed)
if minimum_speed_outlet_stream.pressure_bara > self._target_pressure:
# Solution 2, target pressure is too low
return Solution(
success=False,
configuration=minimum_speed_configuration,
failure_event=TargetNotAchievableEvent(
status=SolverFailureStatus.MINIMUM_ACHIEVABLE_DISCHARGE_PRESSURE_ABOVE_TARGET,
achievable_value=minimum_speed_outlet_stream.pressure_bara,
target_value=self._target_pressure,
),
)
assert (
minimum_speed_outlet_stream.pressure_bara
<= self._target_pressure
<= maximum_speed_outlet_stream.pressure_bara
)
# Solution 1, iterate on speed until target discharge pressure is found
def root_speed_func(x: float) -> float:
# We should be able to produce an outlet stream since we adjust minimum speed above,
# or exit if max speed is not enough
out = get_outlet_stream(speed=x)
return out.pressure_bara - self._target_pressure
try:
speed = self._root_finding_strategy.find_root(
boundary=Boundary(min=minimum_speed_configuration.speed, max=max_speed_configuration.speed),
func=root_speed_func,
)
except OutletFluidNotAchievableError as e:
return self._fluid_not_achievable_solution(
e, SpeedConfiguration(speed=e.unachievable_operating_point.speed)
)
return Solution(success=True, configuration=SpeedConfiguration(speed=speed))
def _fluid_not_achievable_solution(
self,
e: OutletFluidNotAchievableError,
configuration: SpeedConfiguration,
) -> Solution[SpeedConfiguration]:
logger.warning(f"Outlet fluid not achievable at speed {configuration.speed}", exc_info=e)
return Solution(
success=False,
configuration=configuration,
failure_event=OutletFluidNotAchievableEvent(
status=SolverFailureStatus.OUTLET_FLUID_NOT_ACHIEVABLE,
source_id=e.process_unit_id,
unachievable_operating_point=e.unachievable_operating_point,
),
)
def _search_highest_eos_feasible_speed(
self,
get_outlet_stream: Callable[[float], FluidStream],
) -> SpeedConfiguration | Solution[SpeedConfiguration]:
"""Binary-search downward for the highest speed where the EOS produces a valid outlet.
Pre-checks that boundary.min is feasible; if not, returns a failed Solution
(the search would otherwise exhaust iterations without converging).
"""
logger.debug(
"Outlet fluid not achievable at maximum speed %.1f — searching for highest feasible speed.",
self._boundary.max,
)
try:
get_outlet_stream(self._boundary.min)
except OutletFluidNotAchievableError as e:
return self._fluid_not_achievable_solution(e, SpeedConfiguration(speed=self._boundary.min))
def feasible(x: float) -> BinarySearchResult:
try:
get_outlet_stream(x)
return ACCEPT_AND_GO_HIGHER
except (OutletFluidNotAchievableError, RateTooHighError, RateTooLowError):
return REJECT_AND_GO_LOWER
speed = self._search_strategy.search(boundary=self._boundary, func=feasible)
return SpeedConfiguration(speed=speed)
def _search_lowest_eos_feasible_speed(
self,
get_outlet_stream: Callable[[float], FluidStream],
) -> SpeedConfiguration | Solution[SpeedConfiguration]:
"""Binary-search upward for the lowest speed where the EOS produces a valid outlet.
Pre-checks that boundary.max is feasible; if not, returns a failed Solution.
"""
logger.debug(
"Outlet fluid not achievable at minimum speed %.1f — searching for lowest feasible speed.",
self._boundary.min,
)
try:
get_outlet_stream(self._boundary.max)
except OutletFluidNotAchievableError as e:
return self._fluid_not_achievable_solution(e, SpeedConfiguration(speed=self._boundary.max))
def feasible(x: float) -> BinarySearchResult:
try:
get_outlet_stream(x)
return ACCEPT_AND_GO_LOWER
except (OutletFluidNotAchievableError, RateTooHighError, RateTooLowError):
return REJECT_AND_GO_HIGHER
speed = self._search_strategy.search(boundary=self._boundary, func=feasible)
return SpeedConfiguration(speed=speed)