Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions bindings/rust/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,3 +1463,258 @@ async fn test_check_on_conflict_replace() {
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
assert_eq!(ids, vec![1]);
}

use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tempfile::tempdir;
use tokio::sync::Barrier;

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_lost_updates() {
let (db, _dir) = setup_mvcc_db(
"CREATE TABLE counter(id INTEGER PRIMARY KEY, val INTEGER);
INSERT INTO counter VALUES(1, 0);",
)
.await;

let num_workers: usize = 16;
let rounds: i64 = 100;
let total_committed = Arc::new(AtomicI64::new(0));

for _round in 0..rounds {
let barrier = Arc::new(Barrier::new(num_workers));
let mut handles = Vec::new();

for _ in 0..num_workers {
let conn = db.connect().unwrap();
let barrier = barrier.clone();
let total_committed = total_committed.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
return;
}
if conn
.execute("UPDATE counter SET val = val + 1 WHERE id = 1", ())
.await
.is_err()
{
let _ = conn.execute("ROLLBACK", ()).await;
return;
}
match conn.execute("COMMIT", ()).await {
Ok(_) => {
total_committed.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
let _ = conn.execute("ROLLBACK", ()).await;
}
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

let conn = db.connect().unwrap();
let val = query_i64(&conn, "SELECT val FROM counter WHERE id = 1").await;
let committed = total_committed.load(Ordering::Relaxed);
assert_eq!(
val, committed,
"Lost updates! counter={val} but {committed} transactions committed successfully"
);
}

/// Helper: create MVCC-enabled file-backed database with given schema
async fn setup_mvcc_db(schema: &str) -> (turso::Database, tempfile::TempDir) {
setup_mvcc_db_with_options(schema, false).await
}

/// Helper: create MVCC-enabled file-backed database with options
async fn setup_mvcc_db_with_options(
schema: &str,
triggers: bool,
) -> (turso::Database, tempfile::TempDir) {
let dir = tempdir().unwrap();
let db_path = dir.path().join("test.db");
let mut builder = Builder::new_local(db_path.to_str().unwrap());
if triggers {
builder = builder.experimental_triggers(true);
}
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
// PRAGMA journal_mode returns a row, so use query() to consume it
let mut rows = conn
.query("PRAGMA journal_mode = 'experimental_mvcc'", ())
.await
.unwrap();
while let Ok(Some(_)) = rows.next().await {}
drop(rows);
if !schema.is_empty() {
conn.execute_batch(schema).await.unwrap();
}
(db, dir)
}

/// Helper: query a single i64 value
async fn query_i64(conn: &turso::Connection, sql: &str) -> i64 {
let mut rows = conn.query(sql, ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
row.get::<i64>(0).unwrap()
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[ignore = "FIXME: This test hangs on main"]
async fn test_deadlock_join_during_writes() {
let (db, _dir) = setup_mvcc_db(
"CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO customers VALUES(1, 'alice');
INSERT INTO customers VALUES(2, 'bob');
INSERT INTO customers VALUES(3, 'charlie');",
)
.await;

let done = Arc::new(AtomicBool::new(false));
let mut handles = vec![];

// Writers: insert orders for various customers
for w in 0..4 {
let db = db.clone();
let done = done.clone();
handles.push(tokio::spawn(async move {
let conn = db.connect().unwrap();
let mut i = 0u64;
while !done.load(Ordering::Relaxed) {
let id = (w as u64) * 100000 + i;
let cust = (i % 3) + 1;
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
let _ = conn
.execute(
&format!("INSERT INTO orders VALUES({}, {}, {})", id, cust, 10),
(),
)
.await;
let _ = conn.execute("COMMIT", ()).await;
i += 1;
}
}));
}

// Readers: do JOINs (THIS IS WHAT TRIGGERS THE HANG)
for _ in 0..4 {
let db = db.clone();
let done = done.clone();
handles.push(tokio::spawn(async move {
let conn = db.connect().unwrap();
while !done.load(Ordering::Relaxed) {
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
let _orphans = match conn
.query(
"SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
(),
)
.await
{
Ok(mut rows) => match rows.next().await {
Ok(Some(row)) => row.get::<i64>(0).unwrap_or(0),
_ => 0,
},
Err(_) => 0,
};
let _ = conn.execute("COMMIT", ()).await;
}
}));
}

// If this test hangs here, the bug is confirmed.
tokio::time::sleep(Duration::from_secs(3)).await;
done.store(true, Ordering::Relaxed);
for handle in handles {
// This await will never return if threads are deadlocked
handle.await.unwrap();
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_snapshot_isolation_violation() {
let (db, _dir) = setup_mvcc_db("CREATE TABLE t(id INTEGER PRIMARY KEY, val INTEGER)").await;

let done = Arc::new(AtomicBool::new(false));
let violation_found = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();

// 4 writers: continuously insert batches of 5 rows
for w in 0..4i64 {
let conn = db.connect().unwrap();
let done = done.clone();
handles.push(tokio::spawn(async move {
let mut i = 0i64;
while !done.load(Ordering::Relaxed) {
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
continue;
}
let mut ok = true;
for j in 0..5i64 {
let id = w * 100_000 + i * 5 + j;
if conn
.execute(&format!("INSERT INTO t VALUES({id}, {id})"), ())
.await
.is_err()
{
ok = false;
break;
}
}
if ok {
if conn.execute("COMMIT", ()).await.is_err() {
let _ = conn.execute("ROLLBACK", ()).await;
}
} else {
let _ = conn.execute("ROLLBACK", ()).await;
}
i += 1;
}
}));
}

// 4 readers: open snapshot, read COUNT(*) twice, assert they match
for _ in 0..4 {
let conn = db.connect().unwrap();
let done = done.clone();
let violation_found = violation_found.clone();
handles.push(tokio::spawn(async move {
while !done.load(Ordering::Relaxed) {
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
continue;
}
let count1 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
tokio::task::yield_now().await; // Let writers commit between reads
let count2 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
let _ = conn.execute("COMMIT", ()).await;
if count1 != count2 {
violation_found.store(true, Ordering::Relaxed);
eprintln!(
"VIOLATION: COUNT changed {} -> {} within same txn (delta={})",
count1,
count2,
count2 - count1
);
}
}
}));
}

tokio::time::sleep(Duration::from_secs(3)).await;
done.store(true, Ordering::Relaxed);
for handle in handles {
let _ = handle.await;
}

assert!(
!violation_found.load(Ordering::Relaxed),
"Snapshot isolation violated: COUNT(*) changed within a single BEGIN CONCURRENT txn"
);
}
Loading