Summary
This RFC proposes a phased refactor which introduces a common storage access layer. The objective is to make storage integration predictable and incremental: adding a new storage provider should mainly require implementing traits, while keeping domain logic unchanged.
Motivation
I personally have three motivations for this refactor:
- The vector database ecosystem is still chaotic. I have personally installed many different pgsql / mysql / ... distributions to meet the vector database requirements of different services (pgvecto.rs, paradedb, vectorchord, tidb, milvus...) 🤦. Giving users more backend choices would let them maintain fewer databases. Maintaining databases is boring and risky work.
- PgSQL cannot cover every case. One day we may need memory-based, cloud-based, or local-file-based solutions, or other more powerful storage solutions (e.g. message queue, graph db...). A clear storage abstraction would make those easier to add.
- Clean code. An abstraction between the domain logic and storage is always good. It will make it easier to add metrics, more complex migration logic, and tests in the future.
Goals
- Define storage capabilities as traits.
- Keep behavior and all logic unchanged.
- Allow backend-specific SQL and optimization to stay in the corresponding crates.
- This will not be a short journey. We should implement this RFC incrementally.
Non-Goals
- Adding more storage providers besides paradedb.
- Standardizing every possible backend detail into one universal SQL layer.
Design Principles
- Core owns interfaces; backend crates own implementation details.
- Trait boundaries use portable data formats when possible.
- Backend-specific optimizations remain allowed behind trait implementations.
- Transactions are explicit in capability traits when atomicity matters.
- Refactor is done in horizontal slices so each step is independently valid.
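A minimal sketch of the "core owns interfaces; backend crates own implementation details" principle. All names here (`NoteStore`, `MemStore`, `save_trimmed`) are hypothetical and not part of this RFC. To stay dependency-free the sketch is synchronous and uses `u64` ids and `String` errors, whereas the proposed traits are async and use `Uuid` / `AppError`.

```rust
use std::collections::HashMap;

// --- would live in the core crate (hypothetical names) ---

/// Capability trait: the only thing domain logic knows about storage.
pub trait NoteStore {
    fn put(&mut self, id: u64, text: &str) -> Result<(), String>;
    fn get(&self, id: u64) -> Result<Option<String>, String>;
}

/// Domain logic depends on the trait, never on a concrete backend.
pub fn save_trimmed(store: &mut dyn NoteStore, id: u64, text: &str) -> Result<(), String> {
    store.put(id, text.trim())
}

// --- would live in a backend crate such as storage_pg ---

/// In-memory stand-in for a real backend implementation.
#[derive(Default)]
pub struct MemStore {
    notes: HashMap<u64, String>,
}

impl NoteStore for MemStore {
    fn put(&mut self, id: u64, text: &str) -> Result<(), String> {
        self.notes.insert(id, text.to_string());
        Ok(())
    }

    fn get(&self, id: u64) -> Result<Option<String>, String> {
        Ok(self.notes.get(&id).cloned())
    }
}
```

Swapping the backend then only changes which type is constructed at composition time; `save_trimmed` stays untouched.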
Phased Delivery Plan
Phase 1: Message Queue
- Introduce message queue store trait in core.
Trait design:
#[async_trait]
pub trait MessageQueueStore: Send + Sync {
    async fn get_state(&self, id: Uuid) -> Result<MessageQueueState, AppError>;
    async fn append_message(&self, id: Uuid, message: Message) -> Result<i32, AppError>;
    async fn append_messages(&self, id: Uuid, messages: Vec<Message>) -> Result<i32, AppError>;
    async fn drain(&self, id: Uuid, count: usize) -> Result<(), AppError>;
    async fn try_set_fence(&self, id: Uuid, fence_count: i32) -> Result<bool, AppError>;
    async fn clear_stale_fence(&self, id: Uuid, ttl_minutes: i64) -> Result<bool, AppError>;
    async fn finalize_job(
        &self,
        id: Uuid,
        prev_episode_summary: Option<String>,
    ) -> Result<(), AppError>;
    async fn clear_fence(&self, id: Uuid) -> Result<(), AppError>;
    async fn get_prev_episode_summary(&self, id: Uuid) -> Result<Option<String>, AppError>;
    async fn add_pending_review(
        &self,
        id: Uuid,
        memory_ids: Vec<Uuid>,
        query: String,
    ) -> Result<(), AppError>;
    async fn take_pending_reviews(&self, id: Uuid)
        -> Result<Option<Vec<PendingReview>>, AppError>;
    async fn message_count(&self, id: Uuid) -> Result<i32, AppError>;
    async fn queue_status(&self, id: Uuid) -> Result<QueueStatus, AppError>;
}
- Implement it in storage_pg.
- Wire server and worker to consume the trait.
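To illustrate what a second backend for this capability might look like, here is a hedged in-memory sketch covering a small subset of the methods. It is synchronous, uses `u64` in place of `Uuid` and `String` in place of `Message` / `AppError`, and the names `MessageQueueStoreSketch` and `InMemoryQueue` are hypothetical. The return values and fence semantics are assumptions, not taken from the existing implementation: `append_message` is assumed to return the new queue length, and `try_set_fence` is assumed to succeed only when no fence is currently held.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

type QueueId = u64; // stand-in for Uuid

#[derive(Default)]
struct QueueState {
    messages: VecDeque<String>,
    fence: Option<i32>,
}

/// Reduced, synchronous stand-in for the proposed trait.
pub trait MessageQueueStoreSketch: Send + Sync {
    fn append_message(&self, id: QueueId, message: String) -> Result<i32, String>;
    fn drain(&self, id: QueueId, count: usize) -> Result<(), String>;
    fn try_set_fence(&self, id: QueueId, fence_count: i32) -> Result<bool, String>;
    fn clear_fence(&self, id: QueueId) -> Result<(), String>;
    fn message_count(&self, id: QueueId) -> Result<i32, String>;
}

#[derive(Default)]
pub struct InMemoryQueue {
    queues: Mutex<HashMap<QueueId, QueueState>>,
}

impl InMemoryQueue {
    /// Locks the map and runs `f` on the queue for `id`, creating it if missing.
    fn with_queue<T>(
        &self,
        id: QueueId,
        f: impl FnOnce(&mut QueueState) -> T,
    ) -> Result<T, String> {
        let mut guard = self.queues.lock().map_err(|e| e.to_string())?;
        Ok(f(guard.entry(id).or_default()))
    }
}

impl MessageQueueStoreSketch for InMemoryQueue {
    fn append_message(&self, id: QueueId, message: String) -> Result<i32, String> {
        self.with_queue(id, |q| {
            q.messages.push_back(message);
            q.messages.len() as i32 // assumption: returns the new length
        })
    }

    fn drain(&self, id: QueueId, count: usize) -> Result<(), String> {
        self.with_queue(id, |q| {
            let n = count.min(q.messages.len());
            drop(q.messages.drain(..n)); // remove the oldest `n` messages
        })
    }

    fn try_set_fence(&self, id: QueueId, fence_count: i32) -> Result<bool, String> {
        self.with_queue(id, |q| {
            if q.fence.is_some() {
                false // assumption: an existing fence blocks a new one
            } else {
                q.fence = Some(fence_count);
                true
            }
        })
    }

    fn clear_fence(&self, id: QueueId) -> Result<(), String> {
        self.with_queue(id, |q| q.fence = None)
    }

    fn message_count(&self, id: QueueId) -> Result<i32, String> {
        self.with_queue(id, |q| q.messages.len() as i32)
    }
}
```

Methods take `&self` and use interior mutability so the type can satisfy the `Send + Sync` bound, mirroring how a pooled database handle would be shared between server and worker.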
Phase 2: Retrieval
- Introduce retrieval traits for episodic and semantic retrieval.
Trait design:
#[async_trait]
pub trait EpisodicMemoryRetrieveStore: Send + Sync {
    async fn search_hybrid(
        &self,
        query: &str,
        query_embedding: Vec<f32>,
        conversation_id: Uuid,
        candidate_limit: i64,
    ) -> Result<Vec<(EpisodicMemory, f64)>, AppError>;
}
#[async_trait]
pub trait SemanticMemoryRetrieveStore: Send + Sync {
    async fn retrieve_by_embedding(
        &self,
        query: &str,
        query_embedding: Vec<f32>,
        limit: i64,
        conversation_id: Uuid,
        category: Option<&str>,
        candidate_limit: i64,
    ) -> Result<Vec<(SemanticMemory, f64)>, AppError>;
}
- Move backend-specific retrieval SQL into storage_pg.
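As a rough illustration of the retrieval contract (a portable `Vec<f32>` goes in, scored results come out), here is an in-memory sketch that ranks by cosine similarity. This is a stand-in technique: the PostgreSQL backend would run its own SQL, and the hybrid text scoring of `search_hybrid` is not modeled. `Memory`, `RetrieveStoreSketch`, and `MemRetrieve` are hypothetical names, and the sketch is synchronous with `String` errors.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Memory {
    pub id: u64,
    pub embedding: Vec<f32>,
}

/// Reduced, synchronous stand-in for a retrieval trait.
pub trait RetrieveStoreSketch {
    fn retrieve_by_embedding(
        &self,
        query_embedding: Vec<f32>,
        limit: usize,
    ) -> Result<Vec<(Memory, f64)>, String>;
}

/// Cosine similarity; returns 0.0 when either vector has zero norm.
fn cosine(a: &[f32], b: &[f32]) -> f64 {
    let dot: f64 = a.iter().zip(b).map(|(x, y)| f64::from(*x) * f64::from(*y)).sum();
    let na: f64 = a.iter().map(|x| f64::from(*x).powi(2)).sum::<f64>().sqrt();
    let nb: f64 = b.iter().map(|x| f64::from(*x).powi(2)).sum::<f64>().sqrt();
    if na == 0.0 || nb == 0.0 {
        0.0
    } else {
        dot / (na * nb)
    }
}

pub struct MemRetrieve {
    pub memories: Vec<Memory>,
}

impl RetrieveStoreSketch for MemRetrieve {
    fn retrieve_by_embedding(
        &self,
        query_embedding: Vec<f32>,
        limit: usize,
    ) -> Result<Vec<(Memory, f64)>, String> {
        let mut scored: Vec<(Memory, f64)> = self
            .memories
            .iter()
            // Skip entries whose dimension does not match the query.
            .filter(|m| m.embedding.len() == query_embedding.len())
            .map(|m| (m.clone(), cosine(&m.embedding, &query_embedding)))
            .collect();
        // Highest score first, then keep the top `limit` results.
        scored.sort_by(|a, b| b.1.total_cmp(&a.1));
        scored.truncate(limit);
        Ok(scored)
    }
}
```

Domain code that only sees the trait cannot tell whether the scores came from a SQL operator or from this loop, which is the point of moving retrieval SQL behind the boundary.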
Phase 3: Episodic Write/Read Store
- Introduce EpisodicMemoryStore trait for inserts, queries, consolidation flags, and review state updates.
Trait design:
#[async_trait]
pub trait EpisodicMemoryStore: Send + Sync {
    async fn insert(&self, memory: &EpisodicMemory) -> Result<(), AppError>;
    async fn get_by_id(&self, id: Uuid) -> Result<Option<EpisodicMemory>, AppError>;
    async fn fetch_recent(
        &self,
        conversation_id: Uuid,
        since: Option<DateTime<Utc>>,
        limit: u64,
    ) -> Result<Vec<EpisodicMemory>, AppError>;
    async fn fetch_unconsolidated(
        &self,
        conversation_id: Uuid,
    ) -> Result<Vec<EpisodicMemory>, AppError>;
    async fn fetch_unconsolidated_by_ids(
        &self,
        conversation_id: Uuid,
        ids: &[Uuid],
    ) -> Result<Vec<EpisodicMemory>, AppError>;
    async fn count_unconsolidated(&self, conversation_id: Uuid) -> Result<u64, AppError>;
    async fn mark_consolidated(&self, ids: &[Uuid]) -> Result<(), AppError>;
    async fn update_review_state(
        &self,
        id: Uuid,
        stability: f32,
        difficulty: f32,
        last_reviewed_at: DateTime<Utc>,
    ) -> Result<(), AppError>;
}
- Move episodic persistence operations into backend implementation.
- Update server/worker call sites to depend on this capability.
Phase 4: Semantic Store
- Introduce SemanticMemoryStore and transactional SemanticMemoryStoreTx.
Trait design:
#[async_trait]
pub trait SemanticMemoryStore: Send + Sync {
    async fn begin_tx(&self) -> Result<Box<dyn SemanticMemoryStoreTx>, AppError>;
}
#[async_trait]
pub trait SemanticMemoryStoreTx: Send + Sync {
    async fn find_similar(
        &self,
        embedding: Vec<f32>,
        threshold: f64,
        conversation_id: Uuid,
        limit: i64,
    ) -> Result<Vec<SemanticMemory>, AppError>;
    async fn get_by_id(&self, id: Uuid) -> Result<Option<SemanticMemory>, AppError>;
    async fn insert(&self, memory: &SemanticMemory) -> Result<(), AppError>;
    async fn invalidate(&self, id: Uuid) -> Result<(), AppError>;
    async fn append_source_episodic_ids(
        &self,
        fact_id: Uuid,
        existing_source_episodic_ids: &[Uuid],
        new_source_episodic_ids: &[Uuid],
    ) -> Result<(), AppError>;
    async fn mark_episodic_consolidated(&self, ids: &[Uuid]) -> Result<(), AppError>;
    async fn commit(self: Box<Self>) -> Result<(), AppError>;
    async fn rollback(self: Box<Self>) -> Result<(), AppError>;
}
- Move semantic consolidation write sequence to transactional trait operations.
- Keep explicit begin_tx, commit, and rollback contract.
- Keep similarity input at trait boundary portable (Vec<f32>), with backend-local conversion.
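The explicit `begin_tx` / `commit` / `rollback` contract can be sketched with an in-memory backend that stages writes and applies them only on commit. This is an illustration under assumptions, not the proposed implementation: `FactStore`, `FactStoreTx`, `MemFactStore`, and `MemTx` are hypothetical names, the traits are synchronous, and the transaction methods take `&mut self` here for simplicity where the RFC's trait uses `&self`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type FactId = u64; // stand-in for Uuid
type Facts = Arc<Mutex<HashMap<FactId, String>>>;

pub trait FactStore {
    fn begin_tx(&self) -> Result<Box<dyn FactStoreTx>, String>;
}

pub trait FactStoreTx {
    fn insert(&mut self, id: FactId, text: &str) -> Result<(), String>;
    fn invalidate(&mut self, id: FactId) -> Result<(), String>;
    /// Taking `self: Box<Self>` consumes the transaction, so it cannot be reused.
    fn commit(self: Box<Self>) -> Result<(), String>;
    fn rollback(self: Box<Self>) -> Result<(), String>;
}

#[derive(Default, Clone)]
pub struct MemFactStore {
    facts: Facts,
}

impl MemFactStore {
    /// Reads committed state only.
    pub fn get(&self, id: FactId) -> Option<String> {
        self.facts.lock().unwrap().get(&id).cloned()
    }
}

enum Op {
    Insert(FactId, String),
    Invalidate(FactId),
}

struct MemTx {
    facts: Facts,
    staged: Vec<Op>,
}

impl FactStore for MemFactStore {
    fn begin_tx(&self) -> Result<Box<dyn FactStoreTx>, String> {
        Ok(Box::new(MemTx { facts: Arc::clone(&self.facts), staged: Vec::new() }))
    }
}

impl FactStoreTx for MemTx {
    fn insert(&mut self, id: FactId, text: &str) -> Result<(), String> {
        self.staged.push(Op::Insert(id, text.to_string()));
        Ok(())
    }

    fn invalidate(&mut self, id: FactId) -> Result<(), String> {
        self.staged.push(Op::Invalidate(id));
        Ok(())
    }

    fn commit(self: Box<Self>) -> Result<(), String> {
        let MemTx { facts, staged } = *self;
        // Apply all staged operations under one lock so they land together.
        let mut guard = facts.lock().map_err(|e| e.to_string())?;
        for op in staged {
            match op {
                Op::Insert(id, text) => {
                    guard.insert(id, text);
                }
                Op::Invalidate(id) => {
                    guard.remove(&id);
                }
            }
        }
        Ok(())
    }

    fn rollback(self: Box<Self>) -> Result<(), String> {
        // Dropping the transaction discards every staged operation.
        drop(self);
        Ok(())
    }
}
```

Because `commit` and `rollback` take ownership of the boxed transaction, the compiler rejects any use of the handle after it has been finished, which keeps the consolidation write sequence honest about its boundaries.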
Phase 5: Job Queue
- Introduce worker job queue abstraction.
Trait design:
#[async_trait]
pub trait JobQueue: Send + Sync {
    async fn enqueue_segmentation(&self, job: EventSegmentationJob) -> Result<(), AppError>;
    async fn enqueue_review(&self, job: MemoryReviewJob) -> Result<(), AppError>;
    async fn enqueue_semantic(&self, job: SemanticConsolidationJob) -> Result<(), AppError>;
    async fn count_active_jobs(&self, conversation_id: Uuid) -> Result<i64, AppError>;
}
- Provide PostgreSQL-backed implementation in storage_pg.
- Remove direct queue backend coupling from server/worker composition.
Validation
TODO: I didn't find any existing tests in this repo. Discussion / help is needed to validate the modification.
Most of the modifications are just copy-paste, so the risk is relatively low IMO.
Future Work
- Add backend conformance test suite shared across providers.
- Introduce optional feature flags for backend selection in build/runtime composition.
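One possible shape for a shared conformance suite is a function that takes any implementation of a capability trait and checks backend-agnostic invariants. The sketch below uses a hypothetical, reduced, synchronous `CountStore` trait and two toy backends. The invariants checked (new queues are empty, `append` returns the new length, queues are isolated by id) are assumptions chosen for illustration.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Reduced stand-in for a capability trait (hypothetical).
pub trait CountStore: Send + Sync {
    fn append(&self, id: u64, message: &str) -> Result<i32, String>;
    fn message_count(&self, id: u64) -> Result<i32, String>;
}

/// Backend-agnostic checks that every provider is expected to pass.
pub fn check_conformance(store: &dyn CountStore) -> Result<(), String> {
    if store.message_count(1)? != 0 {
        return Err("a new queue must be empty".into());
    }
    if store.append(1, "a")? != 1 || store.append(1, "b")? != 2 {
        return Err("append must return the new length".into());
    }
    if store.message_count(2)? != 0 {
        return Err("queues must be isolated by id".into());
    }
    Ok(())
}

/// A backend that keeps one message list per id.
#[derive(Default)]
pub struct MapBackend {
    queues: Mutex<HashMap<u64, Vec<String>>>,
}

impl CountStore for MapBackend {
    fn append(&self, id: u64, message: &str) -> Result<i32, String> {
        let mut guard = self.queues.lock().map_err(|e| e.to_string())?;
        let queue = guard.entry(id).or_default();
        queue.push(message.to_string());
        Ok(queue.len() as i32)
    }

    fn message_count(&self, id: u64) -> Result<i32, String> {
        let guard = self.queues.lock().map_err(|e| e.to_string())?;
        Ok(guard.get(&id).map_or(0, |q| q.len() as i32))
    }
}

/// A deliberately broken backend that ignores the id.
#[derive(Default)]
pub struct SharedBackend {
    all: Mutex<Vec<String>>,
}

impl CountStore for SharedBackend {
    fn append(&self, _id: u64, message: &str) -> Result<i32, String> {
        let mut guard = self.all.lock().map_err(|e| e.to_string())?;
        guard.push(message.to_string());
        Ok(guard.len() as i32)
    }

    fn message_count(&self, _id: u64) -> Result<i32, String> {
        let guard = self.all.lock().map_err(|e| e.to_string())?;
        Ok(guard.len() as i32)
    }
}
```

Each provider crate could then call the same function from its own test module against a freshly created store, so a new backend inherits the full set of checks without copying them.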
Disclaimer
I'm a developer of TiDB and also a self-hosting maximalist 🧙‍♂️. This proposal is submitted as a prerequisite for #41. The final target is to add support for TiDB (a database with MySQL-compatible syntax and vector search capability) as the storage layer.