-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathmixer.py
More file actions
31 lines (23 loc) · 1.32 KB
/
mixer.py
File metadata and controls
31 lines (23 loc) · 1.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from typing import Final
from libecalc.domain.process.entities.process_units.simplified_stream_mixer.simplified_stream_mixer import (
SimplifiedStreamMixer,
)
from libecalc.domain.process.process_pipeline.process_unit import GasProcessUnit, ProcessUnitId
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream
class Mixer(GasProcessUnit):
    """Process unit that blends a single external stream into the stream passing through it.

    Models a sidestream injection point in an interstage manifold. The external
    stream has to be registered with `set_stream` before `propagate_stream` is
    called; the stored stream is kept on the instance until `set_stream`
    replaces it.
    """

    def __init__(self, fluid_service: FluidService, process_unit_id: ProcessUnitId | None = None):
        """Create a mixer with no external stream registered yet.

        Args:
            fluid_service: Handed to the underlying `SimplifiedStreamMixer`.
            process_unit_id: Identifier for this unit. When omitted (or otherwise
                falsy) a new id is obtained from `GasProcessUnit._create_id()`.
        """
        # No sidestream is known until set_stream() is called.
        self._external_stream: FluidStream | None = None

        # Truthiness test on purpose: any falsy id falls back to a generated one.
        resolved_id = process_unit_id if process_unit_id else GasProcessUnit._create_id()
        self._id: Final[ProcessUnitId] = resolved_id

        self._mixer = SimplifiedStreamMixer(fluid_service)

    def get_id(self) -> ProcessUnitId:
        """Return the identifier assigned to this unit at construction."""
        return self._id

    def set_stream(self, stream: FluidStream) -> None:
        """Register `stream` as the external stream to mix in, replacing any previous one."""
        self._external_stream = stream

    def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
        """Mix `inlet_stream` with the registered external stream.

        Args:
            inlet_stream: The through-stream entering the mixer.

        Returns:
            The result of `SimplifiedStreamMixer.mix_streams` for the inlet
            stream followed by the external stream.

        Raises:
            ValueError: If no external stream has been registered via `set_stream`.
        """
        external = self._external_stream
        if external is not None:
            # Order matters to the caller of mix_streams: inlet first, sidestream second.
            return self._mixer.mix_streams([inlet_stream, external])
        raise ValueError("Mixer has no external stream set; call set_stream() before propagating.")