Skip to content

Commit 92f951c

Browse files
committed
Fix UNIQUE constraint race in session submit with atomic upsert
The check-then-insert pattern in SessionStore.submit() had a time-of-check-to-time-of-use (TOCTOU) race: two concurrent submissions of the same session (e.g. local client + remote client) could both pass the existence check; one INSERT would then succeed and the other would fail with IntegrityError, returning a 500 to the client. Replace it with INSERT ... ON CONFLICT DO UPDATE (upsert). The upsert's WHERE clause ensures the row is updated only when the content hash actually changed, so duplicate submissions with identical content are still skipped.
1 parent df7d1c6 commit 92f951c

File tree

1 file changed

+27
-7
lines changed

1 file changed

+27
-7
lines changed

src/synix/mesh/store.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,20 @@ def submit(
101101
submitted_by: str,
102102
subsession_seq: int = 0,
103103
) -> bool:
104-
"""Submit a session file. Returns True if new, False if duplicate.
104+
"""Submit a session file. Returns True if new/updated, False if duplicate.
105105
106106
Content is gzip-compressed and stored as
107107
sessions/{project_dir}/{session_id}_sub{seq:04d}.jsonl.gz.
108108
Deduplication is based on SHA256 of the raw content.
109+
110+
Uses an atomic upsert to avoid race conditions when multiple
111+
clients submit the same session concurrently.
109112
"""
110113
content_hash = hashlib.sha256(content).hexdigest()
111114

112115
conn = self._get_conn()
113116

114-
# Check for duplicate by sha256
117+
# Fast path: exact same content already exists anywhere — skip
115118
row = conn.execute("SELECT 1 FROM sessions WHERE jsonl_sha256 = ?", (content_hash,)).fetchone()
116119
if row is not None:
117120
logger.debug(
@@ -128,18 +131,35 @@ def submit(
128131
gz_path = proj_dir / self._file_name(session_id, subsession_seq)
129132
gz_path.write_bytes(gzip.compress(content))
130133

131-
# Insert metadata
132134
now = datetime.now(UTC).isoformat()
133-
conn.execute(
135+
136+
# Atomic upsert — if the primary key already exists (same session,
137+
# different content), update the row. Eliminates the TOCTOU race
138+
# between concurrent submissions from multiple clients.
139+
cursor = conn.execute(
134140
"""INSERT INTO sessions
135141
(session_id, project_dir, subsession_seq, submitted_by, jsonl_sha256, submitted_at, processed)
136-
VALUES (?, ?, ?, ?, ?, ?, 0)""",
142+
VALUES (?, ?, ?, ?, ?, ?, 0)
143+
ON CONFLICT (session_id, project_dir, subsession_seq) DO UPDATE SET
144+
jsonl_sha256 = excluded.jsonl_sha256,
145+
submitted_by = excluded.submitted_by,
146+
submitted_at = excluded.submitted_at,
147+
processed = 0
148+
WHERE excluded.jsonl_sha256 != sessions.jsonl_sha256""",
137149
(session_id, project_dir, subsession_seq, submitted_by, content_hash, now),
138150
)
139151
conn.commit()
140152

141-
logger.info("Submitted session %s seq=%d (sha256=%s)", session_id, subsession_seq, content_hash[:12])
142-
return True
153+
if cursor.rowcount > 0:
154+
logger.info("Submitted session %s seq=%d (sha256=%s)", session_id, subsession_seq, content_hash[:12])
155+
return True
156+
else:
157+
# ON CONFLICT matched but WHERE clause excluded it — same hash already stored
158+
logger.debug(
159+
"Session %s seq=%d already has same content (sha256=%s), skipping",
160+
session_id, subsession_seq, content_hash[:12],
161+
)
162+
return False
143163

144164
def get_unprocessed(self) -> list[dict]:
145165
"""Return list of unprocessed session metadata dicts."""

0 commit comments

Comments
 (0)