-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
1731 lines (1535 loc) · 66.4 KB
/
server.py
File metadata and controls
1731 lines (1535 loc) · 66.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Brouwerij Admin - backend server
Serves the HTML app and stores all data as JSON files in /data/
Supports Home Assistant Ingress: requests arrive with a path prefix like
/api/hassio_ingress/<TOKEN>/api/data/<key>
The server strips any prefix and looks for /api/data/<key> anywhere in the path.
"""
import base64
import datetime
import email.message
import email.utils
import http.server
import io
import ipaddress
import json
import os
import re
import shutil
import smtplib
import socket
import ssl
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import zipfile
from collections import defaultdict
from pathlib import Path
# Filesystem locations. Everything persistent lives under DATA_DIR.
DATA_DIR = Path('/data')
STATIC_FILE = Path('/app/static/index.html')
MAX_CONTENT_LENGTH = 10 * 1024 * 1024 # 10 MB — protection against DoS via oversized requests
UPLOAD_DIR = DATA_DIR / 'inkoop_facturen'
BACKUP_DIR = DATA_DIR / 'backups'
# Import-time side effect: create the data, upload and backup directories
# when they do not exist yet.
DATA_DIR.mkdir(parents=True, exist_ok=True)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# URL path fragments used for routing. The module docstring explains that
# requests may arrive with an ingress prefix, which is why the handlers look
# for these fragments anywhere in the path.
API_DATA_PREFIX = '/api/data/'
BF_API_BASE = 'https://api.brewfather.app/v2'
BF_PROXY_PREFIX = '/api/brewfather/'
BF_TEST_PATH = '/api/brewfather/test'
BF_PATCH_PREFIX = '/api/brewfather/patch/'
WC_API_PATH = '/wp-json/wc/v3'
WC_PROXY_PREFIX = '/api/woocommerce/'
WC_PING_PATH = '/api/woocommerce/ping'
WC_TEST_PATH = '/api/woocommerce/test'
WC_PUT_PREFIX = '/api/woocommerce/put/'
UPLOAD_PREFIX = '/api/upload/'
FILE_PREFIX = '/api/file/'
DELETE_UPLOAD_PREFIX = '/api/delete_upload/'
DOWNLOAD_BIJLAGEN_PREFIX = '/api/download_bijlagen/'
BACKUPS_PREFIX = '/api/backups'
BACKUPS_TRIGGER_PATH = '/api/backups/trigger'
CLAUDE_PROXY_PREFIX = '/api/claude/'
ANTHROPIC_API_BASE = 'https://api.anthropic.com'
CLAUDE_MAX_CONTENT = 20 * 1024 * 1024 # 20 MB — PDF + images can be large
HA_PROXY_PREFIX = '/api/homeassistant/'
HA_SUPERVISOR_BASE = 'http://supervisor/core/api'
MAIL_SEND_PATH = '/api/mail/send'
MAIL_TEST_PATH = '/api/mail/test'
MAIL_MAX_CONTENT = 20 * 1024 * 1024 # 20 MB — mail + attachments
# Accepted values for the `security` field of the SMTP settings.
MAIL_SECURITY_VALUES = {'none', 'starttls', 'ssl'}
# Deliberately simple address check: local@domain.tld without whitespace,
# separators, angle brackets or double quotes in any part.
MAIL_EMAIL_RE = re.compile(r'^[^@\s,;<>"]+@[^@\s,;<>"]+\.[^@\s,;<>"]+$')
# Whitelist of permitted HA service calls. Keeps the attack surface small:
# only write actions that the UI explicitly offers are allowed. Only add a
# new service when there is also a UI button or automation for it.
HA_ALLOWED_SERVICES = {
('climate', 'set_temperature'),
('climate', 'set_hvac_mode'),
('climate', 'set_preset_mode'),
('climate', 'turn_on'),
('climate', 'turn_off'),
('light', 'turn_on'),
('light', 'turn_off'),
('light', 'toggle'),
('switch', 'turn_on'),
('switch', 'turn_off'),
('switch', 'toggle'),
}
# HA domains that can be filtered on through the list endpoint. Unknown values
# are rejected so the frontend cannot accidentally (or maliciously) query an
# entirely different integration.
HA_ALLOWED_LIST_DOMAINS = {'sensor', 'climate', 'light', 'switch', 'binary_sensor'}
# Upload whitelist: permitted file extensions (lower-case, without the dot)
# and the Content-Type associated with each of them.
_ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'webp', 'tiff', 'bmp', 'heic', 'heif'}
_CONTENT_TYPES = {
'pdf': 'application/pdf',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'gif': 'image/gif',
'webp': 'image/webp',
'tiff': 'image/tiff',
'bmp': 'image/bmp',
'heic': 'image/heic',
'heif': 'image/heif',
}
# Rate limiting: max requests per window per IP
_RATE_WINDOW = 60 # seconds
_RATE_MAX = 120 # requests per window
# Maps client IP -> list of time.monotonic() stamps of its recent requests.
_rate_buckets: dict = defaultdict(list)
def _check_rate(ip: str) -> bool:
    """Record a request from *ip* and report whether it is within the limit.

    Timestamps older than ``_RATE_WINDOW`` seconds are discarded first. When
    ``_RATE_MAX`` recent requests remain, the request is refused and not
    recorded.
    """
    current = time.monotonic()
    recent = [stamp for stamp in _rate_buckets[ip] if current - stamp < _RATE_WINDOW]
    _rate_buckets[ip] = recent
    if len(recent) < _RATE_MAX:
        recent.append(current)
        return True
    return False
def _retry_after(ip: str) -> int:
    """Return how many seconds *ip* should wait before retrying (at least 1).

    The value is the time until the oldest request in the client's window
    expires, rounded up.
    """
    stamps = _rate_buckets.get(ip)
    if not stamps:
        return 1
    remaining = _RATE_WINDOW - (time.monotonic() - stamps[0])
    return max(1, int(remaining) + 1)
# Security headers added to every response
_SEC_HEADERS = [
('X-Content-Type-Options', 'nosniff'),
('X-Frame-Options', 'DENY'),
('Referrer-Policy', 'strict-origin-when-cross-origin'),
('Permissions-Policy', 'geolocation=(), microphone=(), camera=()'),
]
# Extra headers only on the HTML page
# The policy starts from default-src 'none' and then opens up individual
# directives. NOTE(review): script-src and style-src both allow
# 'unsafe-inline', so this CSP does not block inline script injection.
_CSP = (
"default-src 'none'; "
"script-src 'unsafe-inline' "
"https://unpkg.com https://cdn.tailwindcss.com https://cdn.sheetjs.com; "
"style-src 'unsafe-inline'; "
"worker-src blob: https://unpkg.com; "
"connect-src 'self' https://unpkg.com; "
"img-src 'self' data: blob:; "
"frame-src blob: 'self'; "
"font-src 'self' data:; "
"base-uri 'self'; "
"form-action 'self'"
)
_HTML_EXTRA = [('Content-Security-Policy', _CSP), ('Cache-Control', 'no-cache, must-revalidate')]
# Origins that may be echoed back in Access-Control-Allow-Origin; membership
# is checked by _trusted_origin().
_TRUSTED_ORIGINS = frozenset((
'http://localhost:5173', # Vite dev server
'http://localhost:8099', # production preview
'http://127.0.0.1:5173',
'http://127.0.0.1:8099',
))
def _trusted_origin(origin: str) -> str | None:
    """Echo *origin* back when it is listed in ``_TRUSTED_ORIGINS``, else ``None``."""
    if origin in _TRUSTED_ORIGINS:
        return origin
    return None
def _is_private_url(url: str) -> bool:
"""Block requests to private/internal IP ranges (SSRF protection)."""
try:
host = urllib.parse.urlparse(url).hostname or ''
for info in socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM):
addr = info[4][0]
ip = ipaddress.ip_address(addr)
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
return True
except Exception:
return True # als DNS niet lukt, blokkeer
return False
def _valid_key(key: str) -> bool:
return bool(key) and all(c.isalnum() or c == '_' for c in key)
def _valid_upload_filename(name: str) -> bool:
"""Allow only safe characters in upload filenames; extension must be allowed."""
if not name or len(name) > 200 or name.startswith('.'):
return False
if '.' not in name:
return False
base, ext = name.rsplit('.', 1)
ext = ext.lower()
if ext not in _ALLOWED_EXTENSIONS:
return False
return bool(base) and all(c.isalnum() or c in '-_.' for c in name)
def _extract_upload_filename(path: str, prefix: str) -> str | None:
"""Extract and validate filename from an upload/file/delete_upload path."""
idx = path.find(prefix)
if idx < 0:
return None
filename = path[idx + len(prefix):].strip('/')
return filename if _valid_upload_filename(filename) else None
def _valid_bf_path(path: str) -> bool:
"""Allow only safe characters in a Brewfather sub-path + query string."""
return bool(path) and all(c.isalnum() or c in '-_/?=&.' for c in path)
def _valid_wc_path(path: str) -> bool:
"""Allow safe characters for a WooCommerce API sub-path + query string."""
return bool(path) and all(c.isalnum() or c in '-_/?=&.:%+,[]@' for c in path)
def _load_wc_creds() -> dict | None:
    """Load the stored WooCommerce credentials.

    Returns:
        ``{'url', 'key', 'secret'}`` when ``woocommerce_creds.json`` exists,
        parses, has an ``https://`` store URL and non-empty key and secret;
        ``None`` otherwise.
    """
    creds_path = DATA_DIR / 'woocommerce_creds.json'
    if not creds_path.exists():
        return None
    try:
        stored = json.loads(creds_path.read_bytes())
        store_url = str(stored.get('storeUrl', '')).strip().rstrip('/')
        consumer_key = str(stored.get('consumerKey', '')).strip()
        consumer_secret = str(stored.get('consumerSecret', '')).strip()
    except Exception:
        return None
    if store_url.startswith('https://') and consumer_key and consumer_secret:
        return {'url': store_url, 'key': consumer_key, 'secret': consumer_secret}
    return None
def _wc_request(creds: dict, method: str, subpath: str, body: bytes | None = None) -> tuple[int, bytes]:
    """Send one request to the WooCommerce REST API using HTTP Basic auth.

    Args:
        creds: Dict with ``url``, ``key`` and ``secret``.
        method: HTTP method to use.
        subpath: Path (and query string) below ``WC_API_PATH``.
        body: Optional JSON request body.

    Returns:
        ``(status, body)``. HTTP error responses are passed through with
        their body (``b'{}'`` when empty); any other failure yields a 502
        with a JSON error body.
    """
    raw_credentials = f'{creds["key"]}:{creds["secret"]}'.encode()
    token = base64.b64encode(raw_credentials).decode()
    request_headers = {
        'Authorization': f'Basic {token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    request = urllib.request.Request(
        f'{creds["url"]}{WC_API_PATH}/{subpath}',
        data=body,
        headers=request_headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(request, timeout=8) as response:
            return response.status, response.read()
    except urllib.error.HTTPError as err:
        error_body = err.read()
        return err.code, (error_body if error_body else b'{}')
    except Exception:
        return 502, json.dumps({'error': 'upstream request failed'}).encode()
def extract_key(path: str) -> str | None:
    """Return the data key that follows ``/api/data/`` in *path*.

    The query string is ignored and the prefix may occur anywhere in the
    path, which supports ingress path prefixes. ``None`` is returned when
    the prefix is absent or the key is not valid.
    """
    bare_path, _, _ = path.partition('?')
    start = bare_path.find(API_DATA_PREFIX)
    if start < 0:
        return None
    candidate = bare_path[start + len(API_DATA_PREFIX):].strip('/')
    if _valid_key(candidate):
        return candidate
    return None
def _load_claude_creds() -> str | None:
    """Load the stored Claude/Anthropic API key.

    Returns:
        The key from ``claude_creds.json`` when it starts with ``sk-ant-``;
        ``None`` when the file is missing, unreadable or holds another value.
    """
    creds_path = DATA_DIR / 'claude_creds.json'
    if not creds_path.exists():
        return None
    try:
        stored = json.loads(creds_path.read_bytes())
        api_key = str(stored.get('apiKey', '')).strip()
    except Exception:
        return None
    if api_key.startswith('sk-ant-'):
        return api_key
    return None
def _load_bf_creds() -> tuple[str, str] | None:
    """Load the stored Brewfather credentials.

    Returns:
        ``(userId, apiKey)`` when ``brewfather_creds.json`` exists, parses
        and both values are non-empty; ``None`` otherwise.
    """
    creds_path = DATA_DIR / 'brewfather_creds.json'
    if not creds_path.exists():
        return None
    try:
        stored = json.loads(creds_path.read_bytes())
        user_id = str(stored.get('userId', '')).strip()
        api_key = str(stored.get('apiKey', '')).strip()
    except Exception:
        return None
    if user_id and api_key:
        return (user_id, api_key)
    return None
def _load_smtp_creds() -> dict | None:
    """Read the stored SMTP settings and return them as a sanitized dict.

    Expected JSON shape of ``smtp_creds.json``::

        {host, port, username, password, fromEmail, fromName, security, enabled}

    ``security`` is one of ``'none'``, ``'starttls'`` or ``'ssl'``; any other
    value falls back to ``'starttls'``.

    Local/private SMTP hosts are deliberately accepted: an internal mail
    server is a legitimate setup for a Home Assistant add-on, so no
    private-address filtering (and no DNS lookup) happens here.

    Returns:
        The sanitized settings, or ``None`` when the file is missing, is not
        a JSON object, has no host, has a port outside 1-65535, or has a
        ``fromEmail`` that is set but not a valid address.
    """
    creds_file = DATA_DIR / 'smtp_creds.json'
    if not creds_file.exists():
        return None
    try:
        c = json.loads(creds_file.read_bytes())
    except (OSError, ValueError):
        return None
    if not isinstance(c, dict):
        # e.g. a JSON list or `null`: there is nothing to read settings from.
        return None
    host = str(c.get('host', '')).strip()
    try:
        port = int(c.get('port', 0))
    except (TypeError, ValueError, OverflowError):
        return None
    if not host or port < 1 or port > 65535:
        return None
    security = str(c.get('security', 'starttls')).strip().lower()
    if security not in MAIL_SECURITY_VALUES:
        security = 'starttls'
    from_email = str(c.get('fromEmail', '')).strip()
    if from_email and not MAIL_EMAIL_RE.match(from_email):
        return None
    return {
        'host': host,
        'port': port,
        'username': str(c.get('username', '')),
        'password': str(c.get('password', '')),
        'fromEmail': from_email,
        'fromName': str(c.get('fromName', '')).strip(),
        'security': security,
        'enabled': bool(c.get('enabled')),
    }
def _smtp_connect(creds: dict, timeout: float = 15.0) -> smtplib.SMTP:
    """Open an SMTP connection using the configured security mode.

    Args:
        creds: Settings dict with ``host``, ``port`` and ``security``
            (``'ssl'``, ``'starttls'`` or anything else for plain SMTP), plus
            optional ``username`` / ``password``.
        timeout: Socket timeout in seconds.

    Returns:
        A connected (and, when a username is configured, authenticated)
        client. The caller is responsible for calling ``.quit()``.

    Raises:
        smtplib.SMTPException, OSError: When connecting, the TLS upgrade or
            the login fails. The socket is closed before the error
            propagates, so a failed attempt does not leak a connection.
    """
    host, port, security = creds['host'], creds['port'], creds['security']
    if security == 'ssl':
        client: smtplib.SMTP = smtplib.SMTP_SSL(
            host, port, timeout=timeout, context=ssl.create_default_context(),
        )
    else:
        client = smtplib.SMTP(host, port, timeout=timeout)
    try:
        if security != 'ssl':
            client.ehlo()
            if security == 'starttls':
                client.starttls(context=ssl.create_default_context())
                # Capabilities must be requested again after the TLS upgrade.
                client.ehlo()
        if creds.get('username'):
            client.login(creds['username'], creds.get('password', ''))
    except Exception:
        client.close()
        raise
    return client
def _valid_recipient_list(recipients) -> list[str] | None:
"""Valideer een lijst van e-mailadressen; max 50 ontvangers per request."""
if isinstance(recipients, str):
recipients = [recipients]
if not isinstance(recipients, list) or not recipients:
return None
if len(recipients) > 50:
return None
cleaned: list[str] = []
for r in recipients:
if not isinstance(r, str):
return None
addr = r.strip()
if not addr or not MAIL_EMAIL_RE.match(addr):
return None
cleaned.append(addr)
return cleaned
def _build_email(creds: dict, payload: dict) -> tuple[email.message.EmailMessage, list[str]] | None:
    """Build an ``EmailMessage`` from the request payload.

    Args:
        creds: SMTP settings; ``fromEmail`` (falling back to ``username``)
            and ``fromName`` form the ``From`` header.
        payload: Request body. Reads ``to``, ``cc``, ``bcc``, ``subject``,
            ``text``, ``html``, ``replyTo``, ``attachments`` (list of
            ``{filename, contentBase64, mimeType?}``) and ``inlineImages``
            (list of ``{filename, contentBase64, mimeType, contentId}``).

    Returns:
        ``(message, recipients)`` where ``recipients`` is to + cc + bcc, or
        ``None`` when any part of the input is invalid. Header values that
        the email policy refuses (for example a line break in the subject)
        also yield ``None`` instead of raising.
    """
    # Combined decoded size allowed for attachments plus inline images.
    size_limit = 15 * 1024 * 1024

    to_list = _valid_recipient_list(payload.get('to'))
    if not to_list:
        return None
    cc_list = _valid_recipient_list(payload.get('cc')) if payload.get('cc') else []
    bcc_list = _valid_recipient_list(payload.get('bcc')) if payload.get('bcc') else []
    if cc_list is None or bcc_list is None:
        return None

    subject = str(payload.get('subject', '')).strip()
    if not subject or len(subject) > 500:
        return None
    raw_text = payload.get('text')
    # A JSON null must not end up as the literal body "None".
    text_body = '' if raw_text is None else str(raw_text)
    html_body = payload.get('html')
    if html_body is not None and not isinstance(html_body, str):
        return None

    from_name = creds.get('fromName', '')
    from_email = creds.get('fromEmail') or creds.get('username', '')
    if not MAIL_EMAIL_RE.match(from_email or ''):
        return None
    reply_to = str(payload.get('replyTo', '')).strip()
    if reply_to and not MAIL_EMAIL_RE.match(reply_to):
        return None

    msg = email.message.EmailMessage()
    try:
        msg['From'] = email.utils.formataddr((from_name, from_email)) if from_name else from_email
        msg['To'] = ', '.join(to_list)
        if cc_list:
            msg['Cc'] = ', '.join(cc_list)
        msg['Subject'] = subject
        msg['Date'] = email.utils.formatdate(localtime=True)
        msg['Message-ID'] = email.utils.make_msgid()
        if reply_to:
            msg['Reply-To'] = reply_to
    except ValueError:
        # The default email policy rejects header values containing CR/LF;
        # treat that as invalid input rather than letting it propagate.
        return None

    msg.set_content(text_body or ' ')
    if html_body:
        msg.add_alternative(html_body, subtype='html')

    # Attachments: list of {filename, contentBase64, mimeType?}
    attachments = payload.get('attachments') or []
    if not isinstance(attachments, list):
        return None
    if len(attachments) > 10:
        return None
    total_size = 0
    for att in attachments:
        if not isinstance(att, dict):
            return None
        fname = str(att.get('filename', '')).strip()
        if not fname or len(fname) > 200:
            return None
        # No path components in the attachment name: no / or \ and no
        # leading dot.
        if '/' in fname or '\\' in fname or fname.startswith('.'):
            return None
        b64 = att.get('contentBase64', '')
        if not isinstance(b64, str):
            return None
        try:
            content = base64.b64decode(b64, validate=True)
        except Exception:
            return None
        total_size += len(content)
        if total_size > size_limit:
            return None
        mt = str(att.get('mimeType', 'application/octet-stream'))
        if '/' not in mt:
            mt = 'application/octet-stream'
        maintype, _, subtype = mt.partition('/')
        try:
            msg.add_attachment(content, maintype=maintype, subtype=subtype, filename=fname)
        except ValueError:
            # e.g. a line break smuggled into the MIME type.
            return None

    # Inline images: list of {filename, contentBase64, mimeType, contentId}.
    # They are attached to the HTML alternative through add_related so the
    # message gets the standard multipart/related structure.
    inline_imgs = payload.get('inlineImages') or []
    if not isinstance(inline_imgs, list):
        return None
    if len(inline_imgs) > 10:
        return None
    if inline_imgs and not html_body:
        # Inline images without an HTML body have nothing to refer to them.
        return None
    # Look the HTML part up once. After the first add_related() this object
    # becomes the multipart/related container; calling get_body() again would
    # return the text/html part inside it, and a second add_related() on that
    # inner part would nest a new multipart/related for every extra image.
    html_part = msg.get_body(preferencelist=('html',)) if inline_imgs else None
    if inline_imgs and html_part is None:
        return None
    for img in inline_imgs:
        if not isinstance(img, dict):
            return None
        cid = str(img.get('contentId', '')).strip()
        # The CID must be header-safe: strictly alphanumerics plus . _ -
        if not cid or not re.match(r'^[A-Za-z0-9._-]{1,80}$', cid):
            return None
        ifname = str(img.get('filename', '')).strip() or f'{cid}.png'
        if len(ifname) > 200 or '/' in ifname or '\\' in ifname or ifname.startswith('.'):
            return None
        ib64 = img.get('contentBase64', '')
        if not isinstance(ib64, str):
            return None
        try:
            icontent = base64.b64decode(ib64, validate=True)
        except Exception:
            return None
        total_size += len(icontent)
        if total_size > size_limit:
            return None
        imt = str(img.get('mimeType', 'image/png'))
        if not imt.startswith('image/'):
            return None
        imaintype, _, isubtype = imt.partition('/')
        try:
            # disposition='inline' asks mail clients to render the image in
            # the body instead of listing it as a separate download.
            html_part.add_related(icontent, maintype=imaintype, subtype=isubtype,
                                  cid=f'<{cid}>', filename=ifname, disposition='inline')
        except ValueError:
            return None
    return msg, to_list + cc_list + bcc_list
def _bf_request(uid: str, api_key: str, url: str, method: str = 'GET', data: bytes | None = None) -> tuple[int, bytes]:
"""Make a request to the Brewfather API; returns (status, body)."""
auth = base64.b64encode(f'{uid}:{api_key}'.encode()).decode()
headers = {'Authorization': f'Basic {auth}', 'Accept': 'application/json'}
if data is not None:
headers['Content-Type'] = 'application/json'
req = urllib.request.Request(
url, data=data, headers=headers, method=method,
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status, resp.read()
except urllib.error.HTTPError as e:
return e.code, b'{}'
except Exception:
return 502, b'{}'
# ── Backup system (AGP 7-year retention) ──────────────────────────────────
def _run_backup() -> str:
    """Copy every ``/data/*.json`` file into ``/data/backups/YYYY-MM-DD/``.

    Returns:
        Today's date as an ISO string, which is also the backup folder name.
    """
    stamp = datetime.date.today().isoformat()
    target_dir = BACKUP_DIR / stamp
    target_dir.mkdir(parents=True, exist_ok=True)
    for source in DATA_DIR.glob('*.json'):
        shutil.copy2(source, target_dir / source.name)
    return stamp
def _should_keep_backup(backup_date: datetime.date, today: datetime.date) -> bool:
"""Determine whether a backup should be retained based on AGP policy.
- Daily backups: keep for 30 days
- Weekly (Monday) backups: keep for 1 year
- Monthly (1st of month) backups: keep for 7 years
"""
age = (today - backup_date).days
# Monthly backups (1st of month): keep 7 years
if backup_date.day == 1 and age <= 7 * 365:
return True
# Weekly backups (Monday): keep 1 year
if backup_date.weekday() == 0 and age <= 365:
return True
# Daily backups: keep 30 days
if age <= 30:
return True
return False
def _cleanup_backups() -> None:
    """Delete backup folders that fall outside the retention policy.

    Only directories whose name parses as an ISO date are considered;
    anything else in ``BACKUP_DIR`` is left alone.
    """
    reference = datetime.date.today()
    for candidate in sorted(BACKUP_DIR.iterdir()):
        if not candidate.is_dir():
            continue
        try:
            taken_on = datetime.date.fromisoformat(candidate.name)
        except ValueError:
            continue
        if _should_keep_backup(taken_on, reference):
            continue
        shutil.rmtree(candidate, ignore_errors=True)
def _backup_loop(interval: float = 86400.0) -> None:
    """Background loop: run a backup followed by cleanup, then sleep.

    Runs forever. Errors are printed and do not stop the loop.

    Args:
        interval: Seconds to sleep between rounds (default: one day).
    """
    def run_once() -> None:
        try:
            _run_backup()
            _cleanup_backups()
        except Exception as exc:
            print(f'[backup] error: {exc}', flush=True)

    while True:
        run_once()
        time.sleep(interval)
# ── Automatic fermentation measurements ───────────────────────────────────
# Lock held by the background ticks (_auto_metingen_tick, _cold_crash_tick)
# around their reads and writes of the JSON data files.
_data_lock = threading.Lock()
def _read_json(key: str, default=None):
    """Read the JSON data file ``/data/<key>.json``.

    Args:
        key: Data key; the file name is ``<key>.json`` inside ``DATA_DIR``.
        default: Value returned when the file cannot be used.

    Returns:
        The parsed JSON, or *default* when the file does not exist, cannot be
        read, is not valid UTF-8 or is not valid JSON.
    """
    filepath = DATA_DIR / f'{key}.json'
    try:
        return json.loads(filepath.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return default
def _write_json(key: str, data) -> None:
    """Write *data* as JSON to ``/data/<key>.json`` atomically.

    The content is first written to a sibling ``<key>.json.tmp`` file and
    then moved over the target with ``os.replace``. A crash or a full disk
    therefore never leaves a half-written data file behind. The temporary
    name does not end in ``.json``, so it is not matched by a ``*.json``
    glob.

    Args:
        key: Data key; the file name is ``<key>.json`` inside ``DATA_DIR``.
        data: JSON-serializable value.

    Raises:
        TypeError, ValueError: When *data* cannot be serialized.
        OSError: When writing or replacing the file fails.
    """
    filepath = DATA_DIR / f'{key}.json'
    tmp_path = filepath.with_name(f'{filepath.name}.tmp')
    # Serialize before touching the filesystem.
    serialized = json.dumps(data, ensure_ascii=False)
    try:
        tmp_path.write_text(serialized, encoding='utf-8')
        os.replace(tmp_path, filepath)
    except OSError:
        # Do not leave a stale temp file behind after a failed write.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def _ha_fetch_state(entity_id: str) -> float | None:
"""Haal de huidige waarde van een HA-entiteit op. Geeft None terug bij fout."""
token = os.environ.get('SUPERVISOR_TOKEN', '')
if not token:
return None
try:
req = urllib.request.Request(
f'{HA_SUPERVISOR_BASE}/states/{entity_id}',
headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req, timeout=5) as r:
data = json.loads(r.read())
val = float(data.get('state', ''))
return val
except (urllib.error.URLError, ValueError, TypeError, OSError):
return None
def _ha_fetch_climate_setpoint(entity_id: str) -> float | None:
"""Haal het huidige setpoint (`attributes.temperature`) van een climate-entity op."""
token = os.environ.get('SUPERVISOR_TOKEN', '')
if not token:
return None
try:
req = urllib.request.Request(
f'{HA_SUPERVISOR_BASE}/states/{entity_id}',
headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req, timeout=5) as r:
data = json.loads(r.read())
attrs = data.get('attributes', {}) or {}
sp = attrs.get('temperature')
return float(sp) if sp is not None else None
except (urllib.error.URLError, ValueError, TypeError, OSError):
return None
def _ha_set_climate_temperature(entity_id: str, temperature: float) -> bool:
"""Stuur een climate.set_temperature service-call naar HA. Return True bij success."""
token = os.environ.get('SUPERVISOR_TOKEN', '')
if not token:
return False
if not re.match(r'^climate\.[a-z0-9][a-z0-9_-]*$', entity_id):
return False
try:
payload = json.dumps({'entity_id': entity_id, 'temperature': temperature}).encode('utf-8')
req = urllib.request.Request(
f'{HA_SUPERVISOR_BASE}/services/climate/set_temperature',
data=payload,
headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'},
method='POST',
)
with urllib.request.urlopen(req, timeout=10):
return True
except (urllib.error.URLError, OSError):
return False
def _auto_metingen_loop(interval: float = 600.0) -> None:
    """Background loop: run one automatic-measurement round, then sleep.

    Waits 30 seconds before the first round so the server can finish
    starting. Runs forever; errors are printed and do not stop the loop.

    Args:
        interval: Seconds to sleep between rounds (default: 10 minutes).
    """
    def run_once() -> None:
        try:
            _auto_metingen_tick()
        except Exception as exc:
            print(f'[auto-metingen] error: {exc}', flush=True)

    time.sleep(30)
    while True:
        run_once()
        time.sleep(interval)
def _cold_crash_loop(interval: float = 60.0) -> None:
    """Background loop: run one cold-crash check, then sleep.

    The actual setpoint step only happens once at least an hour has passed
    since the previous step; this interval merely determines how quickly the
    app reacts after that hour. Waits 20 seconds before the first check.
    Runs forever; errors are printed and do not stop the loop.

    Args:
        interval: Seconds to sleep between checks (default: one minute).
    """
    def run_once() -> None:
        try:
            _cold_crash_tick()
        except Exception as exc:
            print(f'[cold-crash] error: {exc}', flush=True)

    time.sleep(20)
    while True:
        run_once()
        time.sleep(interval)
def _auto_metingen_tick() -> None:
    """Run one round of automatic measurements.

    Reads the HA settings and the batch list, fetches the current sensor
    value for every batch in status ``Vergisten`` or ``Conditioneren`` whose
    tank has a sensor entity, and appends the readings to ``gist_metingen``.
    Does nothing when automatic measurements are disabled, no sensors are
    configured, no batch qualifies or no sensor returned a value.
    """
    with _data_lock:
        settings = _read_json('ha_instellingen', {})
    if not settings.get('enabled'):
        return
    sensors = settings.get('sensors', [])
    if not sensors:
        return

    with _data_lock:
        all_batches = _read_json('batches', [])
    tracked_statuses = ('Vergisten', 'Conditioneren')
    candidates = [
        batch for batch in all_batches
        if batch.get('tank') and batch.get('status') in tracked_statuses
    ]
    if not candidates:
        return

    stamp = datetime.datetime.now()
    datum = stamp.strftime('%Y-%m-%d')
    tijd = stamp.strftime('%H:%M')
    readings = []
    for batch in candidates:
        tank = batch.get('tank')
        # Only the first sensor configured for this tank is considered.
        entity = None
        for sensor in sensors:
            if sensor.get('tank') == tank:
                entity = sensor.get('entity')
                break
        if not entity:
            continue
        value = _ha_fetch_state(entity)
        if value is None:
            continue
        readings.append({
            'batch_id': batch['id'],
            'datum': datum,
            'tijd': tijd,
            'temp': value,
            'auto': True,
        })
    if not readings:
        return

    # The sensor calls above ran without the lock; take it only for the
    # read-modify-write of the measurements file.
    with _data_lock:
        metingen = _read_json('gist_metingen', [])
        next_id = max((m.get('id', 0) for m in metingen), default=0)
        for reading in readings:
            next_id += 1
            reading['id'] = next_id
        metingen.extend(readings)
        _write_json('gist_metingen', metingen)
    print(f'[auto-metingen] {len(readings)} meting(en) opgeslagen', flush=True)
def _cold_crash_tick() -> None:
"""For every batch in Conditioneren with an active cold crash, lower the
climate setpoint by one ramp step once at least an hour has passed since
the previous step. Stops when the target has been reached."""
with _data_lock:
batches = _read_json('batches', []) or []
active = [b for b in batches
if b.get('status') == 'Conditioneren' and b.get('cold_crash_datum')]
if not active:
return # nothing to do — stay quiet in the logs
with _data_lock:
ha_inst = _read_json('ha_instellingen', {}) or {}
if not ha_inst.get('climates_enabled'):
print(f"[cold-crash] {len(active)} actieve batch(es), maar climates_enabled=false in ha_instellingen — skip", flush=True)
return
climates = ha_inst.get('climates', []) or []
if not climates:
print(f"[cold-crash] {len(active)} actieve batch(es), maar geen climates geconfigureerd — skip", flush=True)
return
# Use UTC with tzinfo: the frontend stores `new Date().toISOString()`
# (always UTC with `Z`), so `last_dt` is offset-aware. A naive `now()`
# would raise `can't subtract offset-naive and offset-aware datetimes`.
now = datetime.datetime.now(datetime.timezone.utc)
# Steps that were successfully sent to HA during this tick; persisted below.
updated: list[dict] = []
for batch in active:
batch_id = batch.get('id')
# cold_crash_target is authoritative: without a target we do not know
# where to step to, so such a batch is skipped.
try:
target = float(batch.get('cold_crash_target'))
except (TypeError, ValueError):
print(f"[cold-crash] batch {batch_id}: ongeldig cold_crash_target — skip", flush=True)
continue
# Ramp size per hour; a missing or falsy value falls back to 1.
try:
batch_ramp = float(batch.get('cold_crash_ramp') or 1)
except (TypeError, ValueError):
batch_ramp = 1.0
if batch_ramp <= 0:
print(f"[cold-crash] batch {batch_id}: ramp<=0 — skip", flush=True)
continue
# First configured climate for this tank that has an entity.
climate = next((c for c in climates if c.get('tank') == batch.get('tank') and c.get('entity')), None)
if not climate:
print(f"[cold-crash] batch {batch_id}: geen climate gekoppeld aan tank {batch.get('tank')!r} — skip", flush=True)
continue
entity_id = climate['entity']
# Time of the last step: on the very first tick that is the moment the
# cold crash was started (the frontend has already stepped down once by
# then). Hourly steps only follow after that.
last_iso = batch.get('cold_crash_laatste_stap') or batch.get('cold_crash_datum')
try:
# Replace the `Z` suffix explicitly — Python <3.11 does not accept it
# in fromisoformat — and normalize older records without an offset to
# UTC so arithmetic with the aware `now` works.
iso_norm = (last_iso or '').replace('Z', '+00:00')
last_dt = datetime.datetime.fromisoformat(iso_norm)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
except (TypeError, ValueError):
print(f"[cold-crash] batch {batch_id}: ongeldig timestamp {last_iso!r} — skip", flush=True)
continue
elapsed_h = (now - last_dt).total_seconds() / 3600.0
if elapsed_h < 1.0:
# Keep the logs quiet: only report that we are waiting once every
# 10 minutes. (int(min) % 10 == 0)
mins = int((now - last_dt).total_seconds() / 60)
if mins > 0 and mins % 10 == 0:
print(f"[cold-crash] batch {batch_id}: wacht op volgend uur (elapsed {mins} min)", flush=True)
continue
current_sp = _ha_fetch_climate_setpoint(entity_id)
if current_sp is None:
print(f"[cold-crash] batch {batch_id}: kon setpoint van {entity_id} niet lezen — skip", flush=True)
continue
# Small tolerance so float noise does not trigger another step at target.
if current_sp <= target + 1e-6:
print(f"[cold-crash] batch {batch_id}: setpoint {current_sp}°C <= target {target}°C — klaar", flush=True)
continue
# Take as many whole-hour steps as possible in one tick — prevents a
# short server outage from putting the ramp out of step.
steps = int(elapsed_h)
if steps < 1:
continue
new_sp = max(target, current_sp - batch_ramp * steps)
# Round to two decimals to avoid odd floats.
new_sp = round(new_sp, 2)
if not _ha_set_climate_temperature(entity_id, new_sp):
print(f"[cold-crash] batch {batch_id}: set_temperature({entity_id}, {new_sp}) faalde — skip", flush=True)
continue
# Move the "last step" reference point forward by whole hours, so the
# fraction of the hour is carried over to the next tick.
next_last = (last_dt + datetime.timedelta(hours=steps)).isoformat()
updated.append({
'id': batch.get('id'),
'new_sp': new_sp,
'steps': steps,
'next_last': next_last,
})
if not updated:
return
# Re-read batches under the lock and merge back only the cold-crash
# timestamp, so concurrent UI writes are not overwritten.
step_map = {u['id']: u['next_last'] for u in updated}
with _data_lock:
current = _read_json('batches', []) or []
for b in current:
nl = step_map.get(b.get('id'))
if nl is not None:
b['cold_crash_laatste_stap'] = nl
_write_json('batches', current)
for u in updated:
print(f"[cold-crash] batch {u['id']}: setpoint → {u['new_sp']}°C ({u['steps']} stap(pen))", flush=True)
def _list_backups() -> list[dict]:
    """List the available backups, sorted by folder name.

    Only directories in ``BACKUP_DIR`` whose name parses as an ISO date are
    included.

    Returns:
        One ``{'date': <folder name>, 'file_count': <number of *.json files>}``
        dict per backup.
    """
    listing = []
    for candidate in sorted(BACKUP_DIR.iterdir()):
        if not candidate.is_dir():
            continue
        try:
            datetime.date.fromisoformat(candidate.name)
        except ValueError:
            continue
        json_count = len(list(candidate.glob('*.json')))
        listing.append({'date': candidate.name, 'file_count': json_count})
    return listing
def _backup_to_zip(date_str: str) -> bytes | None:
"""Create a ZIP archive of a backup directory. Returns bytes or None."""
backup_path = BACKUP_DIR / date_str
if not backup_path.is_dir():
return None
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
for f in sorted(backup_path.iterdir()):
if f.is_file():
zf.write(f, f.name)
return buf.getvalue()
class BrouwerijHandler(http.server.BaseHTTPRequestHandler):
# ── helpers ────────────────────────────────────────────────────────────
def _add_security_headers(self, html: bool = False) -> None:
# Sends the shared security headers; with html=True also the extra headers
# for the HTML page (CSP and Cache-Control from _HTML_EXTRA).
# Must be called after send_response() and before end_headers().
for name, value in _SEC_HEADERS:
self.send_header(name, value)
if html:
for name, value in _HTML_EXTRA:
self.send_header(name, value)
# Restrict CORS to trusted origins only (removes the old wildcard *)
origin = self.headers.get('Origin', '')
allowed = _trusted_origin(origin)
if allowed:
self.send_header('Access-Control-Allow-Origin', allowed)
# NOTE(review): confirm whether `Vary: Origin` is meant to be sent on every
# response or only together with Access-Control-Allow-Origin.
self.send_header('Vary', 'Origin')
def _rate_check(self) -> bool:
    """Apply the per-IP rate limit to the current request.

    Returns:
        ``True`` when the request may proceed. ``False`` when the client is
        over the limit; a 429 response with a ``Retry-After`` header has
        already been sent in that case.
    """
    client_ip = self.client_address[0]
    if _check_rate(client_ip):
        return True
    retry_header = ('Retry-After', str(_retry_after(client_ip)))
    self._json(429, {'error': 'too many requests'}, extra_headers=[retry_header])
    return False
def _json(self, status: int, data, extra_headers: list | None = None) -> None:
    """Send *data* as a complete JSON response.

    Args:
        status: HTTP status code.
        data: JSON-serializable response body.
        extra_headers: Optional ``(name, value)`` pairs sent after the
            standard headers and before the security headers.
    """
    payload = json.dumps(data).encode()
    self.send_response(status)
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Content-Length', len(payload)),
        ('Cache-Control', 'no-store'),
    ]
    if extra_headers:
        response_headers.extend(extra_headers)
    for header_name, header_value in response_headers:
        self.send_header(header_name, header_value)
    self._add_security_headers()
    self.end_headers()
    self.wfile.write(payload)
def _read_body(self, max_len: int = MAX_CONTENT_LENGTH) -> bytes | None:
    """Read the request body as announced by the Content-Length header.

    Args:
        max_len: Largest body size, in bytes, that is accepted.

    Returns:
        The body bytes (empty when no Content-Length was sent), or ``None``
        when an error response has already been sent: 400 for a non-numeric
        or negative Content-Length, 413 when the body exceeds *max_len*.
    """
    try:
        length = int(self.headers.get('Content-Length', 0))
    except (TypeError, ValueError):
        length = -1
    if length < 0:
        # A negative length would make rfile.read() block until the client
        # closes the connection; a non-numeric one is simply malformed.
        self._json(400, {'error': 'invalid content length'})
        return None
    if length > max_len:
        self._json(413, {'error': 'request too large'})
        return None
    return self.rfile.read(length)
# ── request routing ────────────────────────────────────────────────────
def do_OPTIONS(self):
# Answers a CORS preflight request with an empty 204 response.
# Access-Control-Allow-Origin is only sent for origins accepted by
# _trusted_origin().
origin = self.headers.get('Origin', '')
allowed = _trusted_origin(origin)
self.send_response(204)
if allowed:
self.send_header('Access-Control-Allow-Origin', allowed)
# NOTE(review): confirm which of the following headers are meant to be sent
# only for trusted origins and which on every preflight response.
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.send_header('Vary', 'Origin')
# The shared security headers are sent directly from _SEC_HEADERS here.
for name, value in _SEC_HEADERS:
self.send_header(name, value)
self.end_headers()
def do_GET(self):
if not self._rate_check():
return
path = self.path.split('?')[0]
if BF_PROXY_PREFIX in path:
self._bf_proxy_get()
return
if WC_PING_PATH in path:
self._json(200, {'ok': True, 'server': 'wc-ready'})
return
if WC_PROXY_PREFIX in path and WC_PUT_PREFIX not in path:
self._wc_proxy_get()
return
if BACKUPS_PREFIX in path and BACKUPS_TRIGGER_PATH not in path:
self._handle_backups_get()
return
if FILE_PREFIX in path:
self._serve_upload()
return
if DOWNLOAD_BIJLAGEN_PREFIX in path:
self._serve_bijlagen_zip()
return
if HA_PROXY_PREFIX in path:
self._ha_proxy(path)
return
key = extract_key(path)
if key is not None:
filepath = DATA_DIR / f'{key}.json'
if filepath.exists():
body = filepath.read_bytes()
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', len(body))
self.send_header('Cache-Control', 'no-store')
self._add_security_headers()
self.end_headers()
self.wfile.write(body)
else:
self._json(404, None)
return
# Serve the SPA for all other GET requests