Skip to content

Commit d42e5f7

Browse files
committed
feat(memory): pluggable storage backends with full PostgreSQL + Qdrant support
Redesign the openfang-memory crate for pluggable storage backends. The main backend (sqlite or postgres) and semantic backend (sqlite, postgres, qdrant, http) are independently configurable. Architecture: - substrate.rs is 100% backend-agnostic (zero rusqlite imports) - 9 backend traits: Structured, Semantic, Knowledge, Session, Usage, PairedDevices, TaskQueue, Consolidation, Audit - SessionBackend has 5 default trait impls (create_session, canonical_context, append_canonical, store_llm_summary, etc.) - Shared helpers.rs for serialization/parsing across backends - JSONL session mirror extracted to standalone filesystem utility Backends: - sqlite/ — 11 files, full implementation with sqlite-vec vectors - postgres/ — 11 files, full implementation with pgvector - qdrant/ — semantic-only, gRPC vector similarity search - http/ — semantic-only, remote memory-api gateway with fallback External callers migrated: - kernel uses memory.usage_arc() and memory.audit() (was usage_conn()) - api routes use memory.usage() trait method - runtime AuditLog uses AuditBackend trait (was raw rusqlite Connection) - MeteringEngine accepts Arc<dyn UsageBackend> (was Arc<UsageStore>) Config: [memory] backend = "sqlite" # or "postgres" semantic_backend = "qdrant" # independently: sqlite/postgres/qdrant/http Docker: pgvector/pg18 + qdrant services for integration testing. 65 tests (40 unit + 25 integration) across all backends.
1 parent 6851bba commit d42e5f7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+7243
-2112
lines changed

Cargo.lock

Lines changed: 742 additions & 76 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ uuid = { version = "1", features = ["v4", "v5", "serde"] }
5656

5757
# Database
5858
rusqlite = { version = "0.31", features = ["bundled", "serde_json"] }
59+
sqlite-vec = "0.1.6"
5960

6061
# CLI
6162
clap = { version = "4", features = ["derive"] }

Dockerfile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
# syntax=docker/dockerfile:1
2+
3+
# ── Test stage ──────────────────────────────────────────────────────────────
4+
# Run: docker compose --profile test run test
5+
FROM rust:1-slim-bookworm AS tester
6+
WORKDIR /build
7+
RUN apt-get update && apt-get install -y pkg-config libssl-dev perl make && rm -rf /var/lib/apt/lists/*
8+
COPY Cargo.toml Cargo.lock ./
9+
COPY crates ./crates
10+
COPY xtask ./xtask
11+
COPY agents ./agents
12+
COPY packages ./packages
13+
RUN cargo test --workspace --exclude openfang-desktop
14+
RUN cargo clippy --workspace --all-targets --exclude openfang-desktop -- -D warnings
15+
16+
# ── Builder stage ───────────────────────────────────────────────────────────
217
FROM rust:1-slim-bookworm AS builder
318
WORKDIR /build
419
RUN apt-get update && apt-get install -y pkg-config libssl-dev perl make && rm -rf /var/lib/apt/lists/*

crates/openfang-api/src/routes.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5436,10 +5436,10 @@ pub async fn agent_budget_status(
54365436
};
54375437

54385438
let quota = &entry.manifest.resources;
5439-
let usage_store = openfang_memory::usage::UsageStore::new(state.kernel.memory.usage_conn());
5440-
let hourly = usage_store.query_hourly(agent_id).unwrap_or(0.0);
5441-
let daily = usage_store.query_daily(agent_id).unwrap_or(0.0);
5442-
let monthly = usage_store.query_monthly(agent_id).unwrap_or(0.0);
5439+
let usage = state.kernel.memory.usage();
5440+
let hourly = usage.query_hourly(agent_id).unwrap_or(0.0);
5441+
let daily = usage.query_daily(agent_id).unwrap_or(0.0);
5442+
let monthly = usage.query_monthly(agent_id).unwrap_or(0.0);
54435443

54445444
// Token usage from scheduler
54455445
let token_usage = state.kernel.scheduler.get_usage(agent_id);
@@ -5476,14 +5476,14 @@ pub async fn agent_budget_status(
54765476

54775477
/// GET /api/budget/agents — Per-agent cost ranking (top spenders).
54785478
pub async fn agent_budget_ranking(State(state): State<Arc<AppState>>) -> impl IntoResponse {
5479-
let usage_store = openfang_memory::usage::UsageStore::new(state.kernel.memory.usage_conn());
5479+
let usage = state.kernel.memory.usage();
54805480
let agents: Vec<serde_json::Value> = state
54815481
.kernel
54825482
.registry
54835483
.list()
54845484
.iter()
54855485
.filter_map(|entry| {
5486-
let daily = usage_store.query_daily(entry.id).unwrap_or(0.0);
5486+
let daily = usage.query_daily(entry.id).unwrap_or(0.0);
54875487
if daily > 0.0 {
54885488
Some(serde_json::json!({
54895489
"agent_id": entry.id.to_string(),

crates/openfang-kernel/src/kernel.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -756,9 +756,7 @@ impl OpenFangKernel {
756756
};
757757

758758
// Initialize metering engine (shares the memory substrate's usage backend)
759-
let metering = Arc::new(MeteringEngine::new(Arc::new(
760-
openfang_memory::usage::UsageStore::new(memory.usage_conn()),
761-
)));
759+
let metering = Arc::new(MeteringEngine::new(memory.usage_arc()));
762760

763761
let supervisor = Supervisor::new();
764762
let background = BackgroundExecutor::new(supervisor.subscribe());
@@ -1119,7 +1117,10 @@ impl OpenFangKernel {
11191117
workflows: WorkflowEngine::new(),
11201118
triggers: TriggerEngine::new(),
11211119
background,
1122-
audit_log: Arc::new(AuditLog::with_db(memory.usage_conn())),
1120+
audit_log: Arc::new(match memory.audit() {
1121+
Some(backend) => AuditLog::with_backend(backend),
1122+
None => AuditLog::new(),
1123+
}),
11231124
metering,
11241125
default_driver: driver,
11251126
wasm_sandbox,

crates/openfang-kernel/src/metering.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
//! Metering engine — tracks LLM cost and enforces spending quotas.
22
3-
use openfang_memory::usage::{ModelUsage, UsageRecord, UsageStore, UsageSummary};
3+
use openfang_memory::backends::UsageBackend;
4+
use openfang_memory::usage::{ModelUsage, UsageRecord, UsageSummary};
45
use openfang_types::agent::{AgentId, ResourceQuota};
56
use openfang_types::error::{OpenFangError, OpenFangResult};
67
use std::sync::Arc;
78

89
/// The metering engine tracks usage cost and enforces quota limits.
910
pub struct MeteringEngine {
10-
/// Persistent usage store (SQLite-backed).
11-
store: Arc<UsageStore>,
11+
/// Persistent usage store.
12+
store: Arc<dyn UsageBackend>,
1213
}
1314

1415
impl MeteringEngine {
1516
/// Create a new metering engine with the given usage store.
16-
pub fn new(store: Arc<UsageStore>) -> Self {
17+
pub fn new(store: Arc<dyn UsageBackend>) -> Self {
1718
Self { store }
1819
}
1920

@@ -518,8 +519,7 @@ mod tests {
518519

519520
fn setup() -> MeteringEngine {
520521
let substrate = MemorySubstrate::open_in_memory(0.1).unwrap();
521-
let store = Arc::new(UsageStore::new(substrate.usage_conn()));
522-
MeteringEngine::new(store)
522+
MeteringEngine::new(substrate.usage_arc())
523523
}
524524

525525
#[test]

crates/openfang-memory/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ description = "Memory substrate for the OpenFang Agent OS"
88
[features]
99
default = ["http-memory"]
1010
http-memory = ["reqwest"]
11+
postgres = ["dep:tokio-postgres", "dep:deadpool-postgres", "dep:pgvector"]
12+
qdrant = ["dep:qdrant-client"]
1113

1214
[dependencies]
1315
openfang-types = { path = "../openfang-types" }
@@ -16,12 +18,19 @@ serde = { workspace = true }
1618
serde_json = { workspace = true }
1719
rmp-serde = { workspace = true }
1820
rusqlite = { workspace = true }
21+
sqlite-vec = { workspace = true }
1922
chrono = { workspace = true }
23+
hex = { workspace = true }
24+
sha2 = { workspace = true }
2025
uuid = { workspace = true }
2126
thiserror = { workspace = true }
2227
async-trait = { workspace = true }
2328
tracing = { workspace = true }
2429
reqwest = { workspace = true, features = ["blocking"], optional = true }
30+
tokio-postgres = { version = "0.7.17", optional = true, features = ["with-serde_json-1", "with-chrono-0_4", "with-uuid-1"] }
31+
deadpool-postgres = { version = "0.14", optional = true }
32+
pgvector = { version = "0.4", optional = true, features = ["postgres"] }
33+
qdrant-client = { version = "1.13", optional = true }
2534

2635
[dev-dependencies]
2736
tokio-test = { workspace = true }
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
//! Backend traits for session, usage, paired-device, task-queue, consolidation, and audit stores.
2+
//!
3+
//! These traits reference types defined in this crate (`Session`, `UsageRecord`, etc.)
4+
//! and therefore cannot live in `openfang-types`. The structured, semantic, and
5+
//! knowledge backend traits are in `openfang_types::storage`.
6+
7+
use chrono::Utc;
8+
use openfang_types::agent::{AgentId, SessionId};
9+
use openfang_types::error::OpenFangResult;
10+
use openfang_types::message::Message;
11+
12+
use crate::session::{CanonicalSession, Session};
13+
use crate::usage::{DailyBreakdown, ModelUsage, UsageRecord, UsageSummary};
14+
15+
// Re-export the traits from openfang-types for convenience.
16+
pub use openfang_types::storage::{KnowledgeBackend, SemanticBackend, StructuredBackend};
17+
18+
/// Backend for conversation session persistence.
19+
pub trait SessionBackend: Send + Sync {
20+
/// Get a session by ID.
21+
fn get_session(&self, id: SessionId) -> OpenFangResult<Option<Session>>;
22+
/// Save (upsert) a session.
23+
fn save_session(&self, session: &Session) -> OpenFangResult<()>;
24+
/// Delete a session.
25+
fn delete_session(&self, id: SessionId) -> OpenFangResult<()>;
26+
/// Delete all sessions for an agent.
27+
fn delete_agent_sessions(&self, agent_id: AgentId) -> OpenFangResult<()>;
28+
/// Create a new empty session for an agent.
29+
fn create_session(&self, agent_id: AgentId) -> OpenFangResult<Session> {
30+
self.create_session_with_label(agent_id, None)
31+
}
32+
/// Create a new session with an optional label.
33+
fn create_session_with_label(
34+
&self,
35+
agent_id: AgentId,
36+
label: Option<&str>,
37+
) -> OpenFangResult<Session> {
38+
let session = Session {
39+
id: SessionId::new(),
40+
agent_id,
41+
messages: vec![],
42+
context_window_tokens: 0,
43+
label: label.map(|s| s.to_string()),
44+
};
45+
self.save_session(&session)?;
46+
Ok(session)
47+
}
48+
/// List all sessions (metadata only).
49+
fn list_sessions(&self) -> OpenFangResult<Vec<serde_json::Value>>;
50+
/// List sessions for a specific agent.
51+
fn list_agent_sessions(&self, agent_id: AgentId) -> OpenFangResult<Vec<serde_json::Value>>;
52+
/// Set a human-readable label on a session.
53+
fn set_session_label(&self, id: SessionId, label: Option<&str>) -> OpenFangResult<()>;
54+
/// Find a session by label for a given agent.
55+
fn find_session_by_label(
56+
&self,
57+
agent_id: AgentId,
58+
label: &str,
59+
) -> OpenFangResult<Option<Session>>;
60+
/// Delete the canonical session for an agent.
61+
fn delete_canonical_session(&self, agent_id: AgentId) -> OpenFangResult<()>;
62+
63+
// -- Canonical session methods --
64+
65+
/// Load the canonical (cross-channel) session, creating if absent.
66+
fn load_canonical(&self, agent_id: AgentId) -> OpenFangResult<CanonicalSession>;
67+
/// Persist a canonical session (insert or update).
68+
fn save_canonical(&self, canonical: &CanonicalSession) -> OpenFangResult<()>;
69+
/// Append messages to the canonical session, compacting if needed.
70+
fn append_canonical(
71+
&self,
72+
agent_id: AgentId,
73+
new_messages: &[Message],
74+
compaction_threshold: Option<usize>,
75+
) -> OpenFangResult<CanonicalSession> {
76+
let mut canonical = self.load_canonical(agent_id)?;
77+
canonical.messages.extend_from_slice(new_messages);
78+
79+
let threshold = compaction_threshold.unwrap_or(100);
80+
let keep_count = 50;
81+
82+
if canonical.messages.len() > threshold {
83+
let to_compact = canonical.messages.len().saturating_sub(keep_count);
84+
if to_compact > canonical.compaction_cursor {
85+
let compacting = &canonical.messages[canonical.compaction_cursor..to_compact];
86+
let mut summary_parts: Vec<String> = Vec::new();
87+
if let Some(ref existing) = canonical.compacted_summary {
88+
summary_parts.push(existing.clone());
89+
}
90+
for msg in compacting {
91+
let role = match msg.role {
92+
openfang_types::message::Role::User => "User",
93+
openfang_types::message::Role::Assistant => "Assistant",
94+
openfang_types::message::Role::System => "System",
95+
};
96+
let text = msg.content.text_content();
97+
if !text.is_empty() {
98+
let truncated = if text.len() > 200 {
99+
format!("{}...", openfang_types::truncate_str(&text, 200))
100+
} else {
101+
text
102+
};
103+
summary_parts.push(format!("{role}: {truncated}"));
104+
}
105+
}
106+
let mut full_summary = summary_parts.join("\n");
107+
if full_summary.len() > 4000 {
108+
let start = full_summary.len() - 4000;
109+
let safe_start = (start..full_summary.len())
110+
.find(|&i| full_summary.is_char_boundary(i))
111+
.unwrap_or(full_summary.len());
112+
full_summary = full_summary[safe_start..].to_string();
113+
}
114+
canonical.compacted_summary = Some(full_summary);
115+
canonical.messages = canonical.messages.split_off(to_compact);
116+
canonical.compaction_cursor = 0;
117+
}
118+
}
119+
120+
canonical.updated_at = Utc::now().to_rfc3339();
121+
self.save_canonical(&canonical)?;
122+
Ok(canonical)
123+
}
124+
/// Get the canonical context window (optional summary + recent messages).
125+
fn canonical_context(
126+
&self,
127+
agent_id: AgentId,
128+
window_size: Option<usize>,
129+
) -> OpenFangResult<(Option<String>, Vec<Message>)> {
130+
let session = self.load_canonical(agent_id)?;
131+
let window = window_size.unwrap_or(50);
132+
let messages = if session.messages.len() > window {
133+
session.messages[session.messages.len() - window..].to_vec()
134+
} else {
135+
session.messages
136+
};
137+
Ok((session.compacted_summary, messages))
138+
}
139+
/// Store an LLM-generated summary for the canonical session.
140+
fn store_llm_summary(
141+
&self,
142+
agent_id: AgentId,
143+
summary: &str,
144+
kept_messages: Vec<Message>,
145+
) -> OpenFangResult<()> {
146+
let mut canonical = self.load_canonical(agent_id)?;
147+
canonical.compacted_summary = Some(summary.to_string());
148+
canonical.messages = kept_messages;
149+
canonical.compaction_cursor = 0;
150+
canonical.updated_at = Utc::now().to_rfc3339();
151+
self.save_canonical(&canonical)
152+
}
153+
}
154+
155+
/// Backend for LLM usage tracking and cost metering.
156+
pub trait UsageBackend: Send + Sync {
157+
/// Record a usage event.
158+
fn record(&self, record: &UsageRecord) -> OpenFangResult<()>;
159+
/// Query total cost for an agent in the last hour.
160+
fn query_hourly(&self, agent_id: AgentId) -> OpenFangResult<f64>;
161+
/// Query total cost for an agent today.
162+
fn query_daily(&self, agent_id: AgentId) -> OpenFangResult<f64>;
163+
/// Query total cost for an agent this month.
164+
fn query_monthly(&self, agent_id: AgentId) -> OpenFangResult<f64>;
165+
/// Query total cost across all agents in the last hour.
166+
fn query_global_hourly(&self) -> OpenFangResult<f64>;
167+
/// Query total cost across all agents this month.
168+
fn query_global_monthly(&self) -> OpenFangResult<f64>;
169+
/// Get a summary of usage, optionally filtered by agent.
170+
fn query_summary(&self, agent_id: Option<AgentId>) -> OpenFangResult<UsageSummary>;
171+
/// Get usage breakdown by model.
172+
fn query_by_model(&self) -> OpenFangResult<Vec<ModelUsage>>;
173+
/// Get daily usage breakdown for the last N days.
174+
fn query_daily_breakdown(&self, days: u32) -> OpenFangResult<Vec<DailyBreakdown>>;
175+
/// Get the date of the first usage event.
176+
fn query_first_event_date(&self) -> OpenFangResult<Option<String>>;
177+
/// Get today's total cost.
178+
fn query_today_cost(&self) -> OpenFangResult<f64>;
179+
/// Delete usage events older than N days. Returns count deleted.
180+
fn cleanup_old(&self, days: u32) -> OpenFangResult<usize>;
181+
}
182+
183+
/// Backend for paired device persistence.
184+
pub trait PairedDevicesBackend: Send + Sync {
185+
fn load_paired_devices(&self) -> OpenFangResult<Vec<serde_json::Value>>;
186+
fn save_paired_device(
187+
&self,
188+
device_id: &str,
189+
display_name: &str,
190+
platform: &str,
191+
paired_at: &str,
192+
last_seen: &str,
193+
push_token: Option<&str>,
194+
) -> OpenFangResult<()>;
195+
fn remove_paired_device(&self, device_id: &str) -> OpenFangResult<()>;
196+
}
197+
198+
/// Backend for the shared task queue.
199+
pub trait TaskQueueBackend: Send + Sync {
200+
fn task_post(&self, title: &str, description: &str, assigned_to: &str, created_by: &str) -> OpenFangResult<String>;
201+
fn task_claim(&self, agent_id: &str) -> OpenFangResult<Option<serde_json::Value>>;
202+
fn task_complete(&self, task_id: &str, result: &str) -> OpenFangResult<()>;
203+
fn task_list(&self, status: Option<&str>) -> OpenFangResult<Vec<serde_json::Value>>;
204+
}
205+
206+
/// Backend for memory consolidation (confidence decay).
207+
pub trait ConsolidationBackend: Send + Sync {
208+
fn consolidate(&self) -> OpenFangResult<openfang_types::memory::ConsolidationReport>;
209+
}
210+
211+
/// Backend for audit log persistence.
212+
pub trait AuditBackend: Send + Sync {
213+
fn append_entry(&self, agent_id: &str, action: &str, detail: &str, outcome: &str) -> OpenFangResult<()>;
214+
fn load_entries(&self, agent_id: Option<&str>, limit: usize) -> OpenFangResult<Vec<serde_json::Value>>;
215+
}

0 commit comments

Comments
 (0)