Skip to content

Commit 3b3169b

Browse files
Migrate backend to PostgreSQL and harden compatibility (#175)
1 parent 5f29e69 commit 3b3169b

File tree

3 files changed

+48
-8
lines changed

3 files changed

+48
-8
lines changed

service/server/database.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
r"\bALTER\s+TABLE\s+([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?!IF\s+NOT\s+EXISTS)",
4343
flags=re.IGNORECASE,
4444
)
45+
_POSTGRES_RETRYABLE_SQLSTATES = {"40001", "40P01", "55P03"}
4546

4647

4748
def using_postgres() -> bool:
@@ -52,6 +53,40 @@ def get_database_backend_name() -> str:
5253
return "postgresql" if using_postgres() else "sqlite"
5354

5455

56+
def begin_write_transaction(cursor: Any) -> None:
57+
"""Start a write transaction using syntax compatible with the active backend."""
58+
if using_postgres():
59+
cursor.execute("BEGIN")
60+
return
61+
cursor.execute("BEGIN IMMEDIATE")
62+
63+
64+
def is_retryable_db_error(exc: Exception) -> bool:
65+
"""Return True when the error is a transient write conflict worth retrying."""
66+
if isinstance(exc, sqlite3.OperationalError):
67+
message = str(exc).lower()
68+
return "database is locked" in message or "database is busy" in message
69+
70+
sqlstate = getattr(exc, "sqlstate", None)
71+
if not sqlstate:
72+
cause = getattr(exc, "__cause__", None)
73+
sqlstate = getattr(cause, "sqlstate", None)
74+
if sqlstate in _POSTGRES_RETRYABLE_SQLSTATES:
75+
return True
76+
77+
message = str(exc).lower()
78+
return any(
79+
fragment in message
80+
for fragment in (
81+
"could not serialize access",
82+
"deadlock detected",
83+
"lock not available",
84+
"database is locked",
85+
"database is busy",
86+
)
87+
)
88+
89+
5590
def _replace_unquoted_question_marks(sql: str) -> str:
5691
"""Translate sqlite-style placeholders to psycopg placeholders."""
5792
result: list[str] = []

service/server/routes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def _enforce_content_rate_limit(agent_id: int, action: str, content: str, target
167167
}
168168

169169
from config import CORS_ORIGINS, SIGNAL_PUBLISH_REWARD, SIGNAL_ADOPT_REWARD, DISCUSSION_PUBLISH_REWARD, REPLY_PUBLISH_REWARD
170-
from database import get_db_connection
170+
from database import begin_write_transaction, get_db_connection
171171
from market_intel import (
172172
get_market_intel_overview,
173173
get_market_news_payload,
@@ -1113,7 +1113,7 @@ async def push_realtime_signal(data: RealtimeSignalRequest, authorization: str =
11131113
conn = get_db_connection()
11141114
cursor = conn.cursor()
11151115
try:
1116-
cursor.execute("BEGIN IMMEDIATE")
1116+
begin_write_transaction(cursor)
11171117
signal_id = _reserve_signal_id(cursor)
11181118

11191119
if action_lower in ("sell", "cover"):
@@ -1209,7 +1209,7 @@ async def push_realtime_signal(data: RealtimeSignalRequest, authorization: str =
12091209
# Get all followers of this agent
12101210
conn = get_db_connection()
12111211
cursor = conn.cursor()
1212-
cursor.execute("BEGIN IMMEDIATE")
1212+
begin_write_transaction(cursor)
12131213
cursor.execute("""
12141214
SELECT follower_id FROM subscriptions
12151215
WHERE leader_id = ? AND status = 'active'

service/server/services.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
"""
66

77
import json
8+
import time
89
from datetime import datetime, timezone
910
from typing import Optional, Dict, Any, List
10-
from database import get_db_connection
11+
from database import get_db_connection, is_retryable_db_error
1112

1213

1314
# ==================== Agent Services ====================
@@ -66,7 +67,7 @@ def _add_agent_points(agent_id: int, points: int, reason: str = "reward") -> boo
6667
if points <= 0:
6768
return False
6869

69-
# Retry logic for database locking
70+
# Retry transient write conflicts on both SQLite and PostgreSQL.
7071
max_retries = 3
7172
for attempt in range(max_retries):
7273
conn = get_db_connection()
@@ -78,9 +79,13 @@ def _add_agent_points(agent_id: int, points: int, reason: str = "reward") -> boo
7879
conn.commit()
7980
return True
8081
except Exception as e:
81-
if "database is locked" in str(e) and attempt < max_retries - 1:
82-
import time
83-
time.sleep(0.5 * (attempt + 1)) # Exponential backoff
82+
try:
83+
conn.rollback()
84+
except Exception:
85+
pass
86+
87+
if is_retryable_db_error(e) and attempt < max_retries - 1:
88+
time.sleep(0.5 * (attempt + 1))
8489
continue
8590
print(f"[ERROR] Failed to add points to agent {agent_id}: {e}")
8691
return False

0 commit comments

Comments
 (0)