Skip to content

Commit 8a94ab4

Browse files
Speed up sync apply hot paths
1 parent 9540081 commit 8a94ab4

1 file changed

Lines changed: 87 additions & 8 deletions

File tree

crates/contextdb-engine/src/database.rs

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2345,6 +2345,8 @@ impl Database {
23452345
let mut vector_row_map: HashMap<u64, u64> = HashMap::new();
23462346
let mut vector_row_idx = 0usize;
23472347
let mut failed_row_ids: HashSet<u64> = HashSet::new();
2348+
let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
2349+
let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
23482350

23492351
for ddl in changes.ddl.clone() {
23502352
match ddl {
@@ -2417,12 +2419,16 @@ impl Database {
24172419
sql.push_str(&constraints.join(" "));
24182420
}
24192421
self.execute_in_tx(tx, &sql, &HashMap::new())?;
2422+
table_meta_cache.remove(&name);
2423+
visible_rows_cache.remove(&name);
24202424
}
24212425
DdlChange::DropTable { name } => {
24222426
if self.table_meta(&name).is_some() {
24232427
self.relational_store().drop_table(&name);
24242428
self.remove_persisted_table(&name)?;
24252429
}
2430+
table_meta_cache.remove(&name);
2431+
visible_rows_cache.remove(&name);
24262432
}
24272433
DdlChange::AlterTable {
24282434
name,
@@ -2447,6 +2453,8 @@ impl Database {
24472453
sql.push_str(&constraints.join(" "));
24482454
}
24492455
self.execute_in_tx(tx, &sql, &HashMap::new())?;
2456+
table_meta_cache.remove(&name);
2457+
visible_rows_cache.remove(&name);
24502458
}
24512459
}
24522460
}
@@ -2467,11 +2475,12 @@ impl Database {
24672475
.copied()
24682476
.unwrap_or(policies.default);
24692477

2470-
let existing = self.point_lookup(
2478+
let existing = cached_point_lookup(
2479+
self,
2480+
&mut visible_rows_cache,
24712481
&row.table,
24722482
&row.natural_key.column,
24732483
&row.natural_key.value,
2474-
self.snapshot(),
24752484
)?;
24762485
let is_delete = row.deleted;
24772486

@@ -2485,6 +2494,7 @@ impl Database {
24852494
});
24862495
result.skipped_rows += 1;
24872496
} else {
2497+
remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
24882498
result.applied_rows += 1;
24892499
}
24902500
} else {
@@ -2500,7 +2510,7 @@ impl Database {
25002510

25012511
match (existing, policy) {
25022512
(None, _) => {
2503-
if let Some(meta) = self.table_meta(&row.table) {
2513+
if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
25042514
let mut constraint_error: Option<String> = None;
25052515

25062516
for col_def in &meta.columns {
@@ -2523,16 +2533,18 @@ impl Database {
25232533

25242534
let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
25252535
if constraint_error.is_none() && has_unique {
2526-
let existing_rows =
2527-
self.scan(&row.table, self.snapshot()).unwrap_or_default();
25282536
for col_def in &meta.columns {
25292537
if col_def.unique
25302538
&& !col_def.primary_key
25312539
&& let Some(new_val) = values.get(&col_def.name)
25322540
&& *new_val != Value::Null
2533-
&& existing_rows
2534-
.iter()
2535-
.any(|r| r.values.get(&col_def.name) == Some(new_val))
2541+
&& cached_visible_rows(
2542+
self,
2543+
&mut visible_rows_cache,
2544+
&row.table,
2545+
)?
2546+
.iter()
2547+
.any(|r| r.values.get(&col_def.name) == Some(new_val))
25362548
{
25372549
constraint_error = Some(format!(
25382550
"UNIQUE constraint violated: {}.{}",
@@ -2562,6 +2574,18 @@ impl Database {
25622574

25632575
match self.insert_row(tx, &row.table, values.clone()) {
25642576
Ok(new_row_id) => {
2577+
record_cached_insert(
2578+
&mut visible_rows_cache,
2579+
&row.table,
2580+
VersionedRow {
2581+
row_id: new_row_id,
2582+
values: values.clone(),
2583+
created_tx: tx,
2584+
deleted_tx: None,
2585+
lsn: row.lsn,
2586+
created_at: None,
2587+
},
2588+
);
25652589
result.applied_rows += 1;
25662590
if let Some(remote_row_id) = vector_row_ids.get(vector_row_idx) {
25672591
vector_row_map.insert(*remote_row_id, new_row_id);
@@ -2666,6 +2690,7 @@ impl Database {
26662690
values.clone(),
26672691
) {
26682692
Ok(_) => {
2693+
visible_rows_cache.remove(&row.table);
26692694
result.applied_rows += 1;
26702695
if let Some(remote_row_id) = vector_row_ids.get(vector_row_idx) {
26712696
if let Ok(Some(found)) = self.point_lookup(
@@ -2705,6 +2730,7 @@ impl Database {
27052730
});
27062731
match self.upsert_row(tx, &row.table, &row.natural_key.column, values.clone()) {
27072732
Ok(_) => {
2733+
visible_rows_cache.remove(&row.table);
27082734
result.applied_rows += 1;
27092735
if let Some(remote_row_id) = vector_row_ids.get(vector_row_idx) {
27102736
if let Ok(Some(found)) = self.point_lookup(
@@ -2867,6 +2893,59 @@ fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
28672893
qr
28682894
}
28692895

2896+
fn cached_table_meta(
2897+
db: &Database,
2898+
cache: &mut HashMap<String, Option<TableMeta>>,
2899+
table: &str,
2900+
) -> Option<TableMeta> {
2901+
cache
2902+
.entry(table.to_string())
2903+
.or_insert_with(|| db.table_meta(table))
2904+
.clone()
2905+
}
2906+
2907+
fn cached_visible_rows<'a>(
2908+
db: &Database,
2909+
cache: &'a mut HashMap<String, Vec<VersionedRow>>,
2910+
table: &str,
2911+
) -> Result<&'a mut Vec<VersionedRow>> {
2912+
if !cache.contains_key(table) {
2913+
let rows = db.scan(table, db.snapshot())?;
2914+
cache.insert(table.to_string(), rows);
2915+
}
2916+
Ok(cache.get_mut(table).expect("cached visible rows"))
2917+
}
2918+
2919+
fn cached_point_lookup(
2920+
db: &Database,
2921+
cache: &mut HashMap<String, Vec<VersionedRow>>,
2922+
table: &str,
2923+
col: &str,
2924+
value: &Value,
2925+
) -> Result<Option<VersionedRow>> {
2926+
let rows = cached_visible_rows(db, cache, table)?;
2927+
Ok(rows
2928+
.iter()
2929+
.find(|r| r.values.get(col) == Some(value))
2930+
.cloned())
2931+
}
2932+
2933+
fn record_cached_insert(
2934+
cache: &mut HashMap<String, Vec<VersionedRow>>,
2935+
table: &str,
2936+
row: VersionedRow,
2937+
) {
2938+
if let Some(rows) = cache.get_mut(table) {
2939+
rows.push(row);
2940+
}
2941+
}
2942+
2943+
fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
2944+
if let Some(rows) = cache.get_mut(table) {
2945+
rows.retain(|row| row.row_id != row_id);
2946+
}
2947+
}
2948+
28702949
fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
28712950
match result {
28722951
Ok(query_result) => QueryOutcome::Success {

0 commit comments

Comments
 (0)