Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Added SMA Energy Meter / Sunny Home Manager support via Speedwire multicast with auto-detection and per-phase readings ([#252](https://github.com/tomquist/astrameter/pull/252))
- Added SML powermeter support for smart meters over a local serial port (IR head), with optional per-phase OBIS overrides ([#229](https://github.com/tomquist/astrameter/pull/229))
- Added multi-phase support for Tasmota (`JSON_POWER_MQTT_LABEL`) and MQTT (`TOPICS` / `JSON_PATHS`) powermeters ([#136](https://github.com/tomquist/astrameter/issues/136), [#280](https://github.com/tomquist/astrameter/pull/280))
- Added PID controller support for any powermeter via `PID_KP`, `PID_KI`, `PID_KD`, `PID_OUTPUT_MAX`, and `PID_MODE` config options (global or per-section), with built-in anti-windup
- Added `POWER_OFFSET` and `POWER_MULTIPLIER` transforms for any powermeter, including per-phase calibration, sign flipping, and phase nulling ([#250](https://github.com/tomquist/astrameter/pull/250)); the Home Assistant app exposes both as optional advanced fields
- Added optional Marstek cloud auto-registration for managed fake CT devices at startup ([#237](https://github.com/tomquist/astrameter/pull/237))
- Switched the Home Assistant powermeter integration from REST polling to the WebSocket API ([#232](https://github.com/tomquist/astrameter/pull/232))
Expand Down
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,39 @@ POWER_MULTIPLIER = 1,0,1

**Note:** Transforms are applied when readings are taken from the powermeter, before values are passed to the emulated device (Shelly, CT002/CT003, etc.).

### PID Controller

You can optionally layer a PID (Proportional-Integral-Derivative) controller on top of any powermeter. The controller uses the grid power reading as its process variable and steers the reported value toward zero (net-zero grid exchange). This creates a second, software-level closed loop that can accelerate convergence or compensate for slow storage device response.

**How it works:**

- `mode = bias` (default) — adds the PID output to the raw meter reading. The storage device's own closed-loop controller still acts, so the effective gain is `(1 − Kp) × Kb` where `Kb` is the device's internal gain. Use `0 < Kp < 1`; `Kp = 0.5` is the recommended starting point.
- `mode = replace` — uses only the PID output as the reported value, bypassing the device's own loop entirely.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

**Anti-windup** is built in: the integral term is clamped so that the total PID output never exceeds `±PID_OUTPUT_MAX`, and accumulation pauses while the output is saturated.

All parameters can be set globally in `[GENERAL]` or per powermeter section (per-section values override the global ones):

| Parameter | Description | Default |
|-----------|-------------|---------|
| `PID_KP` | Proportional gain. Set > 0 to enable the PID. | `0` (disabled) |
| `PID_KI` | Integral gain. Usually not needed; risks windup. | `0` |
| `PID_KD` | Derivative gain. Noisy on real meters; leave at 0. | `0` |
| `PID_OUTPUT_MAX` | Maximum absolute PID output in watts. | `800` |
| `PID_MODE` | `bias` or `replace`. | `bias` |

For a small import safety buffer that prevents accidental export, combine with a negative `POWER_OFFSET` (applied before the PID):

```ini
[SHELLY]
TYPE = 1PM
IP = 192.168.1.100
POWER_OFFSET = -20 # 20 W safety buffer toward import
PID_KP = 0.5
PID_OUTPUT_MAX = 800
PID_MODE = bias
```

### Shelly

#### Shelly 1PM
Expand Down
28 changes: 28 additions & 0 deletions config.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,34 @@ THROTTLE_INTERVAL = 0
#OBIS_POWER_L2 = 0100380700ff
#OBIS_POWER_L3 = 01004c0700ff

## --- PID controller (grid-balance feedback loop) ---
## When PID_KP > 0, a Proportional-Integral-Derivative controller is layered
## on top of the configured powermeter. It uses the measured grid power as its
## process variable and steers the reported value toward zero (net-zero grid).
##
## Recommended starting point (P-only, bias mode):
## PID_KP = 0.5 # gain; 0.5 is a safe start for one storage device
## PID_KI = 0 # integral — usually not needed; risks windup
## PID_KD = 0 # derivative — noisy on real meters; leave at 0
## PID_OUTPUT_MAX = 800 # cap the PID output at ± 800 W
## PID_MODE = bias # add PID output to raw reading (alternative: replace)
##
## In bias mode the PID and the storage device's own closed-loop controller
## act together; the system is stable for 0 < Kp < 1.
## In replace mode the PID output replaces the raw reading entirely.
##
## To keep a small import safety buffer (prevent accidental export), combine
## with a negative POWER_OFFSET applied before the PID:
## POWER_OFFSET = -20 # bias 20 W toward import
##
## Parameters can be set globally in [GENERAL] or per powermeter section.
## Per-section values override the global ones.
#PID_KP = 0.5
#PID_KI = 0
#PID_KD = 0
#PID_OUTPUT_MAX = 800
#PID_MODE = bias

## --- MQTT Insights (optional) ---
## Publishes internal state (grid power, targets, saturation, consumer topology)
## to MQTT with Home Assistant Device Discovery support.
Expand Down
46 changes: 46 additions & 0 deletions src/astrameter/config/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
JsonHttpPowermeter,
ModbusPowermeter,
MqttPowermeter,
PidPowermeter,
Powermeter,
Script,
Shelly1PM,
Expand Down Expand Up @@ -149,6 +150,11 @@ def read_all_powermeter_configs(
global_throttle_interval = config.getfloat(
"GENERAL", "THROTTLE_INTERVAL", fallback=0.0
)
global_pid_kp = config.getfloat("GENERAL", "PID_KP", fallback=0.0)
global_pid_ki = config.getfloat("GENERAL", "PID_KI", fallback=0.0)
global_pid_kd = config.getfloat("GENERAL", "PID_KD", fallback=0.0)
global_pid_output_max = config.getfloat("GENERAL", "PID_OUTPUT_MAX", fallback=800.0)
global_pid_mode = config.get("GENERAL", "PID_MODE", fallback="bias").strip().lower()

for section in config.sections():
powermeter = create_powermeter(section, config)
Expand Down Expand Up @@ -190,6 +196,46 @@ def read_all_powermeter_configs(
)
powermeter = ThrottledPowermeter(powermeter, section_throttle_interval)

section_pid_kp = config.getfloat(section, "PID_KP", fallback=global_pid_kp)
if section_pid_kp > 0:
pid_source = (
"section-specific"
if config.has_option(section, "PID_KP")
else "global"
)
section_pid_ki = config.getfloat(
section, "PID_KI", fallback=global_pid_ki
)
section_pid_kd = config.getfloat(
section, "PID_KD", fallback=global_pid_kd
)
section_pid_output_max = config.getfloat(
section, "PID_OUTPUT_MAX", fallback=global_pid_output_max
)
section_pid_mode = (
config.get(section, "PID_MODE", fallback=global_pid_mode)
.strip()
.lower()
)
logger.info(
"Applying %s PID controller (Kp=%s, Ki=%s, Kd=%s, max=%sW, mode=%s) to %s",
pid_source,
section_pid_kp,
section_pid_ki,
section_pid_kd,
section_pid_output_max,
section_pid_mode,
section,
)
powermeter = PidPowermeter(
powermeter,
kp=section_pid_kp,
ki=section_pid_ki,
kd=section_pid_kd,
output_max=section_pid_output_max,
mode=section_pid_mode,
)

client_filter = create_client_filter(section, config)
powermeters.append((powermeter, client_filter))
return powermeters
Expand Down
2 changes: 2 additions & 0 deletions src/astrameter/powermeter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .json_http import JsonHttpPowermeter
from .modbus import ModbusPowermeter
from .mqtt import MqttPowermeter
from .pid import PidPowermeter
from .script import Script
from .shelly import Shelly, Shelly1PM, Shelly3EM, Shelly3EMPro, ShellyEM, ShellyPlus1PM
from .shrdzm import Shrdzm
Expand All @@ -29,6 +30,7 @@
"JsonHttpPowermeter",
"ModbusPowermeter",
"MqttPowermeter",
"PidPowermeter",
"Powermeter",
"Script",
"Shelly",
Expand Down
159 changes: 159 additions & 0 deletions src/astrameter/powermeter/pid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import asyncio
import time

from .base import Powermeter


class PidPowermeter(Powermeter):
"""
A wrapper around a powermeter that applies a PID (Proportional-Integral-
Derivative) controller to steer the reported power toward zero (grid balance).

The PID controller uses the raw power-meter reading as its *process
variable* and computes an adjustment that is either **added** to the raw
reading (``mode="bias"``) or **used in place of** the raw reading
(``mode="replace"``).

Positive PID output motivates the storage device to increase feed-in power;
negative output motivates it to decrease feed-in power.

**Gain sensitivity:** in ``mode="bias"`` the PID and the storage device's own
closed-loop controller act *together*. The effective closed-loop gain is
``(1 - Kp) * Kb``, where ``Kb`` is the device's internal gain.
The system is stable for ``0 < Kp < 1``. Use ``Kp = 0.5`` as the
recommended starting value.

**Anti-windup** is built in: the integral term is clamped so that the
total PID output never exceeds ``[-output_max, +output_max]``, and
integration is paused while the output is saturated.

Error convention:
error = -measurement
A positive grid import produces a negative error, causing the PID to
reduce the reported value and motivate the storage device to cover the import.

To maintain a small import safety buffer (prevent export), set a small
negative ``POWER_OFFSET`` (e.g. ``POWER_OFFSET = -20``) in the filter
chain *before* the PID.

The controller runs on the **sum** of all phases (total grid power)
and distributes its output equally across phases.

Config parameters:
PID_KP Proportional gain (default 0 → PID disabled)
PID_KI Integral gain (default 0)
PID_KD Derivative gain (default 0)
PID_OUTPUT_MAX Output clamp magnitude in watts (default 800)
PID_MODE "bias" or "replace" (default "bias")
"""

VALID_MODES = ("bias", "replace")

def __init__(
self,
wrapped_powermeter: Powermeter,
kp: float = 0.0,
ki: float = 0.0,
kd: float = 0.0,
output_max: float = 800.0,
mode: str = "bias",
):
"""
Initialise the PID powermeter wrapper.

Args:
wrapped_powermeter: The actual powermeter instance to wrap.
kp: Proportional gain.
ki: Integral gain.
kd: Derivative gain.
output_max: Maximum absolute PID output in watts. Must be > 0.
mode: ``"bias"`` — add PID output to raw reading, or
``"replace"`` — use PID output as the reported value.
"""
if output_max <= 0:
raise ValueError(f"PID output_max must be positive, got {output_max}")
mode = mode.lower()
if mode not in self.VALID_MODES:
raise ValueError(
f"PID mode must be one of {self.VALID_MODES}, got '{mode}'"
)

self.wrapped_powermeter = wrapped_powermeter
self.kp = kp
self.ki = ki
self.kd = kd
self.output_max = output_max
self.mode = mode

# PID state
self._integral: float = 0.0
self._prev_error: float | None = None
self._prev_time: float | None = None
self._lock = asyncio.Lock()

async def wait_for_message(self, timeout=5):
"""Pass through to wrapped powermeter."""
return await self.wrapped_powermeter.wait_for_message(timeout)

async def start(self):
await self.wrapped_powermeter.start()

async def stop(self):
await self.wrapped_powermeter.stop()

async def get_powermeter_watts(self) -> list[float]:
async with self._lock:
raw_values = await self.wrapped_powermeter.get_powermeter_watts()
current_time = time.monotonic()

# Compute error on the total power across all phases
total_power = sum(raw_values)
error = -total_power
if self._prev_time is None:
# First call — initialise state, no derivative yet
self._prev_error = error
self._prev_time = current_time
dt = 0.0
else:
dt = current_time - self._prev_time
if dt <= 0:
dt = 0.0

# --- Proportional ---
p_term = self.kp * error

# --- Integral with anti-windup ---
if dt > 0:
# Tentatively accumulate
tentative_integral = self._integral + error * dt
tentative_output = p_term + self.ki * tentative_integral
# Only accept the new integral if output is not saturated,
# or if the integral is moving toward zero (unwinding).
if abs(tentative_output) <= self.output_max or (
self._integral != 0 and self._integral * error < 0
):
self._integral = tentative_integral
Comment thread
coderabbitai[bot] marked this conversation as resolved.
i_term = self.ki * self._integral

# --- Derivative ---
if dt > 0 and self._prev_error is not None:
d_term = self.kd * (error - self._prev_error) / dt
else:
d_term = 0.0

self._prev_error = error
self._prev_time = current_time

# --- Total output with clamping ---
pid_output = p_term + i_term + d_term
pid_output = max(-self.output_max, min(self.output_max, pid_output))

# --- Apply to readings ---
num_phases = len(raw_values)
per_phase = pid_output / num_phases if num_phases > 0 else 0.0

if self.mode == "bias":
return [value + per_phase for value in raw_values]
else:
# replace mode: distribute PID output equally across phases
return [per_phase] * num_phases
Loading