Skip to content

Commit b3009e0

Browse files
authored
Merge 'bindings/rust: add old multithreaded bug reproducers as mvcc regression tests' from Jussi Saurio
tests copied from closed issues: - lost updates test (#5421 ) - snapshot isolation violation test (#5420 ) + an ignored test from an open issue that still reproduces a hang. (#5423 ) Reviewed-by: Mikaël Francoeur (@LeMikaelF) Closes #5484
2 parents 191ec98 + 418b0e6 commit b3009e0

File tree

1 file changed

+255
-0
lines changed

1 file changed

+255
-0
lines changed

bindings/rust/tests/integration_tests.rs

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,3 +1463,258 @@ async fn test_check_on_conflict_replace() {
14631463
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
14641464
assert_eq!(ids, vec![1]);
14651465
}
1466+
1467+
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
1468+
use std::sync::Arc;
1469+
use std::time::Duration;
1470+
use tempfile::tempdir;
1471+
use tokio::sync::Barrier;
1472+
1473+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1474+
async fn test_lost_updates() {
1475+
let (db, _dir) = setup_mvcc_db(
1476+
"CREATE TABLE counter(id INTEGER PRIMARY KEY, val INTEGER);
1477+
INSERT INTO counter VALUES(1, 0);",
1478+
)
1479+
.await;
1480+
1481+
let num_workers: usize = 16;
1482+
let rounds: i64 = 100;
1483+
let total_committed = Arc::new(AtomicI64::new(0));
1484+
1485+
for _round in 0..rounds {
1486+
let barrier = Arc::new(Barrier::new(num_workers));
1487+
let mut handles = Vec::new();
1488+
1489+
for _ in 0..num_workers {
1490+
let conn = db.connect().unwrap();
1491+
let barrier = barrier.clone();
1492+
let total_committed = total_committed.clone();
1493+
handles.push(tokio::spawn(async move {
1494+
barrier.wait().await;
1495+
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
1496+
return;
1497+
}
1498+
if conn
1499+
.execute("UPDATE counter SET val = val + 1 WHERE id = 1", ())
1500+
.await
1501+
.is_err()
1502+
{
1503+
let _ = conn.execute("ROLLBACK", ()).await;
1504+
return;
1505+
}
1506+
match conn.execute("COMMIT", ()).await {
1507+
Ok(_) => {
1508+
total_committed.fetch_add(1, Ordering::Relaxed);
1509+
}
1510+
Err(_) => {
1511+
let _ = conn.execute("ROLLBACK", ()).await;
1512+
}
1513+
}
1514+
}));
1515+
}
1516+
for handle in handles {
1517+
handle.await.unwrap();
1518+
}
1519+
}
1520+
1521+
let conn = db.connect().unwrap();
1522+
let val = query_i64(&conn, "SELECT val FROM counter WHERE id = 1").await;
1523+
let committed = total_committed.load(Ordering::Relaxed);
1524+
assert_eq!(
1525+
val, committed,
1526+
"Lost updates! counter={val} but {committed} transactions committed successfully"
1527+
);
1528+
}
1529+
1530+
/// Helper: create MVCC-enabled file-backed database with given schema
1531+
async fn setup_mvcc_db(schema: &str) -> (turso::Database, tempfile::TempDir) {
1532+
setup_mvcc_db_with_options(schema, false).await
1533+
}
1534+
1535+
/// Helper: create MVCC-enabled file-backed database with options
1536+
async fn setup_mvcc_db_with_options(
1537+
schema: &str,
1538+
triggers: bool,
1539+
) -> (turso::Database, tempfile::TempDir) {
1540+
let dir = tempdir().unwrap();
1541+
let db_path = dir.path().join("test.db");
1542+
let mut builder = Builder::new_local(db_path.to_str().unwrap());
1543+
if triggers {
1544+
builder = builder.experimental_triggers(true);
1545+
}
1546+
let db = builder.build().await.unwrap();
1547+
let conn = db.connect().unwrap();
1548+
// PRAGMA journal_mode returns a row, so use query() to consume it
1549+
let mut rows = conn
1550+
.query("PRAGMA journal_mode = 'experimental_mvcc'", ())
1551+
.await
1552+
.unwrap();
1553+
while let Ok(Some(_)) = rows.next().await {}
1554+
drop(rows);
1555+
if !schema.is_empty() {
1556+
conn.execute_batch(schema).await.unwrap();
1557+
}
1558+
(db, dir)
1559+
}
1560+
1561+
/// Helper: query a single i64 value
1562+
async fn query_i64(conn: &turso::Connection, sql: &str) -> i64 {
1563+
let mut rows = conn.query(sql, ()).await.unwrap();
1564+
let row = rows.next().await.unwrap().unwrap();
1565+
row.get::<i64>(0).unwrap()
1566+
}
1567+
1568+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1569+
#[ignore = "FIXME: This test hangs on main"]
1570+
async fn test_deadlock_join_during_writes() {
1571+
let (db, _dir) = setup_mvcc_db(
1572+
"CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
1573+
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT);
1574+
INSERT INTO customers VALUES(1, 'alice');
1575+
INSERT INTO customers VALUES(2, 'bob');
1576+
INSERT INTO customers VALUES(3, 'charlie');",
1577+
)
1578+
.await;
1579+
1580+
let done = Arc::new(AtomicBool::new(false));
1581+
let mut handles = vec![];
1582+
1583+
// Writers: insert orders for various customers
1584+
for w in 0..4 {
1585+
let db = db.clone();
1586+
let done = done.clone();
1587+
handles.push(tokio::spawn(async move {
1588+
let conn = db.connect().unwrap();
1589+
let mut i = 0u64;
1590+
while !done.load(Ordering::Relaxed) {
1591+
let id = (w as u64) * 100000 + i;
1592+
let cust = (i % 3) + 1;
1593+
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
1594+
let _ = conn
1595+
.execute(
1596+
&format!("INSERT INTO orders VALUES({}, {}, {})", id, cust, 10),
1597+
(),
1598+
)
1599+
.await;
1600+
let _ = conn.execute("COMMIT", ()).await;
1601+
i += 1;
1602+
}
1603+
}));
1604+
}
1605+
1606+
// Readers: do JOINs (THIS IS WHAT TRIGGERS THE HANG)
1607+
for _ in 0..4 {
1608+
let db = db.clone();
1609+
let done = done.clone();
1610+
handles.push(tokio::spawn(async move {
1611+
let conn = db.connect().unwrap();
1612+
while !done.load(Ordering::Relaxed) {
1613+
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
1614+
let _orphans = match conn
1615+
.query(
1616+
"SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
1617+
(),
1618+
)
1619+
.await
1620+
{
1621+
Ok(mut rows) => match rows.next().await {
1622+
Ok(Some(row)) => row.get::<i64>(0).unwrap_or(0),
1623+
_ => 0,
1624+
},
1625+
Err(_) => 0,
1626+
};
1627+
let _ = conn.execute("COMMIT", ()).await;
1628+
}
1629+
}));
1630+
}
1631+
1632+
// If this test hangs here, the bug is confirmed.
1633+
tokio::time::sleep(Duration::from_secs(3)).await;
1634+
done.store(true, Ordering::Relaxed);
1635+
for handle in handles {
1636+
// This await will never return if threads are deadlocked
1637+
handle.await.unwrap();
1638+
}
1639+
}
1640+
1641+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1642+
async fn test_snapshot_isolation_violation() {
1643+
let (db, _dir) = setup_mvcc_db("CREATE TABLE t(id INTEGER PRIMARY KEY, val INTEGER)").await;
1644+
1645+
let done = Arc::new(AtomicBool::new(false));
1646+
let violation_found = Arc::new(AtomicBool::new(false));
1647+
let mut handles = Vec::new();
1648+
1649+
// 4 writers: continuously insert batches of 5 rows
1650+
for w in 0..4i64 {
1651+
let conn = db.connect().unwrap();
1652+
let done = done.clone();
1653+
handles.push(tokio::spawn(async move {
1654+
let mut i = 0i64;
1655+
while !done.load(Ordering::Relaxed) {
1656+
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
1657+
continue;
1658+
}
1659+
let mut ok = true;
1660+
for j in 0..5i64 {
1661+
let id = w * 100_000 + i * 5 + j;
1662+
if conn
1663+
.execute(&format!("INSERT INTO t VALUES({id}, {id})"), ())
1664+
.await
1665+
.is_err()
1666+
{
1667+
ok = false;
1668+
break;
1669+
}
1670+
}
1671+
if ok {
1672+
if conn.execute("COMMIT", ()).await.is_err() {
1673+
let _ = conn.execute("ROLLBACK", ()).await;
1674+
}
1675+
} else {
1676+
let _ = conn.execute("ROLLBACK", ()).await;
1677+
}
1678+
i += 1;
1679+
}
1680+
}));
1681+
}
1682+
1683+
// 4 readers: open snapshot, read COUNT(*) twice, assert they match
1684+
for _ in 0..4 {
1685+
let conn = db.connect().unwrap();
1686+
let done = done.clone();
1687+
let violation_found = violation_found.clone();
1688+
handles.push(tokio::spawn(async move {
1689+
while !done.load(Ordering::Relaxed) {
1690+
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
1691+
continue;
1692+
}
1693+
let count1 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
1694+
tokio::task::yield_now().await; // Let writers commit between reads
1695+
let count2 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
1696+
let _ = conn.execute("COMMIT", ()).await;
1697+
if count1 != count2 {
1698+
violation_found.store(true, Ordering::Relaxed);
1699+
eprintln!(
1700+
"VIOLATION: COUNT changed {} -> {} within same txn (delta={})",
1701+
count1,
1702+
count2,
1703+
count2 - count1
1704+
);
1705+
}
1706+
}
1707+
}));
1708+
}
1709+
1710+
tokio::time::sleep(Duration::from_secs(3)).await;
1711+
done.store(true, Ordering::Relaxed);
1712+
for handle in handles {
1713+
let _ = handle.await;
1714+
}
1715+
1716+
assert!(
1717+
!violation_found.load(Ordering::Relaxed),
1718+
"Snapshot isolation violated: COUNT(*) changed within a single BEGIN CONCURRENT txn"
1719+
);
1720+
}

0 commit comments

Comments
 (0)