Skip to content

Commit 56526d8

Browse files
committed
refactor: extract out storage logic
1 parent 4d5fa87 commit 56526d8

14 files changed

Lines changed: 851 additions & 452 deletions

File tree

rust/core/src/engine/app.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,9 @@ impl<Prof: EngineProfile> App<Prof> {
7272
let app_reg = AppRegistration::new(name, &env)?;
7373

7474
// TODO: This database initialization logic should happen lazily on first call to `update()`.
75-
let db = {
76-
let mut wtxn = env.db_env().write_txn()?;
77-
let db = env.db_env().create_database(&mut wtxn, Some(name))?;
78-
wtxn.commit()?;
79-
db
80-
};
75+
let store = env.create_app_store(name)?;
8176

82-
let app_ctx = AppContext::new(env, db, app_reg, max_inflight_components);
77+
let app_ctx = AppContext::new(env, store, app_reg, max_inflight_components);
8378
let root_component = Component::new(app_ctx, StablePath::root(), None);
8479
crate::telemetry::track("app_create");
8580
Ok(Self { root_component })
@@ -216,12 +211,12 @@ impl<Prof: EngineProfile> App<Prof> {
216211
handle.ready().await?;
217212

218213
// Clear the database
219-
let db = root_component.app_ctx().db().clone();
214+
let store = root_component.app_ctx().store().clone();
220215
root_component
221216
.app_ctx()
222217
.env()
223218
.txn_batcher()
224-
.run(move |wtxn| Ok(db.clear(wtxn)?))
219+
.run(move |wtxn| crate::state_store::ops::clear_all(wtxn, &store))
225220
.await?;
226221

227222
info!("App dropped successfully");

rust/core/src/engine/context.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ use crate::state::target_state_path::TargetStatePath;
1818
use crate::{
1919
engine::environment::{AppRegistration, Environment},
2020
state::stable_path::StablePath,
21+
state_store::Store,
2122
};
2223

2324
struct AppContextInner<Prof: EngineProfile> {
2425
env: Environment<Prof>,
25-
db: db_schema::Database,
26+
store: Store,
2627
app_reg: AppRegistration<Prof>,
2728
id_sequencer_manager: IdSequencerManager,
2829
inflight_semaphore: Option<Arc<tokio::sync::Semaphore>>,
@@ -53,7 +54,7 @@ pub struct AppContext<Prof: EngineProfile> {
5354
impl<Prof: EngineProfile> AppContext<Prof> {
5455
pub fn new(
5556
env: Environment<Prof>,
56-
db: db_schema::Database,
57+
store: Store,
5758
app_reg: AppRegistration<Prof>,
5859
max_inflight_components: Option<usize>,
5960
) -> Self {
@@ -62,7 +63,7 @@ impl<Prof: EngineProfile> AppContext<Prof> {
6263
Self {
6364
inner: Arc::new(AppContextInner {
6465
env,
65-
db,
66+
store,
6667
app_reg,
6768
id_sequencer_manager: IdSequencerManager::new(),
6869
inflight_semaphore,
@@ -113,8 +114,8 @@ impl<Prof: EngineProfile> AppContext<Prof> {
113114
&self.inner.env
114115
}
115116

116-
pub fn db(&self) -> &db_schema::Database {
117-
&self.inner.db
117+
pub fn store(&self) -> &Store {
118+
&self.inner.store
118119
}
119120

120121
pub fn app_reg(&self) -> &AppRegistration<Prof> {
@@ -152,7 +153,7 @@ impl<Prof: EngineProfile> AppContext<Prof> {
152153
let key = key.unwrap_or(&default_key);
153154
self.inner
154155
.id_sequencer_manager
155-
.next_id(self.inner.env.txn_batcher(), &self.inner.db, key)
156+
.next_id(self.inner.env.txn_batcher(), &self.inner.store, key)
156157
.await
157158
}
158159
}

rust/core/src/engine/environment.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{
2-
engine::profile::EngineProfile, engine::target_state::TargetStateProviderRegistry,
3-
engine::txn_batcher::TxnBatcher, prelude::*,
2+
engine::profile::EngineProfile,
3+
engine::target_state::TargetStateProviderRegistry,
4+
prelude::*,
5+
state_store::{ReadTxn, TxnBatcher},
46
};
57

68
use cocoindex_utils::fingerprint::Fingerprint;
@@ -133,13 +135,26 @@ impl<Prof: EngineProfile> Environment<Prof> {
133135
&self.inner.db_env
134136
}
135137

138+
/// Create the per-app sub-database for this environment and wrap it in a
139+
/// `Store`. Hides the underlying LMDB sub-database creation from
140+
/// `App::new`.
141+
pub fn create_app_store(&self, app_name: &str) -> Result<crate::state_store::Store> {
142+
let mut wtxn = self.inner.db_env.write_txn()?;
143+
let db = self
144+
.inner
145+
.db_env
146+
.create_database(&mut wtxn, Some(app_name))?;
147+
wtxn.commit()?;
148+
Ok(crate::state_store::Store::new(db))
149+
}
150+
136151
/// Open an LMDB read transaction with automatic retry on `MDB_READERS_FULL`.
137152
///
138153
/// Two-phase strategy:
139154
/// 1. Retry with a short timeout — handles transient reader slot contention.
140155
/// 2. If phase 1 times out, call `clear_stale_readers()` to reclaim slots
141156
/// from dead processes, then retry indefinitely.
142-
pub async fn read_txn(&self) -> Result<heed::RoTxn<'_, heed::WithoutTls>> {
157+
pub async fn read_txn(&self) -> Result<ReadTxn<'_>> {
143158
let db_env = self.db_env();
144159
let try_read_txn = || async {
145160
match db_env.read_txn() {
@@ -156,7 +171,7 @@ impl<Prof: EngineProfile> Environment<Prof> {
156171

157172
// Phase 1: short timeout for transient concurrency.
158173
match retryable::run(&try_read_txn, &LMDB_READ_TXN_RETRY_PHASE1).await {
159-
Ok(txn) => return Ok(txn),
174+
Ok(txn) => return Ok(ReadTxn::new(txn)),
160175
Err(e) if !e.is_retryable => return Err(e.into()),
161176
Err(_) => {}
162177
}
@@ -168,6 +183,7 @@ impl<Prof: EngineProfile> Environment<Prof> {
168183
}
169184
retryable::run(&try_read_txn, &LMDB_READ_TXN_RETRY_PHASE2)
170185
.await
186+
.map(ReadTxn::new)
171187
.map_err(Into::into)
172188
}
173189

0 commit comments

Comments
 (0)