forked from openWB/core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbat.py
More file actions
69 lines (55 loc) · 3.06 KB
/
Copy pathbat.py
File metadata and controls
69 lines (55 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
from typing import TypedDict, Any
from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo, FaultState
from modules.common.simcount import SimCounter
from modules.common.utils.peak_filter import PeakFilter
from modules.common.store import get_bat_value_store
from modules.devices.deye.deye_solarman.config import DeyeSolarmanBatSetup
from modules.devices.deye.deye_solarman.device_type import DeviceType
from modules.common.component_type import ComponentType
from pysolarmanv5 import PySolarmanV5 as ModbusSolarmanClient_
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class KwargsDict(TypedDict):
    """Shape of the keyword arguments handed to ``DeyeSolarmanBat``."""

    # Passed on to SimCounter as its first argument during initialize().
    device_id: int
    # Solarman client used for every holding-register read.
    client: ModbusSolarmanClient_
class DeyeSolarmanBat(AbstractBat):
    """Battery component of a Deye inverter that is read through a Solarman client.

    The constructor only stores its arguments; the value store, fault state,
    peak filter, simulated counter and the detected device type are created
    in :meth:`initialize`. :meth:`update` reads the registers matching the
    device type and writes one ``BatState`` to the store.
    """

    def __init__(self, component_config: DeyeSolarmanBatSetup, **kwargs: Any) -> None:
        """Store the configuration and the raw keyword arguments.

        Args:
            component_config: Setup object of this battery component.
            **kwargs: Expected to provide ``device_id`` and ``client`` (see
                ``KwargsDict``); they are first read in :meth:`initialize`.
        """
        self.component_config = component_config
        self.kwargs: KwargsDict = kwargs

    def initialize(self) -> None:
        """Create the runtime helpers and read the device type from register 0.

        Raises:
            KeyError: If ``device_id`` or ``client`` is missing from the kwargs.
            ValueError: If register 0 holds a value that is not a ``DeviceType``.
        """
        self.__device_id: int = self.kwargs['device_id']
        self.client: ModbusSolarmanClient_ = self.kwargs['client']
        self.store = get_bat_value_store(self.component_config.id)
        self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
        self.peak_filter = PeakFilter(ComponentType.BAT, self.component_config.id, self.fault_state)
        self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
        self.device_type = DeviceType(self._read_register(0))

    def _read_register(self, address: int) -> int:
        """Return the single holding register at ``address`` exactly as the client delivers it."""
        return self.client.read_holding_registers(address, 1)[0]

    def _read_signed_register(self, address: int) -> int:
        """Return the holding register at ``address`` interpreted as a signed 16-bit value.

        The client hands back the raw 16-bit word, so a negative reading
        arrives as a value >= 0x8000. Without this conversion the power,
        which is negated afterwards, could never become positive. Values
        that are already below 0x8000 (including negative ones) pass
        through unchanged.
        """
        raw = self._read_register(address)
        return raw - 0x10000 if raw >= 0x8000 else raw

    def update(self) -> None:
        """Read power, SoC and energy counters and publish them as a ``BatState``.

        Single-phase devices use registers 190 (power) and 184 (SoC); all
        other device types use 590 (power) and 588 (SoC). Only the
        single-phase hybrid reads energy counters from the device
        (registers 72 and 74); every other type derives them from the
        simulated counter.
        """
        single_phase_types = (DeviceType.SINGLE_PHASE_STRING, DeviceType.SINGLE_PHASE_HYBRID)

        if self.device_type in single_phase_types:
            # The sign is inverted relative to the device's own convention.
            power = -self._read_signed_register(190)
            soc = self._read_register(184)
            if self.device_type == DeviceType.SINGLE_PHASE_HYBRID:
                # NOTE(review): the factor 100 presumably converts 0.1 kWh steps to Wh - confirm.
                imported = self._read_register(72) * 100
                exported = self._read_register(74) * 100
                imported, exported = self.peak_filter.check_values(power, imported, exported)
            else:  # SINGLE_PHASE_STRING
                # Return value is intentionally unused here, as in the three-phase branch.
                self.peak_filter.check_values(power)
                imported, exported = self.sim_counter.sim_count(power)
        else:  # THREE_PHASE_LV (0x0500, 0x0005), THREE_PHASE_HV (0x0006)
            power = -self._read_signed_register(590)
            if self.device_type == DeviceType.THREE_PHASE_HV:
                # The HV variant's power reading is scaled up by a factor of 10.
                power = power * 10
            soc = self._read_register(588)
            self.peak_filter.check_values(power)
            imported, exported = self.sim_counter.sim_count(power)

        bat_state = BatState(
            power=power,
            soc=soc,
            imported=imported,
            exported=exported
        )
        self.store.set(bat_state)
# Module-level descriptor that names DeyeSolarmanBatSetup as the configuration factory.
# NOTE(review): presumably picked up by the framework to register this component - confirm.
component_descriptor = ComponentDescriptor(configuration_factory=DeyeSolarmanBatSetup)