Skip to content

Commit 3865921

Browse files
georgeh0 and claude authored
refactor(engine): per-component fn-memo cache with prefetch + flush (#1991)
Replace the previous `FnMemoAccessor` handle (#1990) with a `FnMemoCache` that owns the function-memoization lifecycle at the engine layer. - Eager prefix-scan prefetch at the start of build mode loads every fn-memo entry for the component into memory as `Stored(bytes)`. - `reserve_memoization` no longer reads from the database. Cache slot is looked up (or inserted as `Pending` on cache miss); `Stored` entries are lazy-decoded on first access via `decode_stored_entry`. - `finalize_fn_call_memoization` now walks the cache in memory and decodes `Stored` entries it reaches via dep chains — zero DB reads. - Commit-time flush is one consolidated pass: per-entry write/delete when the cache is fully loaded; prefix-delete + write-new when it isn't (covers `full_reprocess` and delete mode). - Net I/O per build: one prefix scan up front instead of O(N) point lookups during processing + one prefix scan at commit. Finalize's transitive dep walk drops from one point read per dep to zero. Storage API changes: - New: `list_fn_memos` (prefix read; layer-neutral name matching `list_child_existence` / `list_tombstones`), `delete_fn_memo`, `delete_all_fn_memos`. - Drop: `read_fn_memo`, `retain_fn_memos`. - `write_fn_memo` unchanged. Engine API changes: - New `FnCallMemoEntry::Stored(Vec<u8>)` variant; lazy-decoded. - `FnMemoCache<Prof>` on `ComponentBuildingState` replaces the `fn_call_memos: HashMap<…>` field; carries an `is_fully_loaded` flag driving the flush strategy. - `ComponentProcessorContext::prefetch_fn_memos()` runs in `execute_once` right after the build semaphore acquisition; idempotent and skipped under `full_reprocess` / delete mode. - `Committer::commit_in_txn` / `commit` lose the `all_memo_fps` + `memos_without_mounts_to_store` parameters in favor of a single `fn_memos: FnMemoCache<Prof>` param. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 192b60b commit 3865921

6 files changed

Lines changed: 392 additions & 259 deletions

File tree

rust/core/src/engine/component.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,13 @@ impl<Prof: EngineProfile> Component<Prof> {
741741
let ret_n_submit_output = {
742742
let _permit = self.inner.build_semaphore.acquire().await?;
743743

744+
// Eagerly load all function-memo entries for this component
745+
// into the per-build cache, so every subsequent fn-call probe
746+
// serves from memory. Skipped under `full_reprocess` and in
747+
// delete mode (no `ComponentBuildingState`); see the cache
748+
// flush logic for how those cases are handled at commit time.
749+
processor_context.prefetch_fn_memos().await?;
750+
744751
if memo_fp_to_store.is_some() {
745752
*self.inner.last_memo_fp.lock().unwrap() = memo_fp_to_store;
746753
// TODO: when matching, it means there is ongoing processing for the same memoization key pending on children.

rust/core/src/engine/context.rs

Lines changed: 251 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,17 @@ use crate::state::target_state_path::TargetStatePath;
1818
use crate::{
1919
engine::environment::{AppRegistration, Environment},
2020
state::stable_path::StablePath,
21-
state_store::{AppStore, FnMemoAccessor},
21+
state_store::{AppStore, WriteTxn},
2222
};
2323

24+
use cocoindex_utils::deser::from_msgpack_slice;
25+
26+
use crate::engine::execution::{
27+
deserialize_context_memo_states, deserialize_memo_values, serialize_context_memo_states,
28+
serialize_memo_values,
29+
};
30+
use crate::engine::profile::Persist;
31+
2432
struct AppContextInner<Prof: EngineProfile> {
2533
env: Environment<Prof>,
2634
app_store: AppStore,
@@ -209,6 +217,10 @@ impl<Prof: EngineProfile> MemoStatesPayload<Prof> {
209217
}
210218

211219
pub enum FnCallMemoEntry<Prof: EngineProfile> {
220+
/// Prefetched from the database but not yet accessed during this build.
221+
/// Lazily decoded on first access (transitions to `Ready` or stays
222+
/// `Stored` for later GC). Treated as untouched at flush time.
223+
Stored(Vec<u8>),
212224
/// Memoization result is pending, i.e. the function call is not finished yet.
213225
Pending,
214226
/// Memoization result is ready. None means memoization is disabled, e.g. it mounts child components.
@@ -221,10 +233,214 @@ impl<Prof: EngineProfile> Default for FnCallMemoEntry<Prof> {
221233
}
222234
}
223235

236+
/// In-memory cache of all function-memoization entries for a single
237+
/// component build. Populated by [`Self::populate`] (fed by one prefix scan
238+
/// over the storage layer) at the start of build mode, then serves every
239+
/// subsequent fn-memo lookup in memory. New entries from cache-miss
240+
/// executions accumulate here; [`Self::flush_to_db`] applies the diff to
241+
/// storage at commit time (prefix-delete first if never populated).
242+
pub(crate) struct FnMemoCache<Prof: EngineProfile> {
243+
/// All known fn-memo entries for this component, keyed by fingerprint.
244+
/// Rows inserted by `populate` start as `Stored(bytes)`; slots created
245+
/// by `entry_or_pending` on a cache miss start as `Pending`.
246+
entries: HashMap<Fingerprint, Arc<tokio::sync::RwLock<FnCallMemoEntry<Prof>>>>,
247+
/// False on construction; set to true by [`Self::populate`]. Stays
248+
/// false when the prefetch is skipped (e.g. under `full_reprocess`).
249+
/// Determines the flush strategy: per-entry writes/deletes when true,
250+
/// prefix-delete + per-entry writes when false.
251+
is_fully_loaded: bool,
252+
}
253+
254+
impl<Prof: EngineProfile> FnMemoCache<Prof> {
255+
pub(crate) fn new() -> Self {
256+
Self {
257+
entries: HashMap::new(),
258+
is_fully_loaded: false,
259+
}
260+
}
261+
262+
/// Insert prefetched rows from the database into the cache and mark it
263+
/// fully loaded. The async I/O lives at the context layer
264+
/// ([`ComponentProcessorContext::prefetch_fn_memos`]); this is the sync
265+
/// half that runs under the building-state mutex.
266+
pub(crate) fn populate(&mut self, rows: Vec<(Fingerprint, Vec<u8>)>) {
267+
for (fp, bytes) in rows {
268+
self.entries.entry(fp).or_insert_with(|| {
269+
Arc::new(tokio::sync::RwLock::new(FnCallMemoEntry::Stored(bytes)))
270+
});
271+
}
272+
self.is_fully_loaded = true;
273+
}
274+
275+
/// Get the entry for `fp`, inserting a `Pending` slot if absent. The
276+
/// returned `Arc<RwLock<_>>` is what `reserve_memoization` locks.
277+
pub(crate) fn entry_or_pending(
278+
&mut self,
279+
fp: Fingerprint,
280+
) -> Arc<tokio::sync::RwLock<FnCallMemoEntry<Prof>>> {
281+
self.entries
282+
.entry(fp)
283+
.or_insert_with(|| Arc::new(tokio::sync::RwLock::new(FnCallMemoEntry::Pending)))
284+
.clone()
285+
}
286+
287+
/// Read-only lookup. Returns `None` if no entry exists for `fp`. Used
288+
/// by the finalize-time dep walk.
289+
pub(crate) fn get(
290+
&self,
291+
fp: Fingerprint,
292+
) -> Option<Arc<tokio::sync::RwLock<FnCallMemoEntry<Prof>>>> {
293+
self.entries.get(&fp).cloned()
294+
}
295+
296+
pub(crate) fn is_fully_loaded(&self) -> bool {
297+
self.is_fully_loaded
298+
}
299+
300+
/// Walk all entries. Used by finalize to enumerate touched/untouched
301+
/// state without consuming the cache.
302+
pub(crate) fn iter(
303+
&self,
304+
) -> impl Iterator<
305+
Item = (
306+
&Fingerprint,
307+
&Arc<tokio::sync::RwLock<FnCallMemoEntry<Prof>>>,
308+
),
309+
> {
310+
self.entries.iter()
311+
}
312+
313+
/// Consume the cache and apply its diff to storage in a single write
314+
/// transaction.
315+
///
316+
/// - If `is_fully_loaded` is true: for each entry, write `Ready(Some)`
317+
/// with `already_stored=false` (new or re-executed), delete
318+
/// `Stored(_)` and `Ready(None)` (untouched or memoization-disabled),
319+
/// skip `Ready(Some)` with `already_stored=true` (cache hit, row
320+
/// already in DB).
321+
/// - Otherwise: prefix-delete every fn-memo row for `path`, then write
322+
/// the `Ready(Some)` `already_stored=false` entries. Covers
323+
/// `full_reprocess` and any path where the cache was not prefetched.
324+
pub(crate) async fn flush_to_db(
325+
self,
326+
wtxn: &mut WriteTxn<'_>,
327+
app_store: &AppStore,
328+
path: &StablePath,
329+
) -> Result<()> {
330+
if !self.is_fully_loaded {
331+
app_store.delete_all_fn_memos(wtxn, path).await?;
332+
}
333+
for (fp, lock) in self.entries.into_iter() {
334+
// No other holders at flush time — extract by reference under
335+
// a try_write guard rather than unwrapping the Arc, since
336+
// upstream cancellation paths may have leaked clones.
337+
let mut guard = lock.try_write().map_err(|_| {
338+
internal_error!("fn memo entry for {fp:?} still locked at flush time")
339+
})?;
340+
let state = std::mem::replace(&mut *guard, FnCallMemoEntry::Pending);
341+
match state {
342+
FnCallMemoEntry::Ready(Some(memo)) => {
343+
if memo.already_stored {
344+
// Cache hit: DB row already correct.
345+
continue;
346+
}
347+
let ret_bytes = memo.ret.to_bytes()?;
348+
let memo_states_serialized = serialize_memo_values::<Prof>(&memo.memo_states)?;
349+
let context_memo_states_serialized =
350+
serialize_context_memo_states::<Prof>(&memo.context_memo_states)?;
351+
let entry = db_schema::FunctionMemoizationEntry {
352+
return_value: db_schema::MemoizedValue::Inlined(Cow::Borrowed(
353+
ret_bytes.as_ref(),
354+
)),
355+
child_components: vec![],
356+
target_state_paths: memo.target_state_paths,
357+
dependency_memo_entries: memo.dependency_memo_entries.into_iter().collect(),
358+
logic_deps: memo.logic_deps.into_iter().collect(),
359+
memo_states: memo_states_serialized,
360+
context_memo_states: context_memo_states_serialized,
361+
};
362+
app_store.write_fn_memo(wtxn, path, fp, &entry).await?;
363+
}
364+
FnCallMemoEntry::Stored(_) | FnCallMemoEntry::Ready(None) => {
365+
if self.is_fully_loaded {
366+
// Stored: untouched prefetched entry, stale.
367+
// Ready(None): memoization disabled at runtime.
368+
app_store.delete_fn_memo(wtxn, path, fp).await?;
369+
}
370+
}
371+
FnCallMemoEntry::Pending => {
372+
// The slot was inserted via `entry_or_pending` for a fp not
373+
// present in the prefetched cache (prefetched fps decode
374+
// to `Ready` in `reserve_memoization`, never end up
375+
// Pending). The function call then errored before
376+
// resolving — e.g. the caller wrapped it in try/except
377+
// and continued. Such fps have no DB row, so nothing to
378+
// write or delete.
379+
}
380+
}
381+
}
382+
Ok(())
383+
}
384+
}
385+
386+
impl<Prof: EngineProfile> Default for FnMemoCache<Prof> {
387+
fn default() -> Self {
388+
Self::new()
389+
}
390+
}
391+
392+
/// Decode a `Stored(bytes)` entry into `Ready(Some(memo))` if the entry's
393+
/// stored logic deps still resolve. If the deps no longer resolve (e.g.
394+
/// logic registry no longer contains a fingerprint), or the entry is a
395+
/// legacy form with `child_components`, the result is `Ready(None)` so
396+
/// the entry is treated as a deletion at flush time.
397+
///
398+
/// This helper is shared between `reserve_memoization` (probe path) and
399+
/// the finalize dep walk; both call it under the per-entry write lock.
400+
///
401+
/// `*entry` must be `Stored(_)` on entry. After this call it is `Ready`.
402+
pub(crate) fn decode_stored_entry<Prof: EngineProfile>(
403+
entry: &mut FnCallMemoEntry<Prof>,
404+
env: &Environment<Prof>,
405+
) -> Result<()> {
406+
let FnCallMemoEntry::Stored(bytes) = std::mem::replace(entry, FnCallMemoEntry::Pending) else {
407+
internal_bail!("decode_stored_entry called on non-Stored entry");
408+
};
409+
let decoded: db_schema::FunctionMemoizationEntry<'_> = from_msgpack_slice(&bytes)?;
410+
if !crate::engine::logic_registry::all_contained_with_env(&decoded.logic_deps, env) {
411+
*entry = FnCallMemoEntry::Ready(None);
412+
return Ok(());
413+
}
414+
if !decoded.child_components.is_empty() {
415+
// Legacy entry with stored child component paths. Invalidate so the
416+
// function re-runs, detects child components, and the entry is
417+
// cleaned up.
418+
*entry = FnCallMemoEntry::Ready(None);
419+
return Ok(());
420+
}
421+
let return_value_bytes = match decoded.return_value {
422+
db_schema::MemoizedValue::Inlined(b) => b,
423+
};
424+
let ret = Prof::FunctionData::from_bytes(return_value_bytes.as_ref())?;
425+
let memo_states = deserialize_memo_values::<Prof>(&decoded.memo_states)?;
426+
let context_memo_states =
427+
deserialize_context_memo_states::<Prof>(&decoded.context_memo_states)?;
428+
*entry = FnCallMemoEntry::Ready(Some(FnCallMemo {
429+
ret,
430+
target_state_paths: decoded.target_state_paths,
431+
dependency_memo_entries: decoded.dependency_memo_entries.into_iter().collect(),
432+
logic_deps: decoded.logic_deps.into_iter().collect(),
433+
memo_states,
434+
context_memo_states,
435+
already_stored: true,
436+
}));
437+
Ok(())
438+
}
439+
224440
/// Per-component state used while the component is in build mode.
/// NOTE(review): field roles below are read off the visible call sites only.
pub(crate) struct ComponentBuildingState<Prof: EngineProfile> {
225441
pub target_states: ComponentTargetStatesContext<Prof>,
226442
pub child_path_set: ChildStablePathSet,
227-
pub fn_call_memos: HashMap<Fingerprint, Arc<tokio::sync::RwLock<FnCallMemoEntry<Prof>>>>,
443+
/// Function-memoization cache for this build; filled by
/// `ComponentProcessorContext::prefetch_fn_memos` via `FnMemoCache::populate`.
pub fn_memos: FnMemoCache<Prof>,
228444
}
229445

230446
pub(crate) struct ComponentBuildContext<Prof: EngineProfile> {
@@ -261,7 +477,7 @@ impl<Prof: EngineProfile> ComponentProcessingAction<Prof> {
261477
provider_registry: TargetStateProviderRegistry::new(providers),
262478
},
263479
child_path_set: Default::default(),
264-
fn_call_memos: Default::default(),
480+
fn_memos: FnMemoCache::new(),
265481
})),
266482
full_reprocess,
267483
live,
@@ -283,13 +499,6 @@ struct ComponentProcessorContextInner<Prof: EngineProfile> {
283499

284500
/// Opaque per-operation context (e.g. ContextProvider on the Python side).
285501
host_ctx: Arc<Prof::HostCtx>,
286-
287-
/// Per-component handle for function-memo I/O. Built once when the
288-
/// context is constructed and shared by reference, so any per-build
289-
/// state inside the accessor (e.g. a buffer in a future backend)
290-
/// persists across the many fn-memo lookups within one component's
291-
/// processing phase.
292-
fn_memo_accessor: FnMemoAccessor,
293502
}
294503

295504
#[derive(Clone)]
@@ -305,10 +514,6 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
305514
host_ctx: Arc<Prof::HostCtx>,
306515
processing_action: ComponentProcessingAction<Prof>,
307516
) -> Self {
308-
let fn_memo_accessor = FnMemoAccessor::new(
309-
component.app_ctx().app_store().clone(),
310-
component.stable_path().clone(),
311-
);
312517
Self {
313518
inner: Arc::new(ComponentProcessorContextInner {
314519
component,
@@ -319,7 +524,6 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
319524
inflight_permit: Mutex::new(None),
320525
logic_deps: Mutex::new(HashSet::new()),
321526
host_ctx,
322-
fn_memo_accessor,
323527
}),
324528
}
325529
}
@@ -340,12 +544,38 @@ impl<Prof: EngineProfile> ComponentProcessorContext<Prof> {
340544
self.inner.component.stable_path()
341545
}
342546

343-
/// Per-component handle for function-memoization I/O. All engine code
344-
/// that reads, writes, or retains function memos under this component's
345-
/// path should go through this handle rather than calling `AppStore`
346-
/// methods directly — see [`FnMemoAccessor`] for why.
347-
pub fn fn_memo_accessor(&self) -> &FnMemoAccessor {
348-
&self.inner.fn_memo_accessor
547+
/// Eagerly load every function-memo entry for this component from
548+
/// storage into the per-build cache. Called at the start of build mode
549+
/// before any function calls run; skipped under `full_reprocess` so
550+
/// the cache stays empty and `FnMemoCache::flush_to_db` falls through
551+
/// to a prefix delete + write of newly-computed entries.
552+
pub(crate) async fn prefetch_fn_memos(&self) -> Result<()> {
553+
if self.full_reprocess() {
554+
return Ok(());
555+
}
556+
// Cheap check: skip if already loaded (re-entry, etc.).
557+
let already_loaded = match &self.inner.processing_action {
558+
ComponentProcessingAction::Build(build_ctx) => {
559+
let guard = build_ctx.state.lock().unwrap();
560+
let Some(state) = guard.as_ref() else {
561+
return Ok(());
562+
};
563+
state.fn_memos.is_fully_loaded()
564+
}
565+
ComponentProcessingAction::Delete { .. } => return Ok(()),
566+
};
567+
if already_loaded {
568+
return Ok(());
569+
}
570+
let app_store = self.app_ctx().app_store();
571+
let path = self.stable_path();
572+
let mut rtxn = self.app_ctx().env().read_txn().await?;
573+
let rows = app_store.list_fn_memos(&mut rtxn, path).await?;
574+
drop(rtxn);
575+
self.update_building_state(|s| {
576+
s.fn_memos.populate(rows);
577+
Ok(())
578+
})
349579
}
350580

351581
pub(crate) fn parent_context(&self) -> Option<&ComponentProcessorContext<Prof>> {

0 commit comments

Comments
 (0)