Commit 75479df

jamiepine and claude authored
fix library sync backfill O(N^2) hotspots on both sides of the protocol (#3061)
* fix library sync backfill O(N^2) hotspots on both sides of the protocol

  Addresses #3058 (shared models, content_identity) and #3060 (device-owned entries). The progressive slowdown on initial pair came from several independent hotspots that compound:

  Sender side
  - PeerLog::get_since now pushes LIMIT into SQL; callers fetch limit + 1 to derive has_more. Previously every SharedChangeRequest reloaded and parsed the entire remaining shared_changes log in memory before truncating, making sender work O(N * batches).
  - Entry and content_identity query_for_sync batch FK to UUID conversion across the whole batch via a new convert_fks_to_uuids_batch helper — one DB round trip per FK type instead of per record per FK.

  Schema
  - New index idx_entries_indexed_at_uuid backing the (indexed_at, uuid) cursor in Entry::query_for_sync. Without it every batch request fell back to a full-table scan.

  Receiver side
  - New task-local in_backfill scope wraps the backfill apply phases. Entry::apply_state_change uses it to skip per-entry entry_closure rebuild; the existing post_backfill_rebuild pass does a single bulk rebuild at the end. emit_batch_resource_events short-circuits during backfill and the coordinator emits one Event::Refresh after post-backfill rebuild so the UI invalidates cached views.
  - Entry FK resolution in backfill splits mappings into self-referential (parent_id) and the rest. Non-self FKs resolve in one batch across the whole batch; only parent_id still runs per-entry so children can see just-inserted parents.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* address PR review feedback: drop unresolved FKs, guard limits

  convert_fks_to_uuids_batch now returns the set of record indices whose FK could not be resolved (missing target row or non-integer value). Callers in entry::query_for_sync and content_identity::query_for_sync drop those records from the outgoing sync batch instead of zipping the partially converted payload back. Previously the sender would ship a record with its local int field intact and no *_uuid field; the receiver's map_sync_json_to_local interpreted that as already-resolved and wrote the sender-local integer directly to the local DB, corrupting the FK.

  Also: reject limit == 0 on SharedChangeRequest / get_shared_changes up front (would have returned 0 rows with has_more = true and spun), and clamp the SQL LIMIT bind with i64::try_from(..).unwrap_or(i64::MAX) instead of a wrapping `as i64`.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* further review fixes: treat absent FK fields as failed, drop NULL indexed_at

  convert_fks_to_uuids_batch previously treated a missing local_field as null. convert_fk_to_uuid errors in that case — only an explicit JSON null is a legitimate null FK — so the batch helper now flags absent fields as failed to match the per-record contract.

  Entry::query_for_sync no longer falls back to modified_at when indexed_at is NULL. The cursor filter/order uses indexed_at exclusively, so a returned cursor derived from modified_at wouldn't match the next query's predicate. The indexed_at backfill migration populated existing rows; any NULL here is a data bug, logged and skipped.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cargo fmt

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
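
The `limit + 1` trick and the two limit guards described in the commit message can be sketched as follows. This is a minimal illustration, not the actual `PeerLog::get_since`: the `Change` struct, the in-memory slice standing in for the `shared_changes` table, and the function signatures are assumptions made for the example. It also folds the "fetch one extra row" step into the function itself, whereas the commit message says the callers do it.

```rust
/// Hypothetical stand-in for one row of the shared_changes log.
#[derive(Debug, Clone, PartialEq)]
struct Change {
    seq: u64,
}

/// Convert a caller-supplied limit into the i64 a SQL LIMIT bind expects,
/// saturating at i64::MAX instead of wrapping the way `limit as i64` can.
fn sql_limit(limit: usize) -> i64 {
    i64::try_from(limit).unwrap_or(i64::MAX)
}

/// Return at most `limit` changes with `seq > since`, plus a `has_more` flag.
/// Assumes `log` is already ordered by `seq`, as a SQL ORDER BY would ensure.
fn get_since(log: &[Change], since: u64, limit: usize) -> Result<(Vec<Change>, bool), String> {
    // A zero limit can never make progress, so reject it up front.
    if limit == 0 {
        return Err("limit must be greater than zero".to_string());
    }

    // Ask for one row more than the page size. In SQL this would be
    // `LIMIT sql_limit(fetch)`; here `take` plays that role.
    let fetch = limit.saturating_add(1);
    let mut rows: Vec<Change> = log
        .iter()
        .filter(|c| c.seq > since)
        .take(fetch)
        .cloned()
        .collect();

    // The extra row only signals that another page exists; it is not returned.
    let has_more = rows.len() > limit;
    rows.truncate(limit);
    Ok((rows, has_more))
}

fn main() {
    let log: Vec<Change> = (1..=5).map(|seq| Change { seq }).collect();
    let (page, has_more) = get_since(&log, 0, 2).expect("limit is non-zero");
    println!("page = {:?}, has_more = {}, bind = {}", page, has_more, sql_limit(2 + 1));
}
```

Because only `limit + 1` rows are ever materialized, the cost of each request depends on the page size rather than on how much of the log remains.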
1 parent c63f1b0 commit 75479df

12 files changed

Lines changed: 522 additions & 124 deletions


core/src/domain/resource_manager.rs

Lines changed: 13 additions & 0 deletions
@@ -269,6 +269,19 @@ impl ResourceManager {
         resource_type: &str,
         resource_ids: Vec<Uuid>,
     ) -> Result<()> {
+        // During backfill we skip per-record fan-out. For models like
+        // content_identity each UUID triggers 2 DB queries via dependency
+        // routing, which dominates apply time on large libraries. The backfill
+        // coordinator emits a single coarse invalidation after the scope ends.
+        if crate::infra::sync::is_in_backfill() {
+            tracing::trace!(
+                resource_type = %resource_type,
+                count = resource_ids.len(),
+                "Skipping per-record resource event emission during backfill"
+            );
+            return Ok(());
+        }
+
         // For now, delegate to single-resource handler
         // In future, could optimize by batching virtual resource construction
         self.emit_resource_events(resource_type, resource_ids).await

core/src/infra/db/entities/content_identity.rs

Lines changed: 58 additions & 17 deletions
@@ -246,36 +246,77 @@ impl Syncable for Model {

         let results = query.all(db).await?;

-        let mut sync_results = Vec::new();
+        let mut sync_results: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
+            Vec::with_capacity(results.len());
         for content in results {
-            if content.uuid.is_none() {
-                continue;
-            }
+            let uuid = match content.uuid {
+                Some(u) => u,
+                None => continue,
+            };

-            let mut json = match content.to_sync_json() {
+            let json = match content.to_sync_json() {
                 Ok(j) => j,
                 Err(e) => {
                     tracing::warn!(error = %e, content_hash = %content.content_hash, "Failed to serialize content_identity for sync");
                     continue;
                 }
             };

-            // Convert FK to UUID for cross-device compatibility
-            for fk in Self::foreign_key_mappings() {
-                if let Err(e) =
-                    crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
+            sync_results.push((uuid, json, content.last_verified_at));
+        }
+
+        // Batch FK → UUID conversion across the whole batch: one DB round trip
+        // per FK type instead of one per (record × FK). Records that fail
+        // resolution are dropped so peers never see a sender-local int in the
+        // payload.
+        let fk_mappings = Self::foreign_key_mappings();
+        if !fk_mappings.is_empty() && !sync_results.is_empty() {
+            let mut payloads: Vec<serde_json::Value> = sync_results
+                .iter()
+                .map(|(_, json, _)| json.clone())
+                .collect();
+            let mut failed_indices: std::collections::HashSet<usize> =
+                std::collections::HashSet::new();
+
+            for fk in &fk_mappings {
+                match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
+                    &mut payloads,
+                    fk,
+                    db,
+                )
+                .await
                 {
-                    tracing::warn!(
-                        error = %e,
-                        uuid = %content.uuid.unwrap(),
-                        fk_field = fk.local_field,
-                        "Failed to convert FK to UUID, skipping content_identity"
-                    );
-                    continue;
+                    Ok(failed) => failed_indices.extend(failed),
+                    Err(e) => {
+                        tracing::warn!(
+                            error = %e,
+                            fk_field = fk.local_field,
+                            "Batch FK conversion failed for content_identity"
+                        );
+                        return Err(sea_orm::DbErr::Custom(format!(
+                            "ContentIdentity FK batch conversion failed: {}",
+                            e
+                        )));
+                    }
                 }
             }

-            sync_results.push((content.uuid.unwrap(), json, content.last_verified_at));
+            sync_results = sync_results
+                .into_iter()
+                .zip(payloads.into_iter())
+                .enumerate()
+                .filter_map(|(idx, ((uuid, _, ts), resolved))| {
+                    if failed_indices.contains(&idx) {
+                        tracing::warn!(
+                            uuid = %uuid,
+                            "Dropping content_identity with unresolved FK from sync batch"
+                        );
+                        None
+                    } else {
+                        Some((uuid, resolved, ts))
+                    }
+                })
+                .collect();
         }

         Ok(sync_results)
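
The contract the hunk above relies on (the batch helper reports failed record indices, and the caller drops those records) can be sketched without a database. This is not the real `convert_fks_to_uuids_batch`: the `Value` enum stands in for `serde_json::Value`, the `lookup` map stands in for the single id-to-UUID query per FK type, and the field names and the choice to write an explicit null into the UUID field are assumptions made for the example.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for a JSON value (the real code uses serde_json::Value).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i64),
    Text(String),
}

type Payload = HashMap<String, Value>;

/// Replace `local_field` with `uuid_field` in every payload, using a single
/// pre-fetched id -> uuid map. Returns the indices of records that could not
/// be resolved: absent field, non-integer value, or missing target row.
fn convert_batch(
    payloads: &mut [Payload],
    local_field: &str,
    uuid_field: &str,
    lookup: &HashMap<i64, String>,
) -> HashSet<usize> {
    let mut failed = HashSet::new();
    for (idx, payload) in payloads.iter_mut().enumerate() {
        match payload.remove(local_field) {
            // Only an explicit null counts as a legitimately empty FK.
            Some(Value::Null) => {
                payload.insert(uuid_field.to_string(), Value::Null);
            }
            Some(Value::Int(id)) => match lookup.get(&id) {
                Some(uuid) => {
                    payload.insert(uuid_field.to_string(), Value::Text(uuid.clone()));
                }
                None => {
                    failed.insert(idx);
                }
            },
            // Absent field or non-integer value: flag the record as failed.
            _ => {
                failed.insert(idx);
            }
        }
    }
    failed
}

/// Drop every item whose position appears in `failed`, preserving order.
fn drop_failed<T>(items: Vec<T>, failed: &HashSet<usize>) -> Vec<T> {
    items
        .into_iter()
        .enumerate()
        .filter_map(|(idx, item)| if failed.contains(&idx) { None } else { Some(item) })
        .collect()
}

fn main() {
    let lookup = HashMap::from([(7, "uuid-7".to_string())]);
    let mut payloads = vec![
        Payload::from([("parent_id".to_string(), Value::Int(7))]),
        Payload::from([("parent_id".to_string(), Value::Int(99))]),
    ];
    let failed = convert_batch(&mut payloads, "parent_id", "parent_uuid", &lookup);
    let kept = drop_failed(payloads, &failed);
    println!("kept {} record(s): {:?}", kept.len(), kept);
}
```

Dropping by index works only while the payload vector keeps the same order and length as the staged records, which is why the sketch, like the hunk, never reorders between conversion and filtering.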

core/src/infra/db/entities/entry.rs

Lines changed: 76 additions & 25 deletions
@@ -292,16 +292,31 @@ impl crate::infra::sync::Syncable for Model {
             std::collections::HashMap::new()
         };

-        // Convert to sync format with FK mapping
-        let mut sync_results = Vec::new();
+        // Serialize each row to JSON with its UUID and timestamp.
+        let mut staged: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
+            Vec::with_capacity(results.len());

         for entry in results {
             let uuid = match entry.uuid {
                 Some(u) => u,
-                None => continue, // Skip entries without UUIDs
+                None => continue,
+            };
+
+            // The sync cursor filters/orders on `indexed_at`, so a row with a
+            // NULL `indexed_at` would emit a cursor the next query predicate
+            // doesn't represent. Skip them — the indexed_at backfill migration
+            // populated every existing row, so this should be unreachable.
+            let indexed_at = match entry.indexed_at {
+                Some(ts) => ts,
+                None => {
+                    tracing::warn!(
+                        uuid = %uuid,
+                        "Entry has NULL indexed_at; skipping sync until it's populated"
+                    );
+                    continue;
+                }
             };

-            // Serialize to JSON
             let mut json = match entry.to_sync_json() {
                 Ok(j) => j,
                 Err(e) => {
@@ -311,9 +326,8 @@ impl crate::infra::sync::Syncable for Model {
             };

             // For directories, include the absolute path from directory_paths
-            // This ensures receiving devices get identical paths for universal addressing
+            // so receiving devices get identical paths for universal addressing.
             if entry.kind == 1 {
-                // Directory
                 if let Some(path) = directory_paths_map.get(&entry.id) {
                     if let Some(obj) = json.as_object_mut() {
                         obj.insert(
@@ -324,27 +338,62 @@ impl crate::infra::sync::Syncable for Model {
                 }
             }

-            // Convert FK integer IDs to UUIDs
-            for fk in <Model as Syncable>::foreign_key_mappings() {
-                if let Err(e) =
-                    crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
+            staged.push((uuid, json, indexed_at));
+        }
+
+        // Batch-convert FK integer IDs to UUIDs one FK type at a time across
+        // the whole batch — single DB round trip per FK, not per record × FK.
+        // Any record that fails resolution (missing target, bad value) is
+        // dropped before we return so peers never see a sender-local int.
+        let fk_mappings = <Model as Syncable>::foreign_key_mappings();
+        if !fk_mappings.is_empty() && !staged.is_empty() {
+            let mut payloads: Vec<serde_json::Value> =
+                staged.iter().map(|(_, json, _)| json.clone()).collect();
+            let mut failed_indices: std::collections::HashSet<usize> =
+                std::collections::HashSet::new();
+
+            for fk in &fk_mappings {
+                match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
+                    &mut payloads,
+                    fk,
+                    db,
+                )
+                .await
                 {
-                    tracing::warn!(
-                        error = %e,
-                        uuid = %uuid,
-                        fk_field = fk.local_field,
-                        "Failed to convert FK to UUID, skipping entry"
-                    );
-                    continue;
+                    Ok(failed) => failed_indices.extend(failed),
+                    Err(e) => {
+                        tracing::warn!(
+                            error = %e,
+                            fk_field = fk.local_field,
+                            "Batch FK conversion failed for entries"
+                        );
+                        return Err(sea_orm::DbErr::Custom(format!(
+                            "Entry FK batch conversion failed: {}",
+                            e
+                        )));
+                    }
                 }
             }

-            // Use indexed_at for checkpoint/watermark tracking, fallback to modified_at if NULL
-            let timestamp = entry.indexed_at.unwrap_or(entry.modified_at);
-            sync_results.push((uuid, json, timestamp));
+            staged = staged
+                .into_iter()
+                .zip(payloads.into_iter())
+                .enumerate()
+                .filter_map(|(idx, ((uuid, _, ts), resolved))| {
+                    if failed_indices.contains(&idx) {
+                        tracing::warn!(
+                            uuid = %uuid,
+                            "Dropping entry with unresolved FK from sync batch"
+                        );
+                        None
+                    } else {
+                        Some((uuid, resolved, ts))
+                    }
+                })
+                .collect();
         }

-        Ok(sync_results)
+        Ok(staged)
     }

     /// Apply state change - already implemented in Model impl block below
@@ -564,10 +613,12 @@ impl Model {
             inserted.id
         };

-        // Rebuild entry_closure for this synced entry
-        // Without this, the entry only has a self-reference and cannot be queried
-        // for descendants, breaking subtree operations, location scoping, etc.
-        Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
+        // Rebuild entry_closure for this synced entry, unless we're inside a
+        // backfill apply loop — in that case the post_backfill_rebuild hook
+        // does a single bulk rebuild at the end, so per-entry work is wasted.
+        if !crate::infra::sync::is_in_backfill() {
+            Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
+        }

         // If this is a directory, create or update its entry in the directory_paths table
         if EntryKind::from(kind) == EntryKind::Directory {
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+//! Add composite index on entries(indexed_at, uuid) to back the device-owned
+//! sync cursor.
+//!
+//! `Entry::query_for_sync` paginates by `ORDER BY indexed_at ASC, uuid ASC`
+//! with a tie-breaker filter of the same shape. Without this index, SQLite
+//! does a full table scan per batch request — O(N) per batch, O(N^2) across
+//! an initial backfill of a large library.
+
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "CREATE INDEX IF NOT EXISTS idx_entries_indexed_at_uuid \
+                 ON entries(indexed_at, uuid)",
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared("DROP INDEX IF EXISTS idx_entries_indexed_at_uuid")
+            .await?;
+
+        Ok(())
+    }
+}
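
The `(indexed_at, uuid)` cursor that this index backs is a keyset pagination scheme, sketched below over an in-memory slice. The `Row` struct, the integer timestamps, and `next_batch` are hypothetical stand-ins for the example; the real query lives in `Entry::query_for_sync` and is not reproduced here. Note that the in-memory filter below scans every row on each call, which is the O(N)-per-batch behaviour the migration comment attributes to the unindexed table; the composite index is what lets the database seek straight to the cursor position instead.

```rust
/// Hypothetical row: an integer timestamp stands in for indexed_at.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    indexed_at: i64,
    uuid: &'static str,
}

/// Keyset cursor: the (indexed_at, uuid) of the last row already returned.
type Cursor = (i64, &'static str);

/// Return the next `limit` rows strictly after `cursor`, ordered by
/// (indexed_at ASC, uuid ASC), together with the cursor for the next call.
fn next_batch(rows: &[Row], cursor: Option<Cursor>, limit: usize) -> (Vec<Row>, Option<Cursor>) {
    let mut matching: Vec<Row> = rows
        .iter()
        .filter(|r| match cursor {
            None => true,
            // Tie-breaker predicate of the same shape as the ORDER BY:
            // indexed_at > ts OR (indexed_at = ts AND uuid > id)
            Some((ts, id)) => r.indexed_at > ts || (r.indexed_at == ts && r.uuid > id),
        })
        .cloned()
        .collect();

    matching.sort_by(|a, b| (a.indexed_at, a.uuid).cmp(&(b.indexed_at, b.uuid)));
    matching.truncate(limit);

    // Advance the cursor to the last row returned; keep the old one if empty.
    let next = matching.last().map(|r| (r.indexed_at, r.uuid)).or(cursor);
    (matching, next)
}

fn main() {
    let rows = vec![
        Row { indexed_at: 10, uuid: "b" },
        Row { indexed_at: 10, uuid: "a" },
        Row { indexed_at: 20, uuid: "c" },
    ];
    let mut cursor = None;
    loop {
        let (batch, next) = next_batch(&rows, cursor, 2);
        if batch.is_empty() {
            break;
        }
        println!("{:?}", batch);
        cursor = next;
    }
}
```

The `uuid` tie-breaker matters because many rows can share one `indexed_at`; filtering on the timestamp alone would skip or repeat rows at a page boundary.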

core/src/infra/db/migration/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,7 @@ mod m20260104_000001_replace_device_id_with_volume_id;
 mod m20260105_000001_add_volume_id_to_locations;
 mod m20260114_000001_fix_search_index_include_directories;
 mod m20260123_000001_remove_legacy_sync_columns;
+mod m20260417_000001_add_entries_sync_cursor_index;

 pub struct Migrator;

@@ -81,6 +82,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20260105_000001_add_volume_id_to_locations::Migration),
             Box::new(m20260114_000001_fix_search_index_include_directories::Migration),
             Box::new(m20260123_000001_remove_legacy_sync_columns::Migration),
+            Box::new(m20260417_000001_add_entries_sync_cursor_index::Migration),
         ]
     }
 }
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+//! Task-local flag identifying code running inside a backfill apply loop.
+//!
+//! A few per-record hooks (closure table rebuild, resource event emission)
+//! are redundant during backfill because the coordinator does bulk work at
+//! the end. Models check this flag to skip that per-record work.
+
+tokio::task_local! {
+    static IN_BACKFILL: ();
+}
+
+/// Run `fut` with the in-backfill flag set. Nested scopes are allowed.
+pub async fn in_backfill<F, T>(fut: F) -> T
+where
+    F: std::future::Future<Output = T>,
+{
+    if is_in_backfill() {
+        fut.await
+    } else {
+        IN_BACKFILL.scope((), fut).await
+    }
+}
+
+/// True when the current task is inside an `in_backfill` scope.
+pub fn is_in_backfill() -> bool {
+    IN_BACKFILL.try_with(|_| ()).is_ok()
+}
