Skip to content

Commit a3ecb73

Browse files
committed
datastore: apply schema changes immediately to committed state.
1 parent b1002f7 commit a3ecb73

File tree

14 files changed

+1154
-1148
lines changed

14 files changed

+1154
-1148
lines changed

crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs

+106-85
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,10 @@ use super::{
22
datastore::Result,
33
delete_table::DeleteTable,
44
sequence::{Sequence, SequencesState},
5-
state_view::{IterByColRangeTx, StateView},
6-
tx_state::{IndexIdMap, RemovedIndexIdSet, TxState},
5+
state_view::{IterByColRangeTx, IterTx, ScanIterByColRangeTx, StateView},
6+
tx_state::{IndexIdMap, PendingSchemaChange, TxState},
77
IterByColEqTx,
88
};
9-
use crate::{
10-
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
11-
error::IndexError,
12-
};
139
use crate::{
1410
db::{
1511
datastore::{
@@ -25,7 +21,7 @@ use crate::{
2521
},
2622
db_metrics::DB_METRICS,
2723
},
28-
error::TableError,
24+
error::{IndexError, TableError},
2925
execution_context::ExecutionContext,
3026
};
3127
use anyhow::anyhow;
@@ -116,14 +112,14 @@ impl StateView for CommittedState {
116112
cols: ColList,
117113
range: R,
118114
) -> Result<Self::IterByColRange<'_, R>> {
119-
// TODO: Why does this unconditionally return a `Scan` iter,
120-
// instead of trying to return a `CommittedIndex` iter?
121-
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
122-
Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
123-
self.iter(table_id)?,
124-
cols,
125-
range,
126-
)))
115+
match self.index_seek(table_id, &cols, &range) {
116+
Some(iter) => Ok(IterByColRangeTx::Index(iter)),
117+
None => Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
118+
self.iter(table_id)?,
119+
cols,
120+
range,
121+
))),
122+
}
127123
}
128124

129125
fn iter_by_col_eq<'a, 'r>(
@@ -377,7 +373,7 @@ impl CommittedState {
377373
seq.value = seq.allocated() + 1;
378374
}
379375

380-
sequence_state.insert(seq.id(), seq);
376+
sequence_state.insert(seq);
381377
}
382378
Ok(())
383379
}
@@ -403,7 +399,7 @@ impl CommittedState {
403399
for index_row in rows {
404400
let index_id = index_row.index_id;
405401
let table_id = index_row.table_id;
406-
let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else {
402+
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
407403
panic!("Cannot create index for table which doesn't exist in committed state");
408404
};
409405
let algo: IndexAlgorithm = index_row.index_algorithm.into();
@@ -413,7 +409,7 @@ impl CommittedState {
413409
let index = table.new_index(&algo, is_unique)?;
414410
// SAFETY: `index` was derived from `table`.
415411
unsafe { table.insert_index(blob_store, index_id, index) };
416-
self.index_id_map.insert(index_id, table_id);
412+
index_id_map.insert(index_id, table_id);
417413
}
418414
Ok(())
419415
}
@@ -538,9 +534,6 @@ impl CommittedState {
538534
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
539535
let mut tx_data = TxData::default();
540536

541-
// First, merge index id fast-lookup map changes and delete indices.
542-
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());
543-
544537
// First, apply deletes. This will free up space in the committed tables.
545538
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
546539

@@ -560,7 +553,7 @@ impl CommittedState {
560553

561554
fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
562555
for (table_id, row_ptrs) in delete_tables {
563-
if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) {
556+
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
564557
let mut deletes = Vec::with_capacity(row_ptrs.len());
565558

566559
// Note: we maintain the invariant that the delete_tables
@@ -622,19 +615,7 @@ impl CommittedState {
622615
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
623616
}
624617

625-
let (schema, indexes, pages) = tx_table.consume_for_merge();
626-
627-
// Add all newly created indexes to the committed state.
628-
for (index_id, mut index) in indexes {
629-
if !commit_table.indexes.contains_key(&index_id) {
630-
index.clear();
631-
// SAFETY: `tx_table` is derived from `commit_table`,
632-
// so they have the same row type.
633-
// This entails that all indices in `tx_table`
634-
// were constructed with the same row type/layout as `commit_table`.
635-
unsafe { commit_table.insert_index(commit_blob_store, index_id, index) };
636-
}
637-
}
618+
let (schema, _indexes, pages) = tx_table.consume_for_merge();
638619

639620
// The schema may have been modified in the transaction.
640621
// Update this last to placate borrowck and avoid a clone.
@@ -646,43 +627,104 @@ impl CommittedState {
646627
}
647628
}
648629

649-
fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
650-
// Remove indices that tx-state removed.
651-
// It's not necessarily the case that the index already existed in the committed state.
652-
for (index_id, table_id) in index_id_map_removals
653-
.into_iter()
654-
.flatten()
655-
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
656-
{
657-
assert!(self
658-
.tables
659-
.get_mut(&table_id)
660-
.expect("table to delete index from should exist")
661-
.delete_index(&self.blob_store, index_id));
630+
/// Rolls back the changes immediately made to the committed state during a transaction.
631+
pub(super) fn rollback(&mut self, seq_state: &mut SequencesState, tx_state: TxState) {
632+
// Roll back the changes in the reverse order in which they were made
633+
// so that e.g., the last change is undone first.
634+
for change in tx_state.pending_schema_changes.into_iter().rev() {
635+
self.rollback_pending_schema_change(seq_state, change);
636+
}
637+
}
638+
639+
fn rollback_pending_schema_change(
640+
&mut self,
641+
seq_state: &mut SequencesState,
642+
change: PendingSchemaChange,
643+
) -> Option<()> {
644+
use PendingSchemaChange::*;
645+
match change {
646+
// An index was removed. Add it back.
647+
IndexRemoved(table_id, index_id, table_index, index_schema) => {
648+
let table = self.tables.get_mut(&table_id)?;
649+
// SAFETY: `table_index` was derived from `table`.
650+
unsafe { table.add_index(index_id, table_index) };
651+
table.with_mut_schema(|s| s.update_index(index_schema));
652+
self.index_id_map.insert(index_id, table_id);
653+
}
654+
// An index was added. Remove it.
655+
IndexAdded(table_id, index_id, pointer_map) => {
656+
let table = self.tables.get_mut(&table_id)?;
657+
table.delete_index(&self.blob_store, index_id, pointer_map);
658+
table.with_mut_schema(|s| s.remove_index(index_id));
659+
self.index_id_map.remove(&index_id);
660+
}
661+
// A table was removed. Add it back.
662+
TableRemoved(table_id, table) => {
663+
// We don't need to deal with sub-components.
664+
// That is, we don't need to add back indices and such.
665+
// Instead, there will be separate pending schema changes like `IndexRemoved`.
666+
self.tables.insert(table_id, table);
667+
}
668+
// A table was added. Remove it.
669+
TableAdded(table_id) => {
670+
// We don't need to deal with sub-components.
671+
// That is, we don't need to remove indices and such.
672+
// Instead, there will be separate pending schema changes like `IndexAdded`.
673+
self.tables.remove(&table_id);
674+
}
675+
// A table's access was changed. Change back to the old one.
676+
TableAlterAccess(table_id, access) => {
677+
let table = self.tables.get_mut(&table_id)?;
678+
table.with_mut_schema(|s| s.table_access = access);
679+
}
680+
// A constraint was removed. Add it back.
681+
ConstraintRemoved(table_id, constraint_schema) => {
682+
let table = self.tables.get_mut(&table_id)?;
683+
table.with_mut_schema(|s| s.update_constraint(constraint_schema));
684+
}
685+
// A constraint was added. Remove it.
686+
ConstraintAdded(table_id, constraint_id) => {
687+
let table = self.tables.get_mut(&table_id)?;
688+
table.with_mut_schema(|s| s.remove_constraint(constraint_id));
689+
}
690+
// A sequence was removed. Add it back.
691+
SequenceRemoved(table_id, seq, schema) => {
692+
let table = self.tables.get_mut(&table_id)?;
693+
table.with_mut_schema(|s| s.update_sequence(schema));
694+
seq_state.insert(seq);
695+
}
696+
// A sequence was added. Remove it.
697+
SequenceAdded(table_id, sequence_id) => {
698+
let table = self.tables.get_mut(&table_id)?;
699+
table.with_mut_schema(|s| s.remove_sequence(sequence_id));
700+
seq_state.remove(sequence_id);
701+
}
662702
}
663703

664-
// Add the ones tx-state added.
665-
self.index_id_map.extend(index_id_map);
704+
Some(())
666705
}
667706

668707
pub(super) fn get_table(&self, table_id: TableId) -> Option<&Table> {
669708
self.tables.get(&table_id)
670709
}
671710

672-
pub(super) fn get_table_mut(&mut self, table_id: TableId) -> (Option<&mut Table>, &PagePool) {
673-
(self.tables.get_mut(&table_id), &self.page_pool)
674-
}
675-
676-
pub fn get_table_and_blob_store_immutable(&self, table_id: TableId) -> Option<(&Table, &dyn BlobStore)> {
677-
self.tables
678-
.get(&table_id)
679-
.map(|tbl| (tbl, &self.blob_store as &dyn BlobStore))
711+
#[allow(clippy::unnecessary_lazy_evaluations)]
712+
pub fn get_table_and_blob_store(&self, table_id: TableId) -> Result<CommitTableForInsertion<'_>> {
713+
let table = self
714+
.get_table(table_id)
715+
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
716+
Ok((table, &self.blob_store as &dyn BlobStore, &self.index_id_map))
680717
}
681718

682-
pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> {
683-
self.tables
684-
.get_mut(&table_id)
685-
.map(|tbl| (tbl, &mut self.blob_store as &mut dyn BlobStore))
719+
pub(super) fn get_table_and_blob_store_mut(
720+
&mut self,
721+
table_id: TableId,
722+
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
723+
(
724+
self.tables.get_mut(&table_id),
725+
&mut self.blob_store as &mut dyn BlobStore,
726+
&mut self.index_id_map,
727+
)
686728
}
687729

688730
fn make_table(schema: Arc<TableSchema>) -> Table {
@@ -703,7 +745,7 @@ impl CommittedState {
703745
.entry(table_id)
704746
.or_insert_with(|| Self::make_table(schema.clone()));
705747
let blob_store = &mut self.blob_store;
706-
let pool = &mut self.page_pool;
748+
let pool = &self.page_pool;
707749
(table, blob_store, pool)
708750
}
709751

@@ -741,25 +783,4 @@ impl CommittedState {
741783
}
742784
}
743785

744-
pub struct CommittedIndexIterWithDeletedMutTx<'a> {
745-
committed_rows: IndexScanRangeIter<'a>,
746-
del_table: &'a DeleteTable,
747-
}
748-
749-
impl<'a> CommittedIndexIterWithDeletedMutTx<'a> {
750-
pub(super) fn new(committed_rows: IndexScanRangeIter<'a>, del_table: &'a DeleteTable) -> Self {
751-
Self {
752-
committed_rows,
753-
del_table,
754-
}
755-
}
756-
}
757-
758-
impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> {
759-
type Item = RowRef<'a>;
760-
761-
fn next(&mut self) -> Option<Self::Item> {
762-
self.committed_rows
763-
.find(|row_ref| !self.del_table.contains(row_ref.pointer()))
764-
}
765-
}
786+
pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap);

crates/core/src/db/datastore/locking_tx_datastore/datastore.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,8 @@ impl MutTxDatastore for Locking {
612612
index_id: IndexId,
613613
row: &[u8],
614614
) -> Result<(ColList, RowRef<'a>, UpdateFlags)> {
615-
tx.update(table_id, index_id, row)
615+
let (gens, row_ref, update_flags) = tx.update(table_id, index_id, row)?;
616+
Ok((gens, row_ref.collapse(), update_flags))
616617
}
617618

618619
fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result<Option<Metadata>> {

0 commit comments

Comments
 (0)