Skip to content

Commit 192b60b

Browse files
georgeh0 and claude
authored
refactor(state_store): per-component handle for fn-memo I/O (#1990)
- Introduce `FnMemoAccessor`, owning the read / write / retain operations for `FunctionMemoization` entries scoped to a single component path.
- Build it once per `ComponentProcessorContext` and return it by reference, so any future per-build state on the accessor (e.g. caching or instrumentation) persists across the many fn-memo lookups within a single component's processing phase, rather than being re-created per call.
- Inline the corresponding `AppStore::{read,write,retain}_fn_memo*` methods into the accessor; they had no remaining callers outside it. Shrinks `AppStore`'s public surface by ~50 lines.
- Update the three engine call sites (cache-probe read in `read_fn_call_memo_with_txn`, post-execute write in `write_fn_call_memo`, commit-time retain GC in `Committer`) to route through `comp_ctx.fn_memo_accessor()`.

No behavior change.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 34b151d commit 192b60b

4 files changed

Lines changed: 83 additions & 44 deletions

File tree

rust/core/src/engine/context.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::state::target_state_path::TargetStatePath;
1818
use crate::{
1919
engine::environment::{AppRegistration, Environment},
2020
state::stable_path::StablePath,
21-
state_store::AppStore,
21+
state_store::{AppStore, FnMemoAccessor},
2222
};
2323

2424
struct AppContextInner<Prof: EngineProfile> {
@@ -283,6 +283,13 @@ struct ComponentProcessorContextInner<Prof: EngineProfile> {
283283

284284
/// Opaque per-operation context (e.g. ContextProvider on the Python side).
285285
host_ctx: Arc<Prof::HostCtx>,
286+
287+
/// Per-component handle for function-memo I/O. Built once when the
288+
/// context is constructed and shared by reference, so any per-build
289+
/// state inside the accessor (e.g. a buffer in a future backend)
290+
/// persists across the many fn-memo lookups within one component's
291+
/// processing phase.
292+
fn_memo_accessor: FnMemoAccessor,
286293
}
287294

288295
#[derive(Clone)]
@@ -298,6 +305,10 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
298305
host_ctx: Arc<Prof::HostCtx>,
299306
processing_action: ComponentProcessingAction<Prof>,
300307
) -> Self {
308+
let fn_memo_accessor = FnMemoAccessor::new(
309+
component.app_ctx().app_store().clone(),
310+
component.stable_path().clone(),
311+
);
301312
Self {
302313
inner: Arc::new(ComponentProcessorContextInner {
303314
component,
@@ -308,6 +319,7 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
308319
inflight_permit: Mutex::new(None),
309320
logic_deps: Mutex::new(HashSet::new()),
310321
host_ctx,
322+
fn_memo_accessor,
311323
}),
312324
}
313325
}
@@ -328,6 +340,14 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
328340
self.inner.component.stable_path()
329341
}
330342

343+
/// Per-component handle for function-memoization I/O. All engine code
344+
/// that reads, writes, or retains function memos under this component's
345+
/// path should go through this handle rather than calling `AppStore`
346+
/// methods directly — see [`FnMemoAccessor`] for why.
347+
pub fn fn_memo_accessor(&self) -> &FnMemoAccessor {
348+
&self.inner.fn_memo_accessor
349+
}
350+
331351
pub(crate) fn parent_context(&self) -> Option<&ComponentProcessorContext<Prof>> {
332352
self.inner.parent_context.as_ref()
333353
}

rust/core/src/engine/execution.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ pub(crate) async fn update_component_memo_states<Prof: EngineProfile>(
199199

200200
async fn write_fn_call_memo<Prof: EngineProfile>(
201201
wtxn: &mut WriteTxn<'_>,
202-
app_store: &AppStore,
203202
comp_ctx: &ComponentProcessorContext<Prof>,
204203
memo_fp: Fingerprint,
205204
memo: FnCallMemo<Prof>,
@@ -217,21 +216,18 @@ async fn write_fn_call_memo<Prof: EngineProfile>(
217216
memo_states: memo_states_serialized,
218217
context_memo_states: context_memo_states_serialized,
219218
};
220-
app_store
221-
.write_fn_memo(wtxn, comp_ctx.stable_path(), memo_fp, &fn_call_memo)
219+
comp_ctx
220+
.fn_memo_accessor()
221+
.write(wtxn, memo_fp, &fn_call_memo)
222222
.await
223223
}
224224

225225
async fn read_fn_call_memo_with_txn<Prof: EngineProfile, T: AnyTxn>(
226226
rtxn: &mut T,
227-
app_store: &AppStore,
228227
comp_ctx: &ComponentProcessorContext<Prof>,
229228
memo_fp: Fingerprint,
230229
) -> Result<Option<FnCallMemo<Prof>>> {
231-
let Some(bytes) = app_store
232-
.read_fn_memo(rtxn, comp_ctx.stable_path(), memo_fp)
233-
.await?
234-
else {
230+
let Some(bytes) = comp_ctx.fn_memo_accessor().read(rtxn, memo_fp).await? else {
235231
return Ok(None);
236232
};
237233
let fn_call_memo: db_schema::FunctionMemoizationEntry<'_> = from_msgpack_slice(&bytes)?;
@@ -270,7 +266,7 @@ pub(crate) async fn read_fn_call_memo<Prof: EngineProfile>(
270266
return Ok(None);
271267
}
272268
let mut rtxn = comp_ctx.app_ctx().env().read_txn().await?;
273-
read_fn_call_memo_with_txn(&mut rtxn, comp_ctx.app_ctx().app_store(), comp_ctx, memo_fp).await
269+
read_fn_call_memo_with_txn(&mut rtxn, comp_ctx, memo_fp).await
274270
}
275271

276272
pub fn declare_target_state<Prof: EngineProfile>(
@@ -480,12 +476,13 @@ impl<Prof: EngineProfile> Committer<Prof> {
480476

481477
// Write memos.
482478
for (fp, memo) in memos_without_mounts_to_store {
483-
write_fn_call_memo(wtxn, &self.app_store, &self.component_ctx, fp, memo).await?;
479+
write_fn_call_memo(wtxn, &self.component_ctx, fp, memo).await?;
484480
}
485481

486482
// Delete all function memo entries that are not in the all_memo_fps.
487-
self.app_store
488-
.retain_fn_memos(wtxn, &self.component_path, all_memo_fps)
483+
self.component_ctx
484+
.fn_memo_accessor()
485+
.retain(wtxn, all_memo_fps)
489486
.await?;
490487

491488
if !self.demote_component_only {
@@ -1496,13 +1493,11 @@ async fn finalize_fn_call_memoization<Prof: EngineProfile>(
14961493
// Use a single read transaction for all DB reads.
14971494
if !deps_to_process.is_empty() {
14981495
let mut rtxn = comp_ctx.app_ctx().env().read_txn().await?;
1499-
let app_store = comp_ctx.app_ctx().app_store();
15001496
while let Some(fp) = deps_to_process.pop_front() {
15011497
if !result.all_memos_fps.insert(fp) {
15021498
continue;
15031499
}
1504-
let Some(memo) = read_fn_call_memo_with_txn(&mut rtxn, app_store, comp_ctx, fp).await?
1505-
else {
1500+
let Some(memo) = read_fn_call_memo_with_txn(&mut rtxn, comp_ctx, fp).await? else {
15061501
continue;
15071502
};
15081503
result

rust/core/src/state_store/app_store.rs

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -181,44 +181,68 @@ impl AppStore {
181181
}
182182
}
183183

184-
// --- Function memoization ------------------------------------------------
184+
// --- Function memoization (per-component, via FnMemoAccessor) -----------
185+
186+
/// Per-component handle that mediates function-memoization reads, writes,
187+
/// and retention during a single component build.
188+
///
189+
/// Engine code routes all per-component function-memo I/O through this type
190+
/// rather than calling [`AppStore`] methods directly, so storage backends
191+
/// that benefit from prefetching (e.g. a future Postgres backend that wants
192+
/// to load all entries for a component in one prefix scan and serve all
193+
/// reads from memory) can introduce a per-build buffer here without
194+
/// touching the engine call sites.
195+
///
196+
/// The accessor is constructed once per component build and held by
197+
/// [`ComponentProcessorContext`](crate::engine::context::ComponentProcessorContext)
198+
/// — engine code reaches it via `comp_ctx.fn_memo_accessor()`, which
199+
/// returns a borrow rather than a fresh value, so any per-build state
200+
/// (a future buffer) persists across the many fn-memo lookups inside a
201+
/// single component's processing phase.
202+
///
203+
/// The current LMDB implementation adds no buffering — every call builds
204+
/// its key and goes straight to the [`AppStore`]'s database handle, since
205+
/// LMDB's random reads are already memory speed.
206+
pub struct FnMemoAccessor {
207+
app_store: AppStore,
208+
component_path: StablePath,
209+
}
185210

186-
impl AppStore {
187-
/// Read raw function-memo bytes. See [`Self::read_tracking_info`] for
188-
/// the owned-bytes return rationale.
189-
pub async fn read_fn_memo<T: AnyTxn>(
190-
&self,
191-
txn: &mut T,
192-
path: &StablePath,
193-
fp: Fingerprint,
194-
) -> Result<Option<Vec<u8>>> {
195-
let key = key_fn_memo(path, fp)?;
196-
Ok(txn.db_get_bytes(self.db(), &key)?.map(<[u8]>::to_vec))
211+
impl FnMemoAccessor {
212+
pub fn new(app_store: AppStore, component_path: StablePath) -> Self {
213+
Self {
214+
app_store,
215+
component_path,
216+
}
197217
}
198218

199-
pub async fn write_fn_memo(
219+
/// Read raw function-memo bytes for the given fingerprint. Returns
220+
/// owned bytes; see [`AppStore::read_tracking_info`] for the rationale.
221+
pub async fn read<T: AnyTxn>(&self, txn: &mut T, fp: Fingerprint) -> Result<Option<Vec<u8>>> {
222+
let key = key_fn_memo(&self.component_path, fp)?;
223+
Ok(txn
224+
.db_get_bytes(self.app_store.db(), &key)?
225+
.map(<[u8]>::to_vec))
226+
}
227+
228+
pub async fn write(
200229
&self,
201-
txn: &mut WriteTxn<'_>,
202-
path: &StablePath,
230+
wtxn: &mut WriteTxn<'_>,
203231
fp: Fingerprint,
204232
entry: &FunctionMemoizationEntry<'_>,
205233
) -> Result<()> {
206-
let key = key_fn_memo(path, fp)?;
234+
let key = key_fn_memo(&self.component_path, fp)?;
207235
let value = rmp_serde::to_vec_named(entry)?;
208-
self.db().put(&mut **txn, &key, &value)?;
236+
self.app_store.db().put(&mut **wtxn, &key, &value)?;
209237
Ok(())
210238
}
211239

212-
/// GC: delete all function memos for `path` whose fingerprint is NOT in `keep`.
213-
pub async fn retain_fn_memos(
214-
&self,
215-
txn: &mut WriteTxn<'_>,
216-
path: &StablePath,
217-
keep: &HashSet<Fingerprint>,
218-
) -> Result<()> {
219-
let prefix = key_fn_memo_prefix(path)?;
220-
let db = self.db();
221-
let mut iter = db.prefix_iter_mut(&mut **txn, &prefix)?;
240+
/// GC: delete all function memos for this component whose fingerprint
241+
/// is NOT in `keep`.
242+
pub async fn retain(&self, wtxn: &mut WriteTxn<'_>, keep: &HashSet<Fingerprint>) -> Result<()> {
243+
let prefix = key_fn_memo_prefix(&self.component_path)?;
244+
let db = self.app_store.db();
245+
let mut iter = db.prefix_iter_mut(&mut **wtxn, &prefix)?;
222246
while let Some((raw_key, _)) = iter.next().transpose()? {
223247
let fp: Fingerprint = storekey::decode(raw_key[prefix.len()..].as_ref())?;
224248
if keep.contains(&fp) {

rust/core/src/state_store/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ mod app_store;
1212
mod storage;
1313
mod txn;
1414

15-
pub use app_store::AppStore;
15+
pub use app_store::{AppStore, FnMemoAccessor};
1616
pub use storage::{Storage, StorageSettings};
1717
pub use txn::{AnyTxn, ReadTxn, WriteTxn};

0 commit comments

Comments
 (0)