Skip to content

Commit 2058ef0

Browse files
committed
Invariance
1 parent 94c7452 commit 2058ef0

12 files changed

Lines changed: 578 additions & 39 deletions

src/mboxer/classify.py

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Any
77

88
from .naming import normalize_category_path
9+
from .records import decode_address_fields
910

1011

1112
def _load_rules(config: dict[str, Any]) -> list[dict[str, Any]]:
@@ -16,10 +17,7 @@ def _match_rule(rule: dict[str, Any], record: dict[str, Any]) -> bool:
1617
match = rule.get("match", {})
1718
sender = (record.get("sender") or "").lower()
1819
subject = (record.get("subject") or "").lower()
19-
try:
20-
recipients = json.loads(record.get("recipients_json") or "[]")
21-
except Exception:
22-
recipients = []
20+
recipients = record["recipients"]
2321
all_addrs = [sender] + [r.lower() for r in recipients]
2422

2523
for domain in match.get("from_domain", []):
@@ -112,7 +110,7 @@ def _build_thread_input(
112110
) -> dict[str, Any]:
113111
"""Aggregate thread messages into a single record suitable for rule matching."""
114112
if not messages:
115-
return {"thread_key": thread_key, "subject": "", "sender": "", "recipients_json": "[]"}
113+
return {"thread_key": thread_key, "subject": "", "sender": "", "recipients": []}
116114

117115
subject = ""
118116
for m in messages:
@@ -128,17 +126,14 @@ def _build_thread_input(
128126
break
129127

130128
# Collect all unique participant addresses across every message in the thread.
131-
# Putting all of them into recipients_json ensures _match_rule's all_addrs covers
132-
# every sender domain, not just the first message's sender.
129+
# Putting all non-sender participants into recipients ensures _match_rule's
130+
# all_addrs covers every sender domain, not just the first message's sender.
133131
all_addrs: set[str] = set()
134132
for m in messages:
135133
if m.get("sender"):
136134
all_addrs.add(m["sender"].lower())
137-
try:
138-
for r in json.loads(m.get("recipients_json") or "[]"):
139-
all_addrs.add(r.lower())
140-
except Exception:
141-
pass
135+
for r in m["recipients"]:
136+
all_addrs.add(r.lower())
142137

143138
sender = messages[0].get("sender") or ""
144139
other_addrs = sorted(all_addrs - {sender.lower()})
@@ -151,7 +146,7 @@ def _build_thread_input(
151146
"thread_key": thread_key,
152147
"subject": subject,
153148
"sender": sender,
154-
"recipients_json": json.dumps(other_addrs),
149+
"recipients": other_addrs,
155150
"_participants": sorted(all_addrs),
156151
"_excerpts": excerpts,
157152
"_message_count": len(messages),
@@ -330,8 +325,8 @@ def _run_rule_classification_thread(
330325
if account_id is not None:
331326
msg_rows = conn.execute(
332327
"""
333-
SELECT id, subject, sender, recipients_json, date_utc, body_text,
334-
thread_key, account_id
328+
SELECT id, subject, sender, recipients_json, cc_json, bcc_json,
329+
date_utc, body_text, thread_key, account_id
335330
FROM messages
336331
WHERE thread_key = ? AND account_id = ?
337332
ORDER BY date_utc NULLS LAST, id
@@ -341,8 +336,8 @@ def _run_rule_classification_thread(
341336
else:
342337
msg_rows = conn.execute(
343338
"""
344-
SELECT id, subject, sender, recipients_json, date_utc, body_text,
345-
thread_key, account_id
339+
SELECT id, subject, sender, recipients_json, cc_json, bcc_json,
340+
date_utc, body_text, thread_key, account_id
346341
FROM messages
347342
WHERE thread_key = ?
348343
ORDER BY date_utc NULLS LAST, id
@@ -351,10 +346,10 @@ def _run_rule_classification_thread(
351346
).fetchall()
352347

353348
cols = [
354-
"id", "subject", "sender", "recipients_json", "date_utc",
355-
"body_text", "thread_key", "account_id",
349+
"id", "subject", "sender", "recipients_json", "cc_json", "bcc_json",
350+
"date_utc", "body_text", "thread_key", "account_id",
356351
]
357-
messages = [dict(zip(cols, row)) for row in msg_rows]
352+
messages = [decode_address_fields(dict(zip(cols, row))) for row in msg_rows]
358353
if not messages:
359354
continue
360355

@@ -404,7 +399,7 @@ def run_rule_classification(
404399

405400
query = """
406401
SELECT m.id, m.account_id, m.source_id, m.mbox_key, m.message_id, m.thread_key,
407-
m.subject, m.sender, m.recipients_json, m.date_utc
402+
m.subject, m.sender, m.recipients_json, m.cc_json, m.bcc_json, m.date_utc
408403
FROM messages m
409404
LEFT JOIN classifications c
410405
ON c.message_db_id = m.id AND c.classifier_type IN ('rule', 'rule_hint')
@@ -418,9 +413,9 @@ def run_rule_classification(
418413
rows = conn.execute(query, params).fetchall()
419414
cols = [
420415
"id", "account_id", "source_id", "mbox_key", "message_id", "thread_key",
421-
"subject", "sender", "recipients_json", "date_utc",
416+
"subject", "sender", "recipients_json", "cc_json", "bcc_json", "date_utc",
422417
]
423-
records = [dict(zip(cols, row)) for row in rows]
418+
records = [decode_address_fields(dict(zip(cols, row))) for row in rows]
424419

425420
classified = 0
426421
skipped = 0
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
-- Migration 003: enforce JSON-array invariants for message address columns.
2+
-- SQLite cannot add NOT NULL/CHECK constraints to existing columns in place,
3+
-- so rebuild messages and recreate its indexes.
4+
5+
PRAGMA foreign_keys = OFF;
6+
7+
CREATE TABLE messages_new (
8+
id INTEGER PRIMARY KEY,
9+
source_id INTEGER NOT NULL,
10+
mbox_key TEXT NOT NULL,
11+
message_id TEXT,
12+
thread_key TEXT,
13+
subject TEXT,
14+
sender TEXT,
15+
recipients_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(recipients_json) = 'array'),
16+
cc_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(cc_json) = 'array'),
17+
bcc_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(bcc_json) = 'array'),
18+
date_header TEXT,
19+
date_utc TEXT,
20+
body_text TEXT,
21+
body_html TEXT,
22+
body_hash TEXT,
23+
body_chars INTEGER DEFAULT 0,
24+
body_word_count INTEGER DEFAULT 0,
25+
attachment_count INTEGER DEFAULT 0,
26+
raw_headers_json TEXT,
27+
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
28+
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
29+
account_id INTEGER REFERENCES accounts(id),
30+
FOREIGN KEY(source_id) REFERENCES mbox_sources(id),
31+
UNIQUE(source_id, mbox_key)
32+
);
33+
34+
INSERT INTO messages_new (
35+
id,
36+
source_id,
37+
mbox_key,
38+
message_id,
39+
thread_key,
40+
subject,
41+
sender,
42+
recipients_json,
43+
cc_json,
44+
bcc_json,
45+
date_header,
46+
date_utc,
47+
body_text,
48+
body_html,
49+
body_hash,
50+
body_chars,
51+
body_word_count,
52+
attachment_count,
53+
raw_headers_json,
54+
created_at,
55+
updated_at,
56+
account_id
57+
)
58+
SELECT
59+
id,
60+
source_id,
61+
mbox_key,
62+
message_id,
63+
thread_key,
64+
subject,
65+
sender,
66+
recipients_json,
67+
cc_json,
68+
bcc_json,
69+
date_header,
70+
date_utc,
71+
body_text,
72+
body_html,
73+
body_hash,
74+
body_chars,
75+
body_word_count,
76+
attachment_count,
77+
raw_headers_json,
78+
created_at,
79+
updated_at,
80+
account_id
81+
FROM messages;
82+
83+
DROP TABLE messages;
84+
ALTER TABLE messages_new RENAME TO messages;
85+
86+
CREATE INDEX IF NOT EXISTS idx_messages_message_id ON messages(message_id);
87+
CREATE INDEX IF NOT EXISTS idx_messages_thread_key ON messages(thread_key);
88+
CREATE INDEX IF NOT EXISTS idx_messages_date_utc ON messages(date_utc);
89+
CREATE INDEX IF NOT EXISTS idx_messages_body_hash ON messages(body_hash);
90+
CREATE INDEX IF NOT EXISTS idx_messages_account ON messages(account_id);
91+
CREATE INDEX IF NOT EXISTS idx_messages_account_message_id ON messages(account_id, message_id);
92+
CREATE INDEX IF NOT EXISTS idx_messages_account_thread_key ON messages(account_id, thread_key);
93+
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_account_source_mbox_key
94+
ON messages(account_id, source_id, mbox_key) WHERE account_id IS NOT NULL;
95+
96+
PRAGMA foreign_keys = ON;

src/mboxer/db/schema.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ CREATE TABLE IF NOT EXISTS messages (
7474
thread_key TEXT,
7575
subject TEXT,
7676
sender TEXT,
77-
recipients_json TEXT,
78-
cc_json TEXT,
79-
bcc_json TEXT,
77+
recipients_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(recipients_json) = 'array'),
78+
cc_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(cc_json) = 'array'),
79+
bcc_json TEXT NOT NULL DEFAULT '[]' CHECK (json_type(bcc_json) = 'array'),
8080
date_header TEXT,
8181
date_utc TEXT,
8282
body_text TEXT,

src/mboxer/exporters/jsonl.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import Any
88

9+
from ..records import decode_address_fields
910
from ..security.findings import ResidualFindingsBlocked, merge_counts
1011
from ..security.policy import default_export_profile, resolve_export_profile, resolve_findings_policy
1112
from .projection import prepare_projection
@@ -37,7 +38,7 @@ def export_jsonl(
3738
rows = conn.execute(
3839
"""
3940
SELECT m.id, m.message_id, m.thread_key, m.subject, m.sender,
40-
m.recipients_json, m.cc_json, m.date_utc,
41+
m.recipients_json, m.cc_json, m.bcc_json, m.date_utc,
4142
m.body_text, m.body_hash, m.body_chars, m.body_word_count,
4243
m.attachment_count, s.source_name, s.source_slug
4344
FROM messages m
@@ -51,7 +52,7 @@ def export_jsonl(
5152
rows = conn.execute(
5253
"""
5354
SELECT m.id, m.message_id, m.thread_key, m.subject, m.sender,
54-
m.recipients_json, m.cc_json, m.date_utc,
55+
m.recipients_json, m.cc_json, m.bcc_json, m.date_utc,
5556
m.body_text, m.body_hash, m.body_chars, m.body_word_count,
5657
m.attachment_count, s.source_name, s.source_slug
5758
FROM messages m
@@ -62,7 +63,7 @@ def export_jsonl(
6263

6364
cols = [
6465
"id", "message_id", "thread_key", "subject", "sender",
65-
"recipients_json", "cc_json", "date_utc",
66+
"recipients_json", "cc_json", "bcc_json", "date_utc",
6667
"body_text", "body_hash", "body_chars", "body_word_count",
6768
"attachment_count", "source_name", "source_slug",
6869
]
@@ -118,12 +119,7 @@ def export_jsonl(
118119
any_scrubbed = True
119120

120121
record["account_key"] = account_key
121-
try:
122-
record["recipients"] = json.loads(record.pop("recipients_json") or "[]")
123-
record["cc"] = json.loads(record.pop("cc_json") or "[]")
124-
except Exception:
125-
record["recipients"] = []
126-
record["cc"] = []
122+
record = decode_address_fields(record)
127123

128124
if include_classification and record["id"] in classifications:
129125
record["classification"] = classifications[record["id"]]

src/mboxer/ingest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .db import init_db
1414
from .naming import slugify
1515
from .normalize import normalize_message
16+
from .records import loads_address_list
1617

1718

1819
class SourceIdentityError(RuntimeError):
@@ -409,7 +410,7 @@ def ingest_mbox(
409410
raise RuntimeError("INSERT succeeded but cursor.lastrowid is None")
410411

411412
if record.get("thread_key"):
412-
participants = json.loads(record.get("recipients_json") or "[]")
413+
participants = loads_address_list(record["recipients_json"])
413414
if record.get("sender"):
414415
participants = [record["sender"]] + participants
415416
_upsert_thread(

src/mboxer/normalize.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def _decode_header_value(value: str | None) -> str:
2626

2727

2828
def _parse_address_list(header_value: str | None) -> list[str]:
29+
"""Return bare lowercased addresses for one address-list header, preserving order."""
2930
if not header_value:
3031
return []
3132
addrs: list[str] = []

src/mboxer/records.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from __future__ import annotations
2+
3+
import json
4+
from typing import Any
5+
6+
_ADDRESS_FIELDS = (
7+
("recipients_json", "recipients"),
8+
("cc_json", "cc"),
9+
("bcc_json", "bcc"),
10+
)
11+
12+
13+
def loads_address_list(value: str) -> list[str]:
14+
"""Decode a DB-backed address-list JSON value into a strict list of strings."""
15+
parsed = json.loads(value)
16+
if not isinstance(parsed, list):
17+
raise ValueError("address-list JSON must be an array")
18+
if not all(isinstance(item, str) for item in parsed):
19+
raise ValueError("address-list JSON elements must be strings")
20+
return parsed
21+
22+
23+
def decode_address_fields(record: dict[str, Any]) -> dict[str, Any]:
24+
"""Replace address-list JSON fields on a materialized row with typed lists."""
25+
for json_field, list_field in _ADDRESS_FIELDS:
26+
if json_field in record:
27+
record[list_field] = loads_address_list(record.pop(json_field))
28+
return record

0 commit comments

Comments
 (0)