Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ def main():
for future in futures:
future.result()
finally:
# Close all powermeters to prevent memory leaks
logger.info("Closing powermeters...")
for powermeter, _ in powermeters:
try:
powermeter.close()
except Exception as e:
logger.error(f"Error closing powermeter: {e}")

# Ensure health service is properly stopped on exit
logger.info("Stopping health check service...")
stop_health_service()
Expand Down
5 changes: 5 additions & 0 deletions powermeter/amisreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ def get_json(self, path):
def get_powermeter_watts(self):
response = self.get_json("/rest")
return [int(response["saldo"])]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
4 changes: 4 additions & 0 deletions powermeter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ def wait_for_message(self, timeout=5):

def get_powermeter_watts(self):
raise NotImplementedError()

def close(self):
"""Close any open resources (sessions, connections, etc.)"""
pass
5 changes: 5 additions & 0 deletions powermeter/emlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ def get_powermeter_watts(self):
power_in = response["Leistung170"]
power_out = response["Leistung270"]
return [int(power_in) - int(power_out)]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/esphome.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ def get_json(self, path):
def get_powermeter_watts(self):
ParsedData = self.get_json(f"/{self.domain}/{self.id}")
return [int(ParsedData["value"])]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,8 @@ def get_powermeter_watts(self):
power_out = float(response["state"])
results.append(power_in - power_out)
return results

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/iobroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ def get_powermeter_watts(self):
if item["id"] == self.power_output_alias:
power_out = int(item["val"])
return [power_in - power_out]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/json_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ def get_powermeter_watts(self) -> List[float]:
for path in self.json_paths:
values.append(extract_json_value(data, path))
return values

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/shelly.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def get_rpc_json(self, path):
def get_powermeter_watts(self):
raise NotImplementedError()

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()


class Shelly1PM(Shelly):
def get_powermeter_watts(self):
Expand Down
5 changes: 5 additions & 0 deletions powermeter/shrdzm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ def get_powermeter_watts(self):
f"/getLastData?user={self.user}&password={self.password}"
)
return [int(int(response["1.7.0"]) - int(response["2.7.0"]))]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
5 changes: 5 additions & 0 deletions powermeter/tasmota.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ def get_powermeter_watts(self):
power_in = value[self.json_power_input_mqtt_label]
power_out = value[self.json_power_output_mqtt_label]
return [int(power_in) - int(power_out)]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
4 changes: 4 additions & 0 deletions powermeter/throttling.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ def get_powermeter_watts(self) -> List[float]:
)
return self.last_values
raise

def close(self):
"""Close the wrapped powermeter to prevent memory leaks"""
self.wrapped_powermeter.close()
5 changes: 5 additions & 0 deletions powermeter/tq_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ def _read_live_json(self) -> dict:
raise _SessionExpired
return data

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self._sess:
self._sess.close()


class _SessionExpired(RuntimeError):
"""Internal marker – triggers transparent re-login."""
Expand Down
5 changes: 5 additions & 0 deletions powermeter/vzlogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ def get_json(self):

def get_powermeter_watts(self):
return [int(self.get_json()["data"][0]["tuples"][0][1])]

def close(self):
"""Close the HTTP session to prevent memory leaks"""
if self.session:
self.session.close()
212 changes: 212 additions & 0 deletions test_memory_leak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
Test script to reproduce the memory leak reported in issue #218.

This script simulates the vzlogger powermeter behavior with high-frequency
HTTP requests and monitors memory usage over time.
"""

import requests
import time
import os
import gc
import tracemalloc
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json


class MockVZLoggerHandler(BaseHTTPRequestHandler):
"""Mock HTTP server simulating vzlogger responses."""

def do_GET(self):
response = {
"data": [{
"tuples": [[int(time.time() * 1000), 1500]]
}]
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response).encode())

def log_message(self, format, *args):
# Suppress log messages
pass


class VZLoggerSimulator:
"""Simulates VZLogger powermeter without closing session."""

def __init__(self, ip: str, port: str, uuid: str):
self.ip = ip
self.port = port
self.uuid = uuid
self.session = requests.Session()

def get_json(self):
url = f"http://{self.ip}:{self.port}/{self.uuid}"
return self.session.get(url, timeout=10).json()

def get_powermeter_watts(self):
return [int(self.get_json()["data"][0]["tuples"][0][1])]


class VZLoggerWithClose:
"""Simulates VZLogger powermeter WITH closing session."""

def __init__(self, ip: str, port: str, uuid: str):
self.ip = ip
self.port = port
self.uuid = uuid
self.session = requests.Session()

def get_json(self):
url = f"http://{self.ip}:{self.port}/{self.uuid}"
return self.session.get(url, timeout=10).json()

def get_powermeter_watts(self):
return [int(self.get_json()["data"][0]["tuples"][0][1])]

def close(self):
if self.session:
self.session.close()


def get_memory_mb():
"""Get current process memory usage in MB using tracemalloc."""
current, peak = tracemalloc.get_traced_memory()
return current / 1024 / 1024


def test_without_close(num_requests=1000, delay=0.01):
"""Test memory usage without closing session."""
print(f"\n{'='*60}")
print("TEST 1: WITHOUT closing session")
print(f"{'='*60}")

# Start mock server
server = HTTPServer(('127.0.0.1', 8888), MockVZLoggerHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
time.sleep(0.5) # Let server start

pm = VZLoggerSimulator('127.0.0.1', '8888', 'test-uuid')

start_memory = get_memory_mb()
print(f"Initial memory: {start_memory:.2f} MB")

# Make many requests
for i in range(num_requests):
try:
pm.get_powermeter_watts()
except Exception as e:
print(f"Error at request {i}: {e}")
break

# Report memory every 100 requests
if (i + 1) % 100 == 0:
current_memory = get_memory_mb()
growth = current_memory - start_memory
print(f"Request {i+1:4d}: {current_memory:.2f} MB (growth: {growth:+.2f} MB)")

time.sleep(delay)

final_memory = get_memory_mb()
total_growth = final_memory - start_memory
print(f"\nFinal memory: {final_memory:.2f} MB")
print(f"Total growth: {total_growth:+.2f} MB ({(total_growth/start_memory)*100:+.1f}%)")

server.shutdown()
return start_memory, final_memory, total_growth


def test_with_close(num_requests=1000, delay=0.01):
"""Test memory usage WITH closing session."""
print(f"\n{'='*60}")
print("TEST 2: WITH closing session")
print(f"{'='*60}")

# Start mock server
server = HTTPServer(('127.0.0.1', 8889), MockVZLoggerHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
time.sleep(0.5) # Let server start

pm = VZLoggerWithClose('127.0.0.1', '8889', 'test-uuid')

start_memory = get_memory_mb()
print(f"Initial memory: {start_memory:.2f} MB")

# Make many requests
for i in range(num_requests):
try:
pm.get_powermeter_watts()
except Exception as e:
print(f"Error at request {i}: {e}")
break

# Report memory every 100 requests
if (i + 1) % 100 == 0:
current_memory = get_memory_mb()
growth = current_memory - start_memory
print(f"Request {i+1:4d}: {current_memory:.2f} MB (growth: {growth:+.2f} MB)")

time.sleep(delay)

pm.close() # Close session after use

final_memory = get_memory_mb()
total_growth = final_memory - start_memory
print(f"\nFinal memory: {final_memory:.2f} MB")
print(f"Total growth: {total_growth:+.2f} MB ({(total_growth/start_memory)*100:+.1f}%)")

server.shutdown()
return start_memory, final_memory, total_growth


def main():
# Start memory tracking
tracemalloc.start()

print("Memory Leak Reproduction Test for Issue #218")
print("=" * 60)

# Test parameters - simulating 1-second polling
# Using 1000 requests with 0.01s delay = 10 seconds total
# (scaled down from 5 days for quick testing)
num_requests = 1000
delay = 0.01

print(f"\nTest parameters:")
print(f" Requests: {num_requests}")
print(f" Delay: {delay}s between requests")
print(f" Total duration: ~{num_requests * delay:.1f} seconds")

# Run test without closing session
start1, end1, growth1 = test_without_close(num_requests, delay)

# Give system time to settle
time.sleep(2)

# Run test with closing session
start2, end2, growth2 = test_with_close(num_requests, delay)

# Summary
print(f"\n{'='*60}")
print("SUMMARY")
print(f"{'='*60}")
print(f"WITHOUT close(): {growth1:+.2f} MB growth ({(growth1/start1)*100:+.1f}%)")
print(f"WITH close(): {growth2:+.2f} MB growth ({(growth2/start2)*100:+.1f}%)")
print(f"Difference: {growth1-growth2:.2f} MB")

if abs(growth1 - growth2) > 1.0: # More than 1MB difference
print("\n⚠️ MEMORY LEAK DETECTED!")
print(" Closing the session makes a significant difference.")
else:
print("\n✓ No significant memory leak detected.")
print(" The issue may be elsewhere or require longer testing.")


if __name__ == "__main__":
main()
Loading