Skip to content

Commit 92c8e95

Browse files
Centrilgefjon
andauthored
MutTxId::update: consider deleted commited row when replacing tx row (#2302)
Signed-off-by: Mazdak Farrokhzad <[email protected]> Co-authored-by: Phoebe Goldman <[email protected]>
1 parent d92a3f5 commit 92c8e95

File tree

2 files changed

+127
-5
lines changed

2 files changed

+127
-5
lines changed

crates/core/src/db/datastore/locking_tx_datastore/datastore.rs

+80
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,7 @@ mod tests {
10421042
use core::{fmt, mem};
10431043
use itertools::Itertools;
10441044
use pretty_assertions::{assert_eq, assert_matches};
1045+
use spacetimedb_execution::Datastore;
10451046
use spacetimedb_lib::db::auth::{StAccess, StTableType};
10461047
use spacetimedb_lib::error::ResultTest;
10471048
use spacetimedb_lib::st_var::StVarValue;
@@ -2283,6 +2284,85 @@ mod tests {
22832284
Ok(())
22842285
}
22852286

2287+
#[test]
2288+
fn test_update_brings_back_deleted_commit_row_repro_2296() -> ResultTest<()> {
2289+
// Get us a datastore and tx.
2290+
let datastore = get_datastore()?;
2291+
let mut tx = begin_mut_tx(&datastore);
2292+
2293+
// Create the table. The minimal repro is a two column table with a unique constraint.
2294+
let table_id = TableId::SENTINEL;
2295+
let col = |pos: usize| ColumnSchema {
2296+
table_id,
2297+
col_pos: pos.into(),
2298+
col_name: format!("c{pos}").into(),
2299+
col_type: AlgebraicType::U32,
2300+
};
2301+
let table_schema = TableSchema::new(
2302+
table_id,
2303+
"Foo".into(),
2304+
[col(0), col(1)].into(),
2305+
vec![IndexSchema {
2306+
table_id,
2307+
index_id: IndexId::SENTINEL,
2308+
index_name: "index".into(),
2309+
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: 0.into() }),
2310+
}],
2311+
vec![ConstraintSchema {
2312+
table_id,
2313+
constraint_id: ConstraintId::SENTINEL,
2314+
constraint_name: "constraint".into(),
2315+
data: ConstraintData::Unique(UniqueConstraintData { columns: 0.into() }),
2316+
}],
2317+
vec![],
2318+
StTableType::User,
2319+
StAccess::Public,
2320+
None,
2321+
None,
2322+
);
2323+
let table_id = datastore.create_table_mut_tx(&mut tx, table_schema)?;
2324+
let index_id = datastore.index_id_from_name_mut_tx(&tx, "index")?.unwrap();
2325+
let find_row_by_key = |tx: &MutTxId, key: u32| {
2326+
tx.index_scan_point(table_id, index_id, &key.into())
2327+
.unwrap()
2328+
.map(|row| row.pointer())
2329+
.collect::<Vec<_>>()
2330+
};
2331+
2332+
// Insert `Foo { c0: 0, c1: 0 }`.
2333+
const KEY: u32 = 0;
2334+
const DATA: u32 = 0;
2335+
let row = &product![KEY, DATA];
2336+
let row_prime = &product![KEY, DATA + 1];
2337+
insert(&datastore, &mut tx, table_id, row)?;
2338+
2339+
// It's important for the test that the row is committed.
2340+
commit(&datastore, tx)?;
2341+
2342+
// Start a new transaction where we:
2343+
let mut tx = begin_mut_tx(&datastore);
2344+
// 1. delete the row.
2345+
let row_to_del = find_row_by_key(&tx, KEY);
2346+
datastore.delete_mut_tx(&mut tx, table_id, row_to_del.iter().copied());
2347+
// 2. insert a new row with the same key as the one we deleted but different extra field.
2348+
// We should now have a committed row `row` marked as deleted and a row `row_prime`.
2349+
// These share `KEY`.
2350+
insert(&datastore, &mut tx, table_id, row_prime)?;
2351+
// 3. update `row_prime` -> `row`.
2352+
// Because `row` exists in the committed state but was marked as deleted,
2353+
// it should be undeleted, without a new row being added to the tx state.
2354+
let (_, row_ref) = update(&datastore, &mut tx, table_id, index_id, row)?;
2355+
assert_eq!([row_ref.pointer()], &*row_to_del);
2356+
assert_eq!(row_to_del, find_row_by_key(&tx, KEY));
2357+
2358+
// Commit the transaction.
2359+
// We expect the transaction to be a noop.
2360+
let tx_data = commit(&datastore, tx)?;
2361+
assert_eq!(tx_data.inserts().count(), 0);
2362+
assert_eq!(tx_data.deletes().count(), 0);
2363+
Ok(())
2364+
}
2365+
22862366
#[test]
22872367
fn test_update_no_such_index() -> ResultTest<()> {
22882368
let (datastore, tx, table_id) = setup_table_with_indices([].into(), [].into())?;

crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs

+47-5
Original file line numberDiff line numberDiff line change
@@ -1347,6 +1347,8 @@ impl MutTxId {
13471347
if let (_, Some(commit_ptr)) =
13481348
unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, tx_row_ptr, tx_row_hash) }
13491349
{
1350+
// (insert_undelete)
1351+
// -----------------------------------------------------
13501352
// If `row` was already present in the committed state,
13511353
// either this is a set-semantic duplicate,
13521354
// or the row is marked as deleted, so we will undelete it
@@ -1547,6 +1549,8 @@ impl MutTxId {
15471549
// In the former case, the index must not have been removed in the transaction.
15481550
// As it's unlikely that an index was added in this transaction,
15491551
// we begin by checking the committed state.
1552+
let commit_blob_store = &self.committed_state_write_lock.blob_store;
1553+
let mut old_commit_del_ptr = None;
15501554
let err = 'failed_rev_ins: {
15511555
let tx_row_ptr = if tx_removed_index {
15521556
break 'failed_rev_ins IndexError::NotFound(index_id).into();
@@ -1561,7 +1565,13 @@ impl MutTxId {
15611565
.committed_state_write_lock
15621566
.get_index_by_id_with_table(table_id, index_id)
15631567
.and_then(|index| find_old_row(tx_row_ref, index).0.map(|ptr| (index, ptr)))
1564-
.filter(|(_, ptr)| !del_table.contains(*ptr))
1568+
.filter(|&(_, ptr)| {
1569+
// Was committed row previously deleted in this TX?
1570+
let deleted = del_table.contains(ptr);
1571+
// If so, remember it in case it was identical to the new row.
1572+
old_commit_del_ptr = deleted.then_some(ptr);
1573+
!deleted
1574+
})
15651575
{
15661576
// 1. Ensure the index is unique.
15671577
// 2. Ensure the new row doesn't violate any other committed state unique indices.
@@ -1581,7 +1591,6 @@ impl MutTxId {
15811591
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
15821592
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
15831593
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
1584-
let commit_blob_store = &self.committed_state_write_lock.blob_store;
15851594
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
15861595
let old_row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
15871596
return Ok((cols_to_gen, old_row_ref, update_flags));
@@ -1638,7 +1647,6 @@ impl MutTxId {
16381647
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
16391648
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
16401649
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
1641-
let commit_blob_store = &self.committed_state_write_lock.blob_store;
16421650
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
16431651
let old_row_ref =
16441652
unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
@@ -1665,8 +1673,42 @@ impl MutTxId {
16651673
// SAFETY: `self.is_row_present(tx_row_ptr)` and `self.is_row_present(old_ptr)` both hold
16661674
// as we've deleted neither.
16671675
// In particular, the `write_gen_val_to_col` call does not remove the row.
1668-
unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }
1669-
.map_err(IndexError::UniqueConstraintViolation)?
1676+
let tx_row_ptr =
1677+
unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }
1678+
.map_err(IndexError::UniqueConstraintViolation)?;
1679+
1680+
if let Some(old_commit_del_ptr) = old_commit_del_ptr {
1681+
let commit_table =
1682+
commit_table.expect("previously found a row in `commit_table`, so there should be one");
1683+
// If we have an identical deleted row in the committed state,
1684+
// we need to undeleted it, just like in `Self::insert`.
1685+
// The same note (`insert_undelete`) there re. MVCC applies here as well.
1686+
//
1687+
// SAFETY:
1688+
// 1. `tx_table` is derived from `commit_table` so they have the same layouts.
1689+
// 2. `old_commit_del_ptr` was found in an index of `commit_table`.
1690+
// 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid.
1691+
if unsafe { Table::eq_row_in_page(commit_table, old_commit_del_ptr, tx_table, tx_row_ptr) }
1692+
{
1693+
// It is important that we `confirm_update` first,
1694+
// as we must ensure that undeleting the row causes no tx state conflict.
1695+
tx_table
1696+
.delete(tx_blob_store, tx_row_ptr, |_| ())
1697+
.expect("Failed to delete a row we just inserted");
1698+
1699+
// Undelete.
1700+
del_table.remove(old_commit_del_ptr);
1701+
1702+
// Return the undeleted committed state row.
1703+
// SAFETY: `commit_table.is_row_present(old_commit_del_ptr)` holds.
1704+
let old_row_ref = unsafe {
1705+
commit_table.get_row_ref_unchecked(commit_blob_store, old_commit_del_ptr)
1706+
};
1707+
return Ok((cols_to_gen, old_row_ref, update_flags));
1708+
}
1709+
}
1710+
1711+
tx_row_ptr
16701712
}
16711713
_ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", old_ptr),
16721714
}

0 commit comments

Comments
 (0)