Skip to content

Commit 6b5606a

Browse files
authored
Merge pull request #653 from Horusiath/libsql-transactions
libsql: remote hrana transactions
2 parents 5316796 + dcd07bf commit 6b5606a

15 files changed

Lines changed: 940 additions & 246 deletions

File tree

libsql-server/tests/hrana/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use tempfile::tempdir;
55

66
use crate::common::net::{init_tracing, SimServer, TestServer};
77
mod batch;
8+
mod transaction;
89

910
async fn make_standalone_server() -> Result<(), Box<dyn std::error::Error>> {
1011
init_tracing();
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use crate::common::net::TurmoilConnector;
2+
use libsql::{params, Database, TransactionBehavior};
3+
4+
#[test]
5+
fn transaction_commit_and_rollback() {
6+
let mut sim = turmoil::Builder::new().build();
7+
sim.host("primary", super::make_standalone_server);
8+
sim.client("client", async {
9+
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
10+
let conn = db.connect()?;
11+
12+
// initialize tables
13+
let tx = conn.transaction().await?;
14+
tx.execute_batch(r#"create table t(x text);"#).await?;
15+
tx.commit().await?;
16+
17+
// transaction with temporary data
18+
let tx = conn.transaction().await?;
19+
tx.execute("insert into t(x) values('hello');", ()).await?;
20+
21+
let mut rows = tx
22+
.query("select * from t where x = ?", params!["hello"])
23+
.await?;
24+
25+
assert_eq!(rows.column_count(), 1);
26+
assert_eq!(rows.column_name(0), Some("x"));
27+
assert_eq!(rows.next()?.unwrap().get::<String>(0)?, "hello");
28+
assert!(rows.next()?.is_none());
29+
tx.rollback().await?;
30+
31+
// confirm that temporary that was not committed
32+
let mut rows = conn
33+
.query("select * from t where x = ?", params!["hello"])
34+
.await?;
35+
36+
assert_eq!(rows.column_count(), 1);
37+
assert_eq!(rows.column_name(0), Some("x"));
38+
assert!(rows.next()?.is_none());
39+
40+
Ok(())
41+
});
42+
43+
sim.run().unwrap();
44+
}
45+
46+
#[test]
47+
fn multiple_concurrent_transactions() {
48+
let mut sim = turmoil::Builder::new().build();
49+
sim.host("primary", super::make_standalone_server);
50+
sim.client("client", async {
51+
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
52+
let conn = db.connect()?;
53+
conn.execute_batch(r#"create table t(x text);"#).await?;
54+
55+
// open first transaction and alter data
56+
let tx1 = conn
57+
.transaction_with_behavior(TransactionBehavior::Deferred)
58+
.await?;
59+
tx1.execute("insert into t(x) values('hello');", ()).await?;
60+
61+
// while first transaction is still open open another read-only transaction and try to read
62+
let tx2 = conn
63+
.transaction_with_behavior(TransactionBehavior::ReadOnly)
64+
.await?;
65+
let mut rows = tx2
66+
.query("select * from t where x = ?", params!["hello"])
67+
.await?;
68+
assert_eq!(rows.column_count(), 1);
69+
assert_eq!(rows.column_name(0), Some("x"));
70+
assert!(rows.next()?.is_none());
71+
72+
// commit first transaction - T2 should still read old data
73+
tx1.commit().await?;
74+
75+
let mut rows = tx2
76+
.query("select * from t where x = ?", params!["hello"])
77+
.await?;
78+
assert_eq!(rows.column_count(), 1);
79+
assert_eq!(rows.column_name(0), Some("x"));
80+
assert!(rows.next()?.is_none());
81+
tx2.commit().await?;
82+
83+
// finally open new transaction - it now should read actual data
84+
let tx3 = conn
85+
.transaction_with_behavior(TransactionBehavior::ReadOnly)
86+
.await?;
87+
let mut rows = tx3
88+
.query("select * from t where x = ?", params!["hello"])
89+
.await?;
90+
assert_eq!(rows.column_count(), 1);
91+
assert_eq!(rows.column_name(0), Some("x"));
92+
assert_eq!(rows.next()?.unwrap().get::<String>(0)?, "hello");
93+
assert!(rows.next()?.is_none());
94+
95+
Ok(())
96+
});
97+
sim.run().unwrap();
98+
}

libsql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ hrana = [
8888
"serde",
8989
"dep:base64",
9090
"dep:serde_json",
91+
"dep:futures"
9192
]
9293
serde = ["dep:serde"]
9394
remote = [

libsql/src/connection.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ pub(crate) trait Conn {
1616

1717
async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result<Transaction>;
1818

19-
fn is_autocommit(&self) -> bool;
19+
async fn is_autocommit(&self) -> Result<bool>;
2020

2121
fn changes(&self) -> u64;
2222

2323
fn last_insert_rowid(&self) -> i64;
24-
25-
fn close(&mut self);
2624
}
2725

2826
#[derive(Clone)]
@@ -69,8 +67,8 @@ impl Connection {
6967
self.conn.transaction(tx_behavior).await
7068
}
7169

72-
pub fn is_autocommit(&self) -> bool {
73-
self.conn.is_autocommit()
70+
pub async fn is_autocommit(&self) -> Result<bool> {
71+
self.conn.is_autocommit().await
7472
}
7573

7674
pub fn changes(&self) -> u64 {
@@ -80,10 +78,4 @@ impl Connection {
8078
pub fn last_insert_rowid(&self) -> i64 {
8179
self.conn.last_insert_rowid()
8280
}
83-
84-
pub fn close(&mut self) {
85-
if let Some(conn) = Arc::get_mut(&mut self.conn) {
86-
conn.close()
87-
}
88-
}
8981
}

0 commit comments

Comments
 (0)