Skip to content

Commit 5f50c5c

Browse files
committed
add sequence_watermark(name) function and track sequence allocations in mvcc mode
1 parent 8231c6d commit 5f50c5c

9 files changed

Lines changed: 396 additions & 4 deletions

File tree

core/connection.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3097,6 +3097,10 @@ impl Connection {
30973097
continue;
30983098
}
30993099
let seq = self.read_sequence_descriptor_via_sql(&backing_table_name, &seq_name)?;
3100+
let watermark = self.read_sequence_watermark_via_sql(&backing_table_name, &seq)?;
3101+
if let Some(mv_store) = self.db.get_mv_store().as_ref() {
3102+
mv_store.set_sequence_watermark(&normalized, watermark);
3103+
}
31003104
self.with_database_schema_mut(MAIN_DB_ID, |schema| {
31013105
schema.sequences.insert(normalized.clone(), Arc::new(seq));
31023106
});
@@ -3162,6 +3166,53 @@ impl Connection {
31623166
})
31633167
}
31643168

3169+
/// Read the current exclusive sequence watermark from the backing table.
3170+
///
3171+
/// The returned value is the upper bound used by `sequence_watermark()`:
3172+
/// callers can consume ascending sequence rows with values lower than it.
3173+
fn read_sequence_watermark_via_sql(
3174+
self: &Arc<Connection>,
3175+
backing_table_name: &str,
3176+
seq: &crate::schema::Sequence,
3177+
) -> Result<i64> {
3178+
let escaped = backing_table_name.replace('"', "\"\"");
3179+
let direction = if seq.increment_by >= 0 { "DESC" } else { "ASC" };
3180+
let sql = format!(
3181+
"SELECT value, is_called FROM \"{escaped}\" ORDER BY value {direction} LIMIT 1"
3182+
);
3183+
let mut stmt = self.prepare_internal(sql).map_err(|err| {
3184+
LimboError::Corrupt(format!(
3185+
"internal sequence backing table \"{backing_table_name}\" for sequence \
3186+
\"{}\": cannot prepare watermark SELECT: {err}",
3187+
seq.name
3188+
))
3189+
})?;
3190+
let mut row: Option<(i64, bool)> = None;
3191+
stmt.run_with_row_callback(|r| {
3192+
let value = r.get::<i64>(0)?;
3193+
let is_called = r.get::<i64>(1)? != 0;
3194+
row = Some((value, is_called));
3195+
Ok(())
3196+
})
3197+
.map_err(|err| {
3198+
LimboError::Corrupt(format!(
3199+
"internal sequence backing table \"{backing_table_name}\" for sequence \
3200+
\"{}\": watermark row read failed: {err}",
3201+
seq.name
3202+
))
3203+
})?;
3204+
let (value, is_called) = row.ok_or_else(|| {
3205+
LimboError::Corrupt(format!(
3206+
"internal sequence backing table \"{backing_table_name}\" for sequence \
3207+
\"{}\" is empty; cannot derive sequence watermark",
3208+
seq.name
3209+
))
3210+
})?;
3211+
Ok(crate::mvcc::database::first_unsafe_sequence_watermark(
3212+
seq, value, is_called,
3213+
))
3214+
}
3215+
31653216
/// Sync AUTOINCREMENT backing-table watermarks from `sqlite_sequence`.
31663217
///
31673218
/// Covers the WAL→MVCC mode-switch compatibility path: a WAL-mode
@@ -3258,6 +3309,13 @@ impl Connection {
32583309
);
32593310
let mut insert_stmt = self.prepare_internal(insert_sql)?;
32603311
insert_stmt.run_with_row_callback(|_| Ok(()))?;
3312+
if let Some(mv_store) = self.db.get_mv_store().as_ref() {
3313+
let first_unsafe = watermark.checked_add(1).unwrap_or(watermark);
3314+
mv_store.set_sequence_watermark(
3315+
&autoincrement_sequence_name(&table_name),
3316+
first_unsafe,
3317+
);
3318+
}
32613319
}
32623320

32633321
Ok(())

core/function.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,7 @@ pub enum ScalarFunc {
784784
StatGet,
785785
ConnTxnId,
786786
IsAutocommit,
787+
SequenceWatermark,
787788
// Test type functions (for custom type system testing)
788789
TestUintEncode,
789790
TestUintDecode,
@@ -910,6 +911,7 @@ impl Deterministic for ScalarFunc {
910911
ScalarFunc::StatGet => false, // internal ANALYZE function
911912
ScalarFunc::ConnTxnId => false, // depends on connection state
912913
ScalarFunc::IsAutocommit => false, // depends on connection state
914+
ScalarFunc::SequenceWatermark => false, // depends on active MVCC transactions
913915
ScalarFunc::TestUintEncode
914916
| ScalarFunc::TestUintDecode
915917
| ScalarFunc::TestUintAdd
@@ -1049,6 +1051,7 @@ impl Display for ScalarFunc {
10491051
Self::StatGet => "stat_get",
10501052
Self::ConnTxnId => "conn_txn_id",
10511053
Self::IsAutocommit => "is_autocommit",
1054+
Self::SequenceWatermark => "sequence_watermark",
10521055
Self::TestUintEncode => "test_uint_encode",
10531056
Self::TestUintDecode => "test_uint_decode",
10541057
Self::TestUintAdd => "test_uint_add",
@@ -1152,7 +1155,8 @@ impl ScalarFunc {
11521155
| Self::Upper
11531156
| Self::ZeroBlob
11541157
| Self::Likely
1155-
| Self::Unlikely => &[1],
1158+
| Self::Unlikely
1159+
| Self::SequenceWatermark => &[1],
11561160
// 2-arg
11571161
Self::Glob
11581162
| Self::Instr
@@ -1711,6 +1715,7 @@ impl Func {
17111715
"bin_record_json_object" => Ok(Some(Self::Scalar(ScalarFunc::BinRecordJsonObject))),
17121716
"conn_txn_id" => Ok(Some(Self::Scalar(ScalarFunc::ConnTxnId))),
17131717
"is_autocommit" => Ok(Some(Self::Scalar(ScalarFunc::IsAutocommit))),
1718+
"sequence_watermark" => Ok(Some(Self::Scalar(ScalarFunc::SequenceWatermark))),
17141719
"acos" => Ok(Some(Self::Math(MathFunc::Acos))),
17151720
"acosh" => Ok(Some(Self::Math(MathFunc::Acosh))),
17161721
"asin" => Ok(Some(Self::Math(MathFunc::Asin))),

core/mvcc/database/mod.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::mvcc::cursor::{static_iterator_hack, MvccIterator};
33
#[cfg(any(test, injected_yields))]
44
use crate::mvcc::yield_hooks::{ProvidesYieldContext, YieldContext, YieldPointMarker};
55
use crate::mvcc::yield_points::{inject_transition_failure, inject_transition_yield};
6-
use crate::schema::{Schema, Table};
6+
use crate::schema::{Schema, Sequence, Table};
77
use crate::state_machine::StateMachine;
88
use crate::state_machine::StateTransition;
99
use crate::state_machine::TransitionResult;
@@ -44,7 +44,7 @@ use crossbeam_skiplist::map::Entry;
4444
use crossbeam_skiplist::SkipMap;
4545
use rustc_hash::FxHashMap as HashMap;
4646
use rustc_hash::FxHashSet as HashSet;
47-
use std::collections::BTreeSet;
47+
use std::collections::{BTreeSet, HashMap as StdHashMap};
4848
use std::fmt::Debug;
4949
use std::marker::PhantomData;
5050
use std::ops::Bound;
@@ -80,6 +80,20 @@ pub mod tests;
8080
/// Sentinel value for `MvStore::exclusive_tx` indicating no exclusive transaction is active.
8181
const NO_EXCLUSIVE_TX: u64 = 0;
8282

83+
/// Convert a sequence backing-table row into the exclusive upper bound used by
84+
/// sync scans. This is intentionally tailored to ascending non-CYCLE sequences,
85+
/// which is the shape used by AUTOINCREMENT CDC ids.
86+
pub(crate) fn first_unsafe_sequence_watermark(seq: &Sequence, value: i64, is_called: bool) -> i64 {
87+
if !is_called {
88+
return value;
89+
}
90+
if seq.increment_by > 0 && !seq.cycle {
91+
value.checked_add(seq.increment_by).unwrap_or(value)
92+
} else {
93+
value
94+
}
95+
}
96+
8397
#[cfg(not(any(test, injected_yields)))]
8498
struct YieldContext;
8599

@@ -3437,6 +3451,15 @@ pub struct MvStore<Clock: LogicalClock> {
34373451
/// deadlock.
34383452
last_global_header_ts: AtomicU64,
34393453
table_id_to_last_rowid: RwLock<HashMap<MVTableId, Arc<RowidAllocator>>>,
3454+
/// Per-sequence first value not guaranteed safe to read past based only on
3455+
/// durable/current sequence state. Active allocations can lower this.
3456+
sequence_watermarks: Mutex<HashMap<String, i64>>,
3457+
/// Per-sequence minimum allocated value for each active transaction.
3458+
///
3459+
/// This is in-memory and therefore only correct while all MVCC writers for a
3460+
/// database live in one process. Multi-process MVCC will need a shared
3461+
/// coordination mechanism before sync can rely on this watermark.
3462+
sequence_allocations: Mutex<HashMap<String, StdHashMap<TxID, i64>>>,
34403463
}
34413464

34423465
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -3512,6 +3535,8 @@ impl<Clock: LogicalClock> MvStore<Clock> {
35123535
last_committed_tx_ts: AtomicU64::new(0),
35133536
last_global_header_ts: AtomicU64::new(0),
35143537
table_id_to_last_rowid: RwLock::new(HashMap::default()),
3538+
sequence_watermarks: Mutex::new(HashMap::default()),
3539+
sequence_allocations: Mutex::new(HashMap::default()),
35153540
}
35163541
}
35173542

@@ -4744,6 +4769,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
47444769
}
47454770

47464771
pub fn remove_tx(&self, tx_id: TxID) {
4772+
self.remove_sequence_allocations(tx_id);
47474773
if let Some(entry) = self.txs.get(&tx_id) {
47484774
let tx = entry.value();
47494775
if let TransactionState::Committed(commit_ts) = tx.state.load() {
@@ -4768,6 +4794,87 @@ impl<Clock: LogicalClock> MvStore<Clock> {
47684794
self.blocking_checkpoint_lock.unlock();
47694795
}
47704796

4797+
pub fn register_sequence_allocation(
4798+
&self,
4799+
tx_id: TxID,
4800+
sequence_name: &str,
4801+
sequence_value: i64,
4802+
) -> Result<()> {
4803+
let Some(tx) = self.txs.get(&tx_id) else {
4804+
return Err(LimboError::NoSuchTransactionID(tx_id.to_string()));
4805+
};
4806+
turso_assert!(
4807+
matches!(
4808+
tx.value().state.load(),
4809+
TransactionState::Active | TransactionState::Preparing(_)
4810+
),
4811+
"sequence allocation must be registered while the transaction is active or preparing"
4812+
);
4813+
4814+
let sequence_name = crate::util::normalize_ident(sequence_name);
4815+
let mut allocations = self.sequence_allocations.lock();
4816+
let tx_allocations = allocations.entry(sequence_name).or_default();
4817+
tx_allocations
4818+
.entry(tx_id)
4819+
.and_modify(|value| *value = (*value).min(sequence_value))
4820+
.or_insert(sequence_value);
4821+
Ok(())
4822+
}
4823+
4824+
pub fn set_sequence_watermark(&self, sequence_name: &str, watermark: i64) {
4825+
let sequence_name = crate::util::normalize_ident(sequence_name);
4826+
self.sequence_watermarks
4827+
.lock()
4828+
.insert(sequence_name, watermark);
4829+
}
4830+
4831+
/// Returns the first sequence value that is not safe for cursor scans to pass.
4832+
///
4833+
/// Readers can safely consume rows with sequence values less than this
4834+
/// watermark. The value is the minimum of the current sequence boundary and
4835+
/// any lower value already allocated by an active transaction.
4836+
pub fn sequence_watermark(&self, sequence_name: &str) -> Option<i64> {
4837+
let sequence_name = crate::util::normalize_ident(sequence_name);
4838+
let mut allocations = self.sequence_allocations.lock();
4839+
let mut remove_allocations = false;
4840+
let active_watermark = {
4841+
allocations
4842+
.get_mut(&sequence_name)
4843+
.and_then(|tx_allocations| {
4844+
tx_allocations.retain(|tx_id, _| {
4845+
self.txs.get(tx_id).is_some_and(|tx| {
4846+
matches!(
4847+
tx.value().state.load(),
4848+
TransactionState::Active | TransactionState::Preparing(_)
4849+
)
4850+
})
4851+
});
4852+
let watermark = tx_allocations.values().copied().min();
4853+
if tx_allocations.is_empty() {
4854+
remove_allocations = true;
4855+
}
4856+
watermark
4857+
})
4858+
};
4859+
if remove_allocations {
4860+
allocations.remove(&sequence_name);
4861+
}
4862+
let current_watermark = self.sequence_watermarks.lock().get(&sequence_name).copied();
4863+
match (current_watermark, active_watermark) {
4864+
(Some(current), Some(active)) => Some(current.min(active)),
4865+
(Some(current), None) => Some(current),
4866+
(None, active) => active,
4867+
}
4868+
}
4869+
4870+
fn remove_sequence_allocations(&self, tx_id: TxID) {
4871+
let mut allocations = self.sequence_allocations.lock();
4872+
allocations.retain(|_, tx_allocations| {
4873+
tx_allocations.remove(&tx_id);
4874+
!tx_allocations.is_empty()
4875+
});
4876+
}
4877+
47714878
/// Atomically retire a committed tx: clear the connection's mv_tx_id cache
47724879
/// for `db_id`, then remove the tx from `txs`. Pairs the two mutations so
47734880
/// no concurrent observer (or in-flight statement) can see the divergent

core/mvcc/database/tests.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4070,6 +4070,100 @@ pub(crate) fn commit_tx_no_conn(
40704070
Ok(())
40714071
}
40724072

4073+
#[test]
4074+
fn test_sequence_watermark_tracks_lowest_active_allocation() {
4075+
let db = MvccTestDbNoConn::new_with_random_db();
4076+
let conn = db.connect();
4077+
let mv_store = db.get_mvcc_store();
4078+
let pager = conn.pager.load().clone();
4079+
4080+
mv_store.set_sequence_watermark("turso_cdc_pk_autoincrement", 13);
4081+
let tx1 = mv_store.begin_tx(pager.clone()).unwrap();
4082+
let tx2 = mv_store.begin_tx(pager.clone()).unwrap();
4083+
mv_store
4084+
.register_sequence_allocation(tx1, "turso_cdc_pk_autoincrement", 10)
4085+
.unwrap();
4086+
mv_store
4087+
.register_sequence_allocation(tx2, "turso_cdc_pk_autoincrement", 12)
4088+
.unwrap();
4089+
mv_store
4090+
.register_sequence_allocation(tx1, "turso_cdc_pk_autoincrement", 11)
4091+
.unwrap();
4092+
4093+
assert_eq!(
4094+
mv_store.sequence_watermark("turso_cdc_pk_autoincrement"),
4095+
Some(10)
4096+
);
4097+
4098+
commit_tx(mv_store.clone(), &conn, tx1).unwrap();
4099+
assert_eq!(
4100+
mv_store.sequence_watermark("turso_cdc_pk_autoincrement"),
4101+
Some(12)
4102+
);
4103+
4104+
mv_store.rollback_tx(tx2, pager, conn.as_ref(), crate::MAIN_DB_ID);
4105+
assert_eq!(
4106+
mv_store.sequence_watermark("turso_cdc_pk_autoincrement"),
4107+
Some(13)
4108+
);
4109+
}
4110+
4111+
#[test]
4112+
fn test_sequence_watermark_function_returns_current_watermark_without_active_allocations() {
4113+
let db = MvccTestDbNoConn::new_with_random_db();
4114+
let conn = db.connect();
4115+
let mv_store = db.get_mvcc_store();
4116+
let pager = conn.pager.load().clone();
4117+
4118+
conn.execute("CREATE SEQUENCE s").unwrap();
4119+
mv_store.set_sequence_watermark("s", 42);
4120+
let rows = get_rows(&conn, "SELECT sequence_watermark('s')");
4121+
4122+
assert_eq!(rows.len(), 1);
4123+
assert_eq!(rows[0][0].as_int().unwrap(), 42);
4124+
4125+
let tx_id = mv_store.begin_tx(pager.clone()).unwrap();
4126+
mv_store
4127+
.register_sequence_allocation(tx_id, "s", 10)
4128+
.unwrap();
4129+
let rows = get_rows(&conn, "SELECT sequence_watermark('s')");
4130+
4131+
assert_eq!(rows[0][0].as_int().unwrap(), 10);
4132+
4133+
mv_store.rollback_tx(tx_id, pager, conn.as_ref(), crate::MAIN_DB_ID);
4134+
let rows = get_rows(&conn, "SELECT sequence_watermark('s')");
4135+
4136+
assert_eq!(rows[0][0].as_int().unwrap(), 42);
4137+
}
4138+
4139+
#[test]
4140+
fn test_sequence_watermark_tracks_nextval_allocations() {
4141+
let db = MvccTestDbNoConn::new_with_random_db();
4142+
let setup = db.connect();
4143+
setup.execute("CREATE SEQUENCE s START WITH 1").unwrap();
4144+
4145+
let rows = get_rows(&setup, "SELECT sequence_watermark('s')");
4146+
assert!(matches!(rows[0][0], Value::Null));
4147+
4148+
let rows = get_rows(&setup, "SELECT nextval('s')");
4149+
assert_eq!(rows[0][0].as_int().unwrap(), 1);
4150+
let rows = get_rows(&setup, "SELECT sequence_watermark('s')");
4151+
assert_eq!(rows[0][0].as_int().unwrap(), 2);
4152+
4153+
let writer = db.connect();
4154+
writer.execute("BEGIN CONCURRENT").unwrap();
4155+
let rows = get_rows(&writer, "SELECT nextval('s')");
4156+
assert_eq!(rows[0][0].as_int().unwrap(), 2);
4157+
4158+
let observer = db.connect();
4159+
let rows = get_rows(&observer, "SELECT sequence_watermark('s')");
4160+
assert_eq!(rows[0][0].as_int().unwrap(), 2);
4161+
4162+
writer.execute("COMMIT").unwrap();
4163+
let rows = get_rows(&observer, "SELECT sequence_watermark('s')");
4164+
assert_eq!(rows[0][0].as_int().unwrap(), 3);
4165+
}
4166+
40734167
/// What this test checks: Cursor traversal and seek operations honor MVCC visibility and key ordering under updates/deletes.
40744168
/// Why this matters: Read-path correctness is critical: wrong cursor semantics directly surface as wrong query answers.
40754169
#[test]

core/translate/expr/translator.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1235,7 +1235,8 @@ pub fn translate_expr(
12351235
| ScalarFunc::RandomBlob
12361236
| ScalarFunc::Sign
12371237
| ScalarFunc::Soundex
1238-
| ScalarFunc::ZeroBlob => {
1238+
| ScalarFunc::ZeroBlob
1239+
| ScalarFunc::SequenceWatermark => {
12391240
let args = expect_arguments_exact!(args, 1, srf);
12401241
let start_reg = program.alloc_register();
12411242
translate_expr(

0 commit comments

Comments
 (0)