Skip to content

Commit 753bc90

Browse files
committed
feat: Major security and reliability improvements for v2.0.0
Security fixes:
- Add HTTP request timeout (10s) to prevent hanging
- Add base64 decode size limit (1MB) to prevent DoS
- Validate HTTP methods against whitelist
- Fix expression variable naming to escape brackets

Reliability improvements:
- Add SIGTERM/SIGINT signal handling for graceful shutdown
- Add resource cleanup (MQTT disconnect, InfluxDB close)
- Improve exception handling with specific error types
- Add timestamp fallback logging

Configuration validation:
- Require at least one field per point
- Validate TLS cert/key relationship
- Validate MQTT topic patterns
- Validate cron schedule ranges
- Validate field types

New features:
- Add QoS configuration per point (0-2)
- Add binary payload support ($.payload_binary.hex)
- Safer debug logging (payload size, not content)
1 parent 72b562a commit 753bc90

8 files changed

Lines changed: 309 additions & 61 deletions

File tree

mqtt2influxdb/config.py

Lines changed: 116 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,18 +4,18 @@
44
import os
55
import re
66
from pathlib import Path
7-
from typing import Any
7+
from typing import Any, Literal
88

99
import jsonpath_ng
1010
import yaml
11-
from pydantic import BaseModel, ConfigDict, Field, field_validator
11+
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
1212

1313
from .expr import parse_expression
1414

1515
# Regex for environment variable substitution: ${VAR} or ${VAR:default}
1616
ENV_VAR_REGEX = re.compile(r"\$\{([^}:]+)(?::([^}]*))?\}")
1717

18-
# Regex for schedule config entries
18+
# Regex for schedule config entries (basic syntax validation)
1919
CRONTAB_REGEX = re.compile(
2020
r"(?P<minute>\*|[0-5]?\d|\*/\d+|\d+-\d+|\d+(,\d+)*)\s+"
2121
r"(?P<hour>\*|[01]?\d|2[0-3]|\*/\d+|\d+-\d+|\d+(,\d+)*)\s+"
@@ -24,6 +24,18 @@
2424
r"(?P<day_of_week>\*|[0-6](-[0-6])?|\*/\d+|\d+(,\d+)*)"
2525
)
2626

27+
# Valid MQTT topic pattern (no empty segments, valid wildcards)
28+
MQTT_TOPIC_REGEX = re.compile(
29+
r"^(?:[^/#+]+|\+)(?:/(?:[^/#+]+|\+))*(?:/#)?$|" # Normal topics with + and trailing #
30+
r"^#$" # Just # is valid
31+
)
32+
33+
# Allowed HTTP methods for forwarding
34+
ALLOWED_HTTP_METHODS = frozenset({"get", "post", "put", "patch", "delete", "head"})
35+
36+
# Allowed field types for type conversion
37+
ALLOWED_FIELD_TYPES = frozenset({"float", "int", "str", "bool", "booltoint"})
38+
2739

2840
class ConfigError(Exception):
2941
"""Configuration validation error."""
@@ -51,6 +63,22 @@ def validate_file_exists(cls, v: Path | None) -> Path | None:
5163
raise ValueError(f"File not found: {v}")
5264
return v
5365

66+
@model_validator(mode="after")
67+
def validate_tls_config(self) -> "MqttConfig":
68+
"""Validate TLS certificate configuration consistency."""
69+
# If client cert is provided, key must also be provided (and vice versa)
70+
if self.certfile and not self.keyfile:
71+
raise ValueError("certfile requires keyfile to be set")
72+
if self.keyfile and not self.certfile:
73+
raise ValueError("keyfile requires certfile to be set")
74+
# If using client certs, CA file should typically be set
75+
if self.certfile and not self.cafile:
76+
logging.warning(
77+
"certfile/keyfile set without cafile - "
78+
"server certificate will not be verified"
79+
)
80+
return self
81+
5482

5583
class InfluxDBConfig(BaseModel):
5684
"""InfluxDB v3 configuration."""
@@ -75,6 +103,24 @@ class HttpConfig(BaseModel):
75103
username: str | None = Field(default=None, min_length=1)
76104
password: str | None = Field(default=None, min_length=1)
77105

106+
@field_validator("action", mode="after")
107+
@classmethod
108+
def validate_http_method(cls, v: str) -> str:
109+
"""Validate HTTP method against allowed methods."""
110+
if v.lower() not in ALLOWED_HTTP_METHODS:
111+
allowed = ", ".join(sorted(ALLOWED_HTTP_METHODS))
112+
raise ValueError(f"Invalid HTTP method: {v}. Allowed: {allowed}")
113+
return v
114+
115+
@model_validator(mode="after")
116+
def validate_auth_config(self) -> "HttpConfig":
117+
"""Validate HTTP authentication configuration consistency."""
118+
if self.username and not self.password:
119+
raise ValueError("username requires password to be set")
120+
if self.password and not self.username:
121+
raise ValueError("password requires username to be set")
122+
return self
123+
78124

79125
class Base64DecodeConfig(BaseModel):
80126
"""Base64 decode configuration."""
@@ -93,6 +139,15 @@ class FieldConfig(BaseModel):
93139
value: str = Field(..., min_length=1)
94140
type: str | None = Field(default=None, min_length=1)
95141

142+
@field_validator("type", mode="after")
143+
@classmethod
144+
def validate_field_type(cls, v: str | None) -> str | None:
145+
"""Validate field type against allowed types."""
146+
if v is not None and v not in ALLOWED_FIELD_TYPES:
147+
allowed = ", ".join(sorted(ALLOWED_FIELD_TYPES))
148+
raise ValueError(f"Invalid field type: {v}. Allowed: {allowed}")
149+
return v
150+
96151

97152
class PointConfig(BaseModel):
98153
"""Measurement point configuration."""
@@ -103,19 +158,76 @@ class PointConfig(BaseModel):
103158
topic: str = Field(..., min_length=1)
104159
bucket: str | None = Field(default=None, min_length=1)
105160
schedule: str | None = Field(default=None, min_length=1)
106-
fields: dict[str, str | FieldConfig] = Field(default_factory=dict)
161+
qos: int = Field(default=0, ge=0, le=2, description="MQTT QoS level (0, 1, or 2)")
162+
fields: dict[str, str | FieldConfig] = Field(..., min_length=1)
107163
tags: dict[str, str] = Field(default_factory=dict)
108164
httpcontent: dict[str, str] = Field(default_factory=dict)
109165

166+
@field_validator("topic", mode="after")
167+
@classmethod
168+
def validate_topic_pattern(cls, v: str) -> str:
169+
"""Validate MQTT topic pattern syntax."""
170+
# Check for empty segments
171+
if "//" in v:
172+
raise ValueError(f"Invalid topic pattern (empty segment): {v}")
173+
# Check for invalid wildcard usage
174+
segments = v.split("/")
175+
for i, segment in enumerate(segments):
176+
# + must be alone in its segment
177+
if "+" in segment and segment != "+":
178+
raise ValueError(
179+
f"Invalid topic pattern (+ must be alone in segment): {v}"
180+
)
181+
# # must be the last segment and alone
182+
if "#" in segment:
183+
if segment != "#" or i != len(segments) - 1:
184+
raise ValueError(
185+
f"Invalid topic pattern (# must be last and alone): {v}"
186+
)
187+
logging.debug("Validated MQTT topic pattern: '%s'", v)
188+
return v
189+
110190
@field_validator("schedule", mode="after")
111191
@classmethod
112192
def validate_schedule(cls, v: str | None) -> str | None:
193+
"""Validate cron schedule syntax."""
113194
if v is not None:
114195
if not CRONTAB_REGEX.match(v):
115196
raise ValueError(f"Invalid cron format: {v}")
197+
# Additional semantic validation
198+
parts = v.split()
199+
if len(parts) == 5:
200+
minute, hour, day, month, dow = parts
201+
# Validate ranges make sense
202+
cls._validate_cron_range(day, 1, 31, "day")
203+
cls._validate_cron_range(month, 1, 12, "month")
204+
cls._validate_cron_range(dow, 0, 6, "day_of_week")
116205
logging.debug("Validated crontab entry: '%s'", v)
117206
return v
118207

208+
@staticmethod
209+
def _validate_cron_range(value: str, min_val: int, max_val: int, name: str) -> None:
210+
"""Validate individual cron field ranges."""
211+
if value == "*" or value.startswith("*/"):
212+
return
213+
# Handle ranges like 1-15
214+
if "-" in value and "," not in value:
215+
try:
216+
start, end = map(int, value.split("-"))
217+
if start > end:
218+
raise ValueError(
219+
f"Invalid cron {name} range: {start}-{end} (start > end)"
220+
)
221+
if start < min_val or end > max_val:
222+
raise ValueError(
223+
f"Invalid cron {name} range: {value} (must be {min_val}-{max_val})"
224+
)
225+
except ValueError as e:
226+
if "Invalid cron" in str(e):
227+
raise
228+
# Not a simple range, skip validation
229+
pass
230+
119231
@field_validator("measurement", mode="after")
120232
@classmethod
121233
def validate_measurement(cls, v: str) -> str:

mqtt2influxdb/expr.py

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
"""Expression parsing utilities for mqtt2influxdb."""
22

3+
import re
4+
35
import py_expression_eval
46

57

@@ -9,29 +11,46 @@ class ExpressionError(ValueError):
911
pass
1012

1113

14+
# Regex to match array bracket notation like [0], [1], etc.
15+
BRACKET_REGEX = re.compile(r"\[(\d+)\]")
16+
17+
1218
def jsonpath_to_variable(path: str) -> str:
1319
"""Convert JSONPath ($.) to valid expression variable (JSON__).
1420
21+
Handles array bracket notation by converting [n] to _n_.
22+
Example: $.topic[1] -> JSON__topic_1_
23+
1524
Args:
1625
path: JSONPath expression starting with $.
1726
1827
Returns:
1928
Valid expression variable name.
2029
"""
21-
return path.replace("$", "JSON_").replace(".", "_")
30+
# First convert brackets [n] to _n_ to avoid parser confusion
31+
result = BRACKET_REGEX.sub(r"_\1_", path)
32+
# Then do the standard replacements
33+
return result.replace("$", "JSON_").replace(".", "_")
2234

2335

2436
def variable_to_jsonpath(var) -> str:
2537
"""Convert expression variable back to JSONPath.
2638
39+
Reverses the conversion done by jsonpath_to_variable.
40+
Example: JSON__topic_1_ -> $.topic[1]
41+
2742
Args:
2843
var: Expression variable (either string or object with .var attribute).
2944
3045
Returns:
3146
JSONPath expression starting with $.
3247
"""
3348
name = var.var if hasattr(var, "var") else str(var)
34-
return name.replace("JSON_", "$").replace("_", ".")
49+
# First do standard replacements
50+
result = name.replace("JSON_", "$").replace("_", ".")
51+
# Then convert back _n_ patterns to [n] (now they are .n.)
52+
result = re.sub(r"\.(\d+)\.", r"[\1]", result)
53+
return result
3554

3655

3756
def parse_expression(text: str) -> py_expression_eval.Expression:

0 commit comments

Comments
 (0)