-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathelk_writer.py
More file actions
85 lines (70 loc) · 2.8 KB
/
elk_writer.py
File metadata and controls
85 lines (70 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
elk_writer.py — ELK output extension for log-explainer.

Writes one JSON line per explained log to explained-logs.jsonl.
Filebeat tails that file and ships the documents to Elasticsearch.

Usage (from log_parser.py):

    from elk_writer import ELKWriter

    writer = ELKWriter(output_file="explained-logs.jsonl", model="qwen2.5-coder:1.5b")
    writer.write(raw_log=line, explanation=text, severity=severity)
"""
import json
import re
import os
from datetime import datetime, timezone
# Structured log line of the form "<timestamp> <LEVEL> <service> - <message>".
# Example match: 2026-05-23 10:45:43 ERROR ServiceName - message
# Named groups: level, service, message. The timestamp is matched but not captured.
_LOG_PATTERN = re.compile(
    # Date and time joined by "T" or a space; optional fractional seconds
    # introduced by "." or ",".
    r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[\.,]?\d*\s+"
    # Level keyword; compiled without IGNORECASE, so only upper-case levels match.
    r"(?P<level>INFO|WARN|WARNING|ERROR|CRITICAL|FATAL|DEBUG)\s+"
    # Service is a single whitespace-free token; a whitespace-delimited "-"
    # separates it from the message, which runs to the end of the line.
    r"(?P<service>\S+)\s+-\s+(?P<message>.+)$"
)
# Error-category lookup table: category name -> list of regex fragments.
# _detect_error_type iterates the categories in insertion order and applies each
# fragment with re.search(..., re.IGNORECASE), so entries are regular expressions
# (not plain substrings) and "system" takes precedence when both categories match.
_ERROR_TYPE_PATTERNS = {
    # Signatures such as out-of-memory, null pointer, stack overflow, refused
    # connections, SQL timeouts and connection-pool exhaustion.
    "system": [
        r"OutOfMemory", r"OOMKilled", r"NullPointer", r"StackOverflow",
        r"ConnectException", r"Connection refused", r"SQLTimeout",
        r"HikariPool.*Unable", r"heap space", r"segfault",
    ],
    # Signatures such as missing items, declined or expired requests, invalid
    # input and authorization failures.
    "business": [
        r"not found", r"declined", r"out of stock", r"expired",
        r"invalid", r"unauthorized", r"forbidden", r"session expired",
    ],
}
def _detect_error_type(raw_log: str) -> str:
    """Return the error category whose signature appears in ``raw_log``.

    Categories are taken from ``_ERROR_TYPE_PATTERNS`` in its iteration order;
    the first category with any pattern found (case-insensitive regex search)
    is returned. ``"unknown"`` is returned when no pattern matches.
    """
    for category, signatures in _ERROR_TYPE_PATTERNS.items():
        # any() stops at the first hit, mirroring a first-match early return.
        if any(re.search(signature, raw_log, re.IGNORECASE) for signature in signatures):
            return category
    return "unknown"
def _parse_service(raw_log: str) -> str:
    """Return the service token of a structured log line.

    The line is matched from its start against ``_LOG_PATTERN``; when it does
    not have that shape, ``"unknown"`` is returned instead.
    """
    parsed = _LOG_PATTERN.match(raw_log)
    return parsed.group("service") if parsed else "unknown"
class ELKWriter:
    """Append explained-log documents to a JSON Lines file.

    Every successful :meth:`write` call appends exactly one JSON object,
    followed by a newline, to ``output_file``.

    Attributes:
        output_file: Path of the file that documents are appended to.
        model: Model name copied into the ``model`` field of every document.
    """

    # Severities whose raw log is classified via _detect_error_type; any other
    # severity is recorded with error_type "none". FATAL is listed because the
    # module's log-line pattern accepts it as a level next to ERROR/CRITICAL.
    _ERROR_SEVERITIES = frozenset({"ERROR", "CRITICAL", "FATAL"})

    def __init__(self, output_file: str = "explained-logs.jsonl", model: str = "unknown"):
        """Remember the target file and model, creating the parent directory.

        Args:
            output_file: Path of the JSONL file to append to.
            model: Model name to stamp on every document.

        Raises:
            OSError: If the parent directory cannot be created.
        """
        self.output_file = output_file
        self.model = model
        output_dir = os.path.dirname(output_file)
        # A bare filename has no directory component; nothing to create then.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

    @classmethod
    def _is_error_severity(cls, severity) -> bool:
        """Return True when ``severity`` names an error-class level.

        The comparison ignores case and surrounding whitespace; non-string
        values (including ``None``) are never error severities.
        """
        return isinstance(severity, str) and severity.strip().upper() in cls._ERROR_SEVERITIES

    def write(self, raw_log: str, explanation: str, severity: str) -> None:
        """Write one explained log document as a JSON line.

        Nothing is written when ``raw_log`` is ``None``, empty, or only
        whitespace. A ``None`` explanation is stored as an empty string.
        File-system errors are reported on stdout and swallowed so that a
        failed write never interrupts the caller.

        Args:
            raw_log: The original log line; stored stripped in ``raw_log``.
            explanation: Explanation text; stored stripped in ``explanation``.
            severity: Level stored verbatim in ``level``. Error-class levels
                additionally trigger error-type classification of ``raw_log``.
        """
        # Treat None like empty input instead of failing on .strip().
        raw_log = (raw_log or "").strip()
        explanation = (explanation or "").strip()
        if not raw_log:
            return

        if self._is_error_severity(severity):
            error_type = _detect_error_type(raw_log)
        else:
            error_type = "none"

        document = {
            # UTC time of the write, millisecond precision (%f yields
            # microseconds, so the last three digits are dropped), "Z" suffix.
            "@timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
            "raw_log": raw_log,
            "explanation": explanation,
            "level": severity,
            "app_service": _parse_service(raw_log),
            "error_type": error_type,
            "model": self.model,
        }

        try:
            # Explicit encoding keeps the output independent of the platform default.
            with open(self.output_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(document) + "\n")
        except OSError as e:
            # Best-effort sink: report and carry on rather than propagate.
            print(f" [elk_writer] Write failed: {e}")