@@ -2,14 +2,10 @@ use super::{
2
2
datastore:: Result ,
3
3
delete_table:: DeleteTable ,
4
4
sequence:: { Sequence , SequencesState } ,
5
- state_view:: { IterByColRangeTx , StateView } ,
6
- tx_state:: { IndexIdMap , RemovedIndexIdSet , TxState } ,
5
+ state_view:: { IterByColRangeTx , IterTx , ScanIterByColRangeTx , StateView } ,
6
+ tx_state:: { IndexIdMap , PendingSchemaChange , TxState } ,
7
7
IterByColEqTx ,
8
8
} ;
9
- use crate :: {
10
- db:: datastore:: locking_tx_datastore:: state_view:: { IterTx , ScanIterByColRangeTx } ,
11
- error:: IndexError ,
12
- } ;
13
9
use crate :: {
14
10
db:: {
15
11
datastore:: {
@@ -25,7 +21,7 @@ use crate::{
25
21
} ,
26
22
db_metrics:: DB_METRICS ,
27
23
} ,
28
- error:: TableError ,
24
+ error:: { IndexError , TableError } ,
29
25
execution_context:: ExecutionContext ,
30
26
} ;
31
27
use anyhow:: anyhow;
@@ -116,14 +112,14 @@ impl StateView for CommittedState {
116
112
cols : ColList ,
117
113
range : R ,
118
114
) -> Result < Self :: IterByColRange < ' _ , R > > {
119
- // TODO: Why does this unconditionally return a `Scan` iter,
120
- // instead of trying to return a `CommittedIndex` iter?
121
- // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
122
- Ok ( IterByColRangeTx :: Scan ( ScanIterByColRangeTx :: new (
123
- self . iter ( table_id ) ? ,
124
- cols ,
125
- range ,
126
- ) ) )
115
+ match self . index_seek ( table_id , & cols , & range ) {
116
+ Some ( iter ) => Ok ( IterByColRangeTx :: Index ( iter) ) ,
117
+ None => Ok ( IterByColRangeTx :: Scan ( ScanIterByColRangeTx :: new (
118
+ self . iter ( table_id ) ? ,
119
+ cols ,
120
+ range ,
121
+ ) ) ) ,
122
+ }
127
123
}
128
124
129
125
fn iter_by_col_eq < ' a , ' r > (
@@ -377,7 +373,7 @@ impl CommittedState {
377
373
seq. value = seq. allocated ( ) + 1 ;
378
374
}
379
375
380
- sequence_state. insert ( seq. id ( ) , seq ) ;
376
+ sequence_state. insert ( seq) ;
381
377
}
382
378
Ok ( ( ) )
383
379
}
@@ -403,7 +399,7 @@ impl CommittedState {
403
399
for index_row in rows {
404
400
let index_id = index_row. index_id ;
405
401
let table_id = index_row. table_id ;
406
- let Some ( ( table, blob_store) ) = self . get_table_and_blob_store ( table_id) else {
402
+ let ( Some ( table) , blob_store, index_id_map ) = self . get_table_and_blob_store_mut ( table_id) else {
407
403
panic ! ( "Cannot create index for table which doesn't exist in committed state" ) ;
408
404
} ;
409
405
let algo: IndexAlgorithm = index_row. index_algorithm . into ( ) ;
@@ -413,7 +409,7 @@ impl CommittedState {
413
409
let index = table. new_index ( & algo, is_unique) ?;
414
410
// SAFETY: `index` was derived from `table`.
415
411
unsafe { table. insert_index ( blob_store, index_id, index) } ;
416
- self . index_id_map . insert ( index_id, table_id) ;
412
+ index_id_map. insert ( index_id, table_id) ;
417
413
}
418
414
Ok ( ( ) )
419
415
}
@@ -538,9 +534,6 @@ impl CommittedState {
538
534
pub ( super ) fn merge ( & mut self , tx_state : TxState , ctx : & ExecutionContext ) -> TxData {
539
535
let mut tx_data = TxData :: default ( ) ;
540
536
541
- // First, merge index id fast-lookup map changes and delete indices.
542
- self . merge_index_map ( tx_state. index_id_map , tx_state. index_id_map_removals . as_deref ( ) ) ;
543
-
544
537
// First, apply deletes. This will free up space in the committed tables.
545
538
self . merge_apply_deletes ( & mut tx_data, tx_state. delete_tables ) ;
546
539
@@ -560,7 +553,7 @@ impl CommittedState {
560
553
561
554
fn merge_apply_deletes ( & mut self , tx_data : & mut TxData , delete_tables : BTreeMap < TableId , DeleteTable > ) {
562
555
for ( table_id, row_ptrs) in delete_tables {
563
- if let Some ( ( table, blob_store) ) = self . get_table_and_blob_store ( table_id) {
556
+ if let ( Some ( table) , blob_store, _ ) = self . get_table_and_blob_store_mut ( table_id) {
564
557
let mut deletes = Vec :: with_capacity ( row_ptrs. len ( ) ) ;
565
558
566
559
// Note: we maintain the invariant that the delete_tables
@@ -622,19 +615,7 @@ impl CommittedState {
622
615
tx_data. set_inserts_for_table ( table_id, table_name, inserts. into ( ) ) ;
623
616
}
624
617
625
- let ( schema, indexes, pages) = tx_table. consume_for_merge ( ) ;
626
-
627
- // Add all newly created indexes to the committed state.
628
- for ( index_id, mut index) in indexes {
629
- if !commit_table. indexes . contains_key ( & index_id) {
630
- index. clear ( ) ;
631
- // SAFETY: `tx_table` is derived from `commit_table`,
632
- // so they have the same row type.
633
- // This entails that all indices in `tx_table`
634
- // were constructed with the same row type/layout as `commit_table`.
635
- unsafe { commit_table. insert_index ( commit_blob_store, index_id, index) } ;
636
- }
637
- }
618
+ let ( schema, _indexes, pages) = tx_table. consume_for_merge ( ) ;
638
619
639
620
// The schema may have been modified in the transaction.
640
621
// Update this last to placate borrowck and avoid a clone.
@@ -646,43 +627,104 @@ impl CommittedState {
646
627
}
647
628
}
648
629
649
- fn merge_index_map ( & mut self , index_id_map : IndexIdMap , index_id_map_removals : Option < & RemovedIndexIdSet > ) {
650
- // Remove indices that tx-state removed.
651
- // It's not necessarily the case that the index already existed in the committed state.
652
- for ( index_id, table_id) in index_id_map_removals
653
- . into_iter ( )
654
- . flatten ( )
655
- . filter_map ( |index_id| self . index_id_map . remove ( index_id) . map ( |x| ( * index_id, x) ) )
656
- {
657
- assert ! ( self
658
- . tables
659
- . get_mut( & table_id)
660
- . expect( "table to delete index from should exist" )
661
- . delete_index( & self . blob_store, index_id) ) ;
630
+ /// Rolls back the changes immediately made to the committed state during a transaction.
631
+ pub ( super ) fn rollback ( & mut self , seq_state : & mut SequencesState , tx_state : TxState ) {
632
+ // Roll back the changes in the reverse order in which they were made
633
+ // so that e.g., the last change is undone first.
634
+ for change in tx_state. pending_schema_changes . into_iter ( ) . rev ( ) {
635
+ self . rollback_pending_schema_change ( seq_state, change) ;
636
+ }
637
+ }
638
+
639
+ fn rollback_pending_schema_change (
640
+ & mut self ,
641
+ seq_state : & mut SequencesState ,
642
+ change : PendingSchemaChange ,
643
+ ) -> Option < ( ) > {
644
+ use PendingSchemaChange :: * ;
645
+ match change {
646
+ // An index was removed. Add it back.
647
+ IndexRemoved ( table_id, index_id, table_index, index_schema) => {
648
+ let table = self . tables . get_mut ( & table_id) ?;
649
+ // SAFETY: `table_index` was derived from `table`.
650
+ unsafe { table. add_index ( index_id, table_index) } ;
651
+ table. with_mut_schema ( |s| s. update_index ( index_schema) ) ;
652
+ self . index_id_map . insert ( index_id, table_id) ;
653
+ }
654
+ // An index was added. Remove it.
655
+ IndexAdded ( table_id, index_id, pointer_map) => {
656
+ let table = self . tables . get_mut ( & table_id) ?;
657
+ table. delete_index ( & self . blob_store , index_id, pointer_map) ;
658
+ table. with_mut_schema ( |s| s. remove_index ( index_id) ) ;
659
+ self . index_id_map . remove ( & index_id) ;
660
+ }
661
+ // A table was removed. Add it back.
662
+ TableRemoved ( table_id, table) => {
663
+ // We don't need to deal with sub-components.
664
+ // That is, we don't need to add back indices and such.
665
+ // Instead, there will be separate pending schema changes like `IndexRemoved`.
666
+ self . tables . insert ( table_id, table) ;
667
+ }
668
+ // A table was added. Remove it.
669
+ TableAdded ( table_id) => {
670
+ // We don't need to deal with sub-components.
671
+ // That is, we don't need to remove indices and such.
672
+ // Instead, there will be separate pending schema changes like `IndexAdded`.
673
+ self . tables . remove ( & table_id) ;
674
+ }
675
+ // A table's access was changed. Change back to the old one.
676
+ TableAlterAccess ( table_id, access) => {
677
+ let table = self . tables . get_mut ( & table_id) ?;
678
+ table. with_mut_schema ( |s| s. table_access = access) ;
679
+ }
680
+ // A constraint was removed. Add it back.
681
+ ConstraintRemoved ( table_id, constraint_schema) => {
682
+ let table = self . tables . get_mut ( & table_id) ?;
683
+ table. with_mut_schema ( |s| s. update_constraint ( constraint_schema) ) ;
684
+ }
685
+ // A constraint was added. Remove it.
686
+ ConstraintAdded ( table_id, constraint_id) => {
687
+ let table = self . tables . get_mut ( & table_id) ?;
688
+ table. with_mut_schema ( |s| s. remove_constraint ( constraint_id) ) ;
689
+ }
690
+ // A sequence was removed. Add it back.
691
+ SequenceRemoved ( table_id, seq, schema) => {
692
+ let table = self . tables . get_mut ( & table_id) ?;
693
+ table. with_mut_schema ( |s| s. update_sequence ( schema) ) ;
694
+ seq_state. insert ( seq) ;
695
+ }
696
+ // A sequence was added. Remove it.
697
+ SequenceAdded ( table_id, sequence_id) => {
698
+ let table = self . tables . get_mut ( & table_id) ?;
699
+ table. with_mut_schema ( |s| s. remove_sequence ( sequence_id) ) ;
700
+ seq_state. remove ( sequence_id) ;
701
+ }
662
702
}
663
703
664
- // Add the ones tx-state added.
665
- self . index_id_map . extend ( index_id_map) ;
704
+ Some ( ( ) )
666
705
}
667
706
668
707
pub ( super ) fn get_table ( & self , table_id : TableId ) -> Option < & Table > {
669
708
self . tables . get ( & table_id)
670
709
}
671
710
672
- pub ( super ) fn get_table_mut ( & mut self , table_id : TableId ) -> ( Option < & mut Table > , & PagePool ) {
673
- ( self . tables . get_mut ( & table_id) , & self . page_pool )
674
- }
675
-
676
- pub fn get_table_and_blob_store_immutable ( & self , table_id : TableId ) -> Option < ( & Table , & dyn BlobStore ) > {
677
- self . tables
678
- . get ( & table_id)
679
- . map ( |tbl| ( tbl, & self . blob_store as & dyn BlobStore ) )
711
+ #[ allow( clippy:: unnecessary_lazy_evaluations) ]
712
+ pub fn get_table_and_blob_store ( & self , table_id : TableId ) -> Result < CommitTableForInsertion < ' _ > > {
713
+ let table = self
714
+ . get_table ( table_id)
715
+ . ok_or_else ( || TableError :: IdNotFoundState ( table_id) ) ?;
716
+ Ok ( ( table, & self . blob_store as & dyn BlobStore , & self . index_id_map ) )
680
717
}
681
718
682
- pub ( super ) fn get_table_and_blob_store ( & mut self , table_id : TableId ) -> Option < ( & mut Table , & mut dyn BlobStore ) > {
683
- self . tables
684
- . get_mut ( & table_id)
685
- . map ( |tbl| ( tbl, & mut self . blob_store as & mut dyn BlobStore ) )
719
+ pub ( super ) fn get_table_and_blob_store_mut (
720
+ & mut self ,
721
+ table_id : TableId ,
722
+ ) -> ( Option < & mut Table > , & mut dyn BlobStore , & mut IndexIdMap ) {
723
+ (
724
+ self . tables . get_mut ( & table_id) ,
725
+ & mut self . blob_store as & mut dyn BlobStore ,
726
+ & mut self . index_id_map ,
727
+ )
686
728
}
687
729
688
730
fn make_table ( schema : Arc < TableSchema > ) -> Table {
@@ -703,7 +745,7 @@ impl CommittedState {
703
745
. entry ( table_id)
704
746
. or_insert_with ( || Self :: make_table ( schema. clone ( ) ) ) ;
705
747
let blob_store = & mut self . blob_store ;
706
- let pool = & mut self . page_pool ;
748
+ let pool = & self . page_pool ;
707
749
( table, blob_store, pool)
708
750
}
709
751
@@ -741,25 +783,4 @@ impl CommittedState {
741
783
}
742
784
}
743
785
744
- pub struct CommittedIndexIterWithDeletedMutTx < ' a > {
745
- committed_rows : IndexScanRangeIter < ' a > ,
746
- del_table : & ' a DeleteTable ,
747
- }
748
-
749
- impl < ' a > CommittedIndexIterWithDeletedMutTx < ' a > {
750
- pub ( super ) fn new ( committed_rows : IndexScanRangeIter < ' a > , del_table : & ' a DeleteTable ) -> Self {
751
- Self {
752
- committed_rows,
753
- del_table,
754
- }
755
- }
756
- }
757
-
758
- impl < ' a > Iterator for CommittedIndexIterWithDeletedMutTx < ' a > {
759
- type Item = RowRef < ' a > ;
760
-
761
- fn next ( & mut self ) -> Option < Self :: Item > {
762
- self . committed_rows
763
- . find ( |row_ref| !self . del_table . contains ( row_ref. pointer ( ) ) )
764
- }
765
- }
786
+ pub ( super ) type CommitTableForInsertion < ' a > = ( & ' a Table , & ' a dyn BlobStore , & ' a IndexIdMap ) ;
0 commit comments