diff --git a/Cargo.toml b/Cargo.toml index 731227a..b0af8c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libsql-client" -version = "0.31.11" +version = "0.32.0" edition = "2021" license = "Apache-2.0" description = "HTTP-based client for libSQL and sqld" @@ -19,7 +19,6 @@ spin-sdk = { version = "1.4", git = "https://github.com/fermyon/spin", tag = "v1 sqlite3-parser = { version = "0.8.0", default-features = false, features = [ "YYNOERRORRECOVERY" ] } http = { version = "0.2", optional = true } bytes = { version = "1.4.0", optional = true } -anyhow = "1.0.69" reqwest = { version = "0.11.14", optional = true, default-features = false, features = ["rustls-tls"] } hrana-client = { version = "0.3", optional = true } hrana-client-proto = { version = "0.2" } @@ -28,13 +27,13 @@ serde = "1.0.159" tracing = "0.1.37" futures = "0.3.28" fallible-iterator = "0.2.0" -libsql = { version = "0.1.6", optional = true } +libsql = { version = "0.1.11", default-features = false } [features] default = ["local_backend", "hrana_backend", "reqwest_backend", "mapping_names_to_values_in_rows"] workers_backend = ["worker", "futures-util"] reqwest_backend = ["reqwest"] -local_backend = ["libsql"] +local_backend = ["libsql/core", "libsql/replication"] spin_backend = ["spin-sdk", "http", "bytes"] hrana_backend = ["hrana-client"] separate_url_for_queries = [] diff --git a/examples/connect_from_config.rs b/examples/connect_from_config.rs index 1cee65a..b0806b4 100644 --- a/examples/connect_from_config.rs +++ b/examples/connect_from_config.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use libsql_client::Result; use libsql_client::{args, Client, ResultSet, Statement}; use rand::prelude::SliceRandom; diff --git a/examples/connect_from_env.rs b/examples/connect_from_env.rs index 4f015a2..400d6e6 100644 --- a/examples/connect_from_env.rs +++ b/examples/connect_from_env.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use libsql_client::Result; use libsql_client::{args, Client, ResultSet, Statement}; use rand::prelude::SliceRandom; diff --git a/examples/select.rs b/examples/select.rs index c75ec97..6c18527 100644 --- a/examples/select.rs +++ b/examples/select.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use libsql_client::Result; use libsql_client::{args, de, Client, Statement}; use rand::prelude::SliceRandom; @@ -70,7 +70,7 @@ async fn bump_counter(db: Client) -> Result { .rows .iter() .map(de::from_row) - .collect::, _>>()?; + .collect::>>()?; let scoreboard = result_to_string(counter_response)?; let html = format!("Scoreboard:\n{scoreboard}"); diff --git a/src/client.rs b/src/client.rs index 76f9743..37121e6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ //! [Client] is the main structure to interact with the database. -use anyhow::Result; - -use crate::{proto, BatchResult, ResultSet, Statement, SyncTransaction, Transaction}; +use crate::{ + proto, BatchResult, Error, Result, ResultSet, Statement, SyncTransaction, Transaction, +}; static TRANSACTION_IDS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); @@ -57,7 +57,7 @@ impl Client { ) -> Result { match self { #[cfg(feature = "local_backend")] - Self::Local(l) => l.raw_batch(stmts), + Self::Local(l) => l.raw_batch(stmts).await, #[cfg(any( feature = "reqwest_backend", feature = "workers_backend", @@ -111,7 +111,7 @@ impl Client { .find(|e| e.is_some()) .flatten(); if let Some(error) = step_error { - return Err(anyhow::anyhow!(error.message)); + return Err(Error::Misuse(error.message)); } let mut step_results: Vec> = batch_results .step_results @@ -120,7 +120,7 @@ impl Client { .map(|maybe_rs| { maybe_rs .map(ResultSet::from) - .ok_or_else(|| anyhow::anyhow!("Unexpected missing result set")) + .ok_or_else(|| Error::Misuse("Unexpected missing result set".into())) }) .collect(); step_results.pop(); // END is not counted in the result, it's implicitly ignored @@ -179,7 +179,7 @@ impl Client { pub async fn execute(&self, stmt: impl Into + Send) -> Result { match self { #[cfg(feature = "local_backend")] - Self::Local(l) => l.execute(stmt), + Self::Local(l) => l.execute(stmt).await, #[cfg(any( feature = "reqwest_backend", feature = "workers_backend", @@ -218,7 +218,7 @@ impl Client { ) -> Result { match self { #[cfg(feature = "local_backend")] - Self::Local(l) => l.execute_in_transaction(tx_id, stmt), + Self::Local(l) => l.execute_in_transaction(tx_id, stmt).await, #[cfg(any( feature = "reqwest_backend", feature = "workers_backend", @@ -235,7 +235,7 @@ impl Client { pub(crate) async fn commit_transaction(&self, tx_id: u64) -> Result<()> { match self { #[cfg(feature = "local_backend")] - Self::Local(l) => l.commit_transaction(tx_id), + Self::Local(l) => l.commit_transaction(tx_id).await, #[cfg(any( feature = "reqwest_backend", feature = "workers_backend", @@ -252,7 +252,7 @@ impl Client { pub(crate) async fn rollback_transaction(&self, tx_id: u64) -> Result<()> { match self { #[cfg(feature = "local_backend")] - Self::Local(l) => l.rollback_transaction(tx_id), + Self::Local(l) => l.rollback_transaction(tx_id).await, #[cfg(any( feature = "reqwest_backend", feature = "workers_backend", @@ -279,7 +279,7 @@ impl Client { /// # } /// ``` #[cfg(feature = "local_backend")] - pub fn in_memory() -> anyhow::Result { + pub fn in_memory() -> Result { Ok(Client::Local(crate::local::Client::in_memory()?)) } @@ -298,7 +298,7 @@ impl Client { /// # } /// ``` #[allow(unreachable_patterns)] - pub async fn from_config<'a>(mut config: Config) -> anyhow::Result { + pub async fn from_config<'a>(mut config: Config) -> Result { config.url = if config.url.scheme() == "libsql" { // We cannot use url::Url::set_scheme() because it prevents changing the scheme to http... // Safe to unwrap, because we know that the scheme is libsql @@ -331,7 +331,7 @@ impl Client { let inner = crate::http::InnerClient::Spin(crate::spin::HttpClient::new()); Client::Http(crate::http::Client::from_config(inner, config)?) }, - _ => anyhow::bail!("Unknown scheme: {scheme}. Make sure your backend exists and is enabled with its feature flag"), + _ => return Err(Error::Misuse(format!("Unknown scheme: {scheme}. Make sure your backend exists and is enabled with its feature flag"))), }) } @@ -352,30 +352,32 @@ impl Client { /// let db = libsql_client::Client::from_env().await.unwrap(); /// # } /// ``` - pub async fn from_env() -> anyhow::Result { + pub async fn from_env() -> Result { let url = std::env::var("LIBSQL_CLIENT_URL").map_err(|_| { - anyhow::anyhow!("LIBSQL_CLIENT_URL variable should point to your libSQL/sqld database") + Error::Misuse( + "LIBSQL_CLIENT_URL variable should point to your libSQL/sqld database".into(), + ) })?; let auth_token = std::env::var("LIBSQL_CLIENT_TOKEN").ok(); Self::from_config(Config { - url: url::Url::parse(&url)?, + url: url::Url::parse(&url).map_err(|e| Error::Misuse(e.to_string()))?, auth_token, }) .await } #[cfg(feature = "workers_backend")] - pub fn from_workers_env(env: &worker::Env) -> anyhow::Result { + pub fn from_workers_env(env: &worker::Env) -> Result { let url = env .secret("LIBSQL_CLIENT_URL") - .map_err(|e| anyhow::anyhow!("{e}"))? + .map_err(|e| Error::Misuse(e.to_string()))? .to_string(); let token = env .secret("LIBSQL_CLIENT_TOKEN") - .map_err(|e| anyhow::anyhow!("{e}"))? + .map_err(|e| Error::Misuse(e.to_string()))? .to_string(); let config = Config { - url: url::Url::parse(&url)?, + url: url::Url::parse(&url).map_err(|e| Error::Misuse(e.to_string()))?, auth_token: Some(token), }; let inner = crate::http::InnerClient::Workers(crate::workers::HttpClient::new()); @@ -398,7 +400,7 @@ impl SyncClient { /// # } /// ``` #[cfg(feature = "local_backend")] - pub fn in_memory() -> anyhow::Result { + pub fn in_memory() -> Result { Ok(Self { inner: Client::in_memory()?, }) @@ -570,7 +572,7 @@ impl Config { /// # Examples /// /// ``` - /// # async fn f() -> anyhow::Result<()> { + /// # async fn f() -> libsql_client::Result<()> { /// # use libsql_client::Config; /// let config = Config::new("file:////tmp/example.db")?; /// let db = libsql_client::Client::from_config(config).await.unwrap(); @@ -584,7 +586,7 @@ impl Config { Ok(Self { url: url .try_into() - .map_err(|e| anyhow::anyhow!("Failed to parse url: {}", e))?, + .map_err(|e| Error::Misuse(format!("Failed to parse url: {e}")))?, auth_token: None, }) } @@ -593,7 +595,7 @@ impl Config { /// # Examples /// /// ``` - /// # async fn f() -> anyhow::Result<()> { + /// # async fn f() -> libsql_client::Result<()> { /// # use libsql_client::Config; /// let config = Config::new("https://example.com/db")?.with_auth_token("secret"); /// let db = libsql_client::Client::from_config(config).await.unwrap(); diff --git a/src/de.rs b/src/de.rs index 32f1bdb..3bc28cb 100644 --- a/src/de.rs +++ b/src/de.rs @@ -27,7 +27,7 @@ use crate::Row; /// # Example /// /// ```no_run -/// # async fn run(db: libsql_client::Client) -> anyhow::Result<()> { +/// # async fn run(db: libsql_client::Client) -> libsql_client::Result<()> { /// use libsql_client::de; /// /// #[derive(Debug, serde::Deserialize)] @@ -49,9 +49,9 @@ use crate::Row; /// # Ok(()) /// # } /// ``` -pub fn from_row<'de, T: Deserialize<'de>>(row: &'de Row) -> anyhow::Result { +pub fn from_row<'de, T: Deserialize<'de>>(row: &'de Row) -> crate::Result { let de = De { row }; - T::deserialize(de).map_err(Into::into) + T::deserialize(de).map_err(|e| crate::Error::Misuse(e.to_string())) } struct De<'de> { diff --git a/src/hrana.rs b/src/hrana.rs index 11f3169..8fe3cd9 100644 --- a/src/hrana.rs +++ b/src/hrana.rs @@ -1,5 +1,5 @@ use crate::client::Config; -use anyhow::Result; +use crate::{Error, Result}; use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; @@ -37,7 +37,9 @@ impl Client { let token = if token.is_empty() { None } else { Some(token) }; let url = url.into(); - let (client, client_future) = hrana_client::Client::connect(&url, token.clone()).await?; + let (client, client_future) = hrana_client::Client::connect(&url, token.clone()) + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; Ok(Self { url, @@ -49,8 +51,9 @@ impl Client { } pub async fn reconnect(&mut self) -> Result<()> { - let (client, client_future) = - hrana_client::Client::connect(&self.url, self.token.clone()).await?; + let (client, client_future) = hrana_client::Client::connect(&self.url, self.token.clone()) + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; self.client = client; self.client_future = client_future; Ok(()) @@ -64,7 +67,7 @@ impl Client { /// # Examples /// /// ``` - /// # async fn f() -> anyhow::Result<()> { + /// # async fn f() -> libsql_client::Result<()> { /// # use libsql_client::hrana::Client; /// use url::Url; /// @@ -73,13 +76,11 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub async fn from_url>(url: T) -> anyhow::Result + pub async fn from_url>(url: T) -> Result where >::Error: std::fmt::Display, { - let mut url: url::Url = url - .try_into() - .map_err(|e| anyhow::anyhow!(format!("{e}")))?; + let mut url: url::Url = url.try_into().map_err(|e| Error::Misuse(format!("{e}")))?; // remove the auth token from the URL so that it doesn't get logged anywhere let token = utils::pop_query_param(&mut url, "authToken".to_string()); let url_str = if url.scheme() == "libsql" { @@ -101,8 +102,13 @@ impl Client { } pub async fn shutdown(self) -> Result<()> { - self.client.shutdown().await?; - self.client_future.await?; + self.client + .shutdown() + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + self.client_future + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; Ok(()) } @@ -119,7 +125,12 @@ impl Client { // Pessimistic path - let's drop the mutex, create the stream and try to reinsert it. // Another way out of this situation is an async mutex, but I don't want to rely on Tokio or any other specific runtime // unless absolutely necessary. - let stream = Arc::new(self.client.open_stream().await?); + let stream = Arc::new( + self.client + .open_stream() + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?, + ); tracing::trace!("Created new stream"); let mut streams = self.streams_for_transactions.write().unwrap(); if let std::collections::hash_map::Entry::Vacant(e) = streams.entry(tx_id) { @@ -148,7 +159,7 @@ impl Client { pub async fn raw_batch( &self, stmts: impl IntoIterator>, - ) -> anyhow::Result { + ) -> Result { let mut batch = hrana_client::proto::Batch::new(); for stmt in stmts.into_iter() { let stmt: Statement = stmt.into(); @@ -159,22 +170,30 @@ impl Client { batch.step(None, hrana_stmt); } - let stream = self.client.open_stream().await?; + let stream = self + .client + .open_stream() + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; stream .execute_batch(batch) .await - .map_err(|e| anyhow::anyhow!("{}", e)) + .map_err(|e| Error::Misuse(e.to_string())) } pub async fn execute(&self, stmt: impl Into) -> Result { let stmt = Self::into_hrana(stmt.into()); - let stream = self.client.open_stream().await?; + let stream = self + .client + .open_stream() + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; stream .execute(stmt) .await .map(ResultSet::from) - .map_err(|e| anyhow::anyhow!("{}", e)) + .map_err(|e| Error::Misuse(e.to_string())) } pub async fn execute_in_transaction(&self, tx_id: u64, stmt: Statement) -> Result { @@ -185,7 +204,7 @@ impl Client { .execute(stmt) .await .map(ResultSet::from) - .map_err(|e| anyhow::anyhow!("{}", e)) + .map_err(|e| Error::Misuse(e.to_string())) } pub async fn commit_transaction(&self, tx_id: u64) -> Result<()> { @@ -196,7 +215,7 @@ impl Client { .execute(Self::into_hrana(Statement::from("COMMIT"))) .await .map(|_| ()) - .map_err(|e| anyhow::anyhow!("{}", e)) + .map_err(|e| Error::Misuse(e.to_string())) } pub async fn rollback_transaction(&self, tx_id: u64) -> Result<()> { @@ -207,6 +226,6 @@ impl Client { .execute(Self::into_hrana(Statement::from("ROLLBACK"))) .await .map(|_| ()) - .map_err(|e| anyhow::anyhow!("{}", e)) + .map_err(|e| Error::Misuse(e.to_string())) } } diff --git a/src/http.rs b/src/http.rs index f984474..37c3aa2 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,5 +1,5 @@ use crate::client::Config; -use anyhow::Result; +use crate::{Error, Result}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -78,7 +78,7 @@ impl Client { } /// Establishes a database client from a `Config` object - pub fn from_config(inner: InnerClient, config: Config) -> anyhow::Result { + pub fn from_config(inner: InnerClient, config: Config) -> Result { Ok(Self::new( inner, config.url, @@ -86,9 +86,9 @@ impl Client { )) } - pub fn from_env(inner: InnerClient) -> anyhow::Result { + pub fn from_env(inner: InnerClient) -> Result { let url = std::env::var("LIBSQL_CLIENT_URL").map_err(|_| { - anyhow::anyhow!("LIBSQL_CLIENT_URL variable should point to your sqld database") + Error::Misuse("LIBSQL_CLIENT_URL variable should point to your sqld database".into()) })?; let token = std::env::var("LIBSQL_CLIENT_TOKEN").unwrap_or_default(); @@ -108,7 +108,7 @@ impl Client { pub async fn raw_batch( &self, stmts: impl IntoIterator>, - ) -> anyhow::Result { + ) -> Result { let mut batch = crate::proto::Batch::new(); for stmt in stmts.into_iter() { batch.step(None, Self::into_hrana(stmt.into())); @@ -121,34 +121,35 @@ impl Client { pipeline::StreamRequest::Close, ], }; - let body = serde_json::to_string(&msg)?; + let body = serde_json::to_string(&msg).map_err(|e| Error::ConnectionFailed(e.to_string()))?; let mut response: pipeline::ServerMsg = self .inner .send(self.url_for_queries.clone(), self.auth.clone(), body) .await?; if response.results.is_empty() { - anyhow::bail!( + return Err(Error::Misuse(format!( "Unexpected empty response from server: {:?}", response.results - ); + ))); } if response.results.len() > 2 { // One with actual results, one closing the stream - anyhow::bail!( + return Err(Error::Misuse(format!( "Unexpected multiple responses from server: {:?}", response.results - ); + ))); } match response.results.swap_remove(0) { pipeline::Response::Ok(pipeline::StreamResponseOk { response: pipeline::StreamResponse::Batch(batch_result), }) => Ok(batch_result.result), - pipeline::Response::Ok(_) => { - anyhow::bail!("Unexpected response from server: {:?}", response.results) - } + pipeline::Response::Ok(_) => Err(Error::Misuse(format!( + "Unexpected response from server: {:?}", + response.results + ))), pipeline::Response::Error(e) => { - anyhow::bail!("Error from server: {:?}", e) + Err(Error::Misuse(format!("Error from server: {:?}", e))) } } } @@ -176,7 +177,7 @@ impl Client { pipeline::StreamExecuteReq { stmt }, )], }; - let body = serde_json::to_string(&msg)?; + let body = serde_json::to_string(&msg).map_err(|e| Error::ConnectionFailed(e.to_string()))?; let url = cookie .base_url .unwrap_or_else(|| self.url_for_queries.clone()); @@ -195,31 +196,36 @@ impl Client { }, ); } - None => anyhow::bail!("Stream closed: server returned empty baton"), + None => { + return Err(Error::ConnectionFailed( + "Stream closed: server returned empty baton".into(), + )) + } } } if response.results.is_empty() { - anyhow::bail!( + return Err(Error::ConnectionFailed(format!( "Unexpected empty response from server: {:?}", response.results - ); + ))); } if response.results.len() > 1 { - anyhow::bail!( + return Err(Error::ConnectionFailed(format!( "Unexpected multiple responses from server: {:?}", response.results - ); + ))); } match response.results.swap_remove(0) { pipeline::Response::Ok(pipeline::StreamResponseOk { response: pipeline::StreamResponse::Execute(execute_result), }) => Ok(ResultSet::from(execute_result.result)), - pipeline::Response::Ok(_) => { - anyhow::bail!("Unexpected response from server: {:?}", response.results) - } + pipeline::Response::Ok(_) => Err(Error::ConnectionFailed(format!( + "Unexpected response from server: {:?}", + response.results + ))), pipeline::Response::Error(e) => { - anyhow::bail!("Error from server: {:?}", e) + Err(Error::ConnectionFailed(format!("Error from server: {e:?}"))) } } } @@ -239,7 +245,8 @@ impl Client { let url = cookie .base_url .unwrap_or_else(|| self.url_for_queries.clone()); - let body = serde_json::to_string(&msg)?; + let body = + serde_json::to_string(&msg).map_err(|e| Error::ConnectionFailed(e.to_string()))?; self.inner.send(url, self.auth.clone(), body).await.ok(); self.cookies.write().unwrap().remove(&tx_id); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 087acb8..ee9dcf3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub mod statement; pub use statement::Statement; pub mod proto; +pub use libsql::{Error, Result}; pub use proto::{BatchResult, Col, Value}; #[cfg(feature = "mapping_names_to_values_in_rows")] @@ -45,15 +46,12 @@ impl<'a> Row { /// let text : &str = row.try_get(1).unwrap(); /// # } /// ``` - pub fn try_get>( - &'a self, - index: usize, - ) -> anyhow::Result { + pub fn try_get>(&'a self, index: usize) -> Result { let val = self .values .get(index) - .ok_or(anyhow::anyhow!("out of bound index {}", index))?; - val.try_into().map_err(|x: String| anyhow::anyhow!(x)) + .ok_or_else(|| Error::Misuse(format!("out of bound index {}", index)))?; + val.try_into().map_err(Error::Misuse) } /// Try to get a value given a column name from this row and convert it to the desired type @@ -75,15 +73,12 @@ impl<'a> Row { /// # } /// ``` #[cfg(feature = "mapping_names_to_values_in_rows")] - pub fn try_column>( - &'a self, - col: &str, - ) -> anyhow::Result { + pub fn try_column>(&'a self, col: &str) -> Result { let val = self .value_map .get(col) - .ok_or(anyhow::anyhow!("column `{}` not present", col))?; - val.try_into().map_err(|x: String| anyhow::anyhow!(x)) + .ok_or_else(|| Error::Misuse(format!("column `{}` not present", col)))?; + val.try_into().map_err(Error::Misuse) } } @@ -185,7 +180,7 @@ mod utils; /// # Example /// /// ```rust,no_run -/// # async fn f() -> anyhow::Result<()> { +/// # async fn f() -> libsql_client::Result<()> { /// # use crate::libsql_client::{Statement, args}; /// let db = libsql_client::Client::from_env().await?; /// db.execute( diff --git a/src/local.rs b/src/local.rs index 7298c5a..f04965a 100644 --- a/src/local.rs +++ b/src/local.rs @@ -1,5 +1,5 @@ use crate::{proto, proto::StmtResult, BatchResult, Col, ResultSet, Statement, Value}; -use anyhow::Result; +use crate::{Error, Result}; use sqlite3_parser::ast::{Cmd, Stmt}; use sqlite3_parser::lexer::sql::Parser; @@ -49,32 +49,39 @@ impl Client { /// /// # Arguments /// * `path` - path of the local database - pub fn new(path: impl Into) -> anyhow::Result { + pub fn new(path: impl Into) -> Result { let db = libsql::Database::open(path.into())?; let conn = db.connect()?; Ok(Self { db, conn }) } /// Establishes a new in-memory database and connects to it. - pub fn in_memory() -> anyhow::Result { + pub fn in_memory() -> Result { let db = libsql::Database::open(":memory:")?; let conn = db.connect()?; Ok(Self { db, conn }) } - pub fn from_env() -> anyhow::Result { + pub fn from_env() -> Result { let path = std::env::var("LIBSQL_CLIENT_URL").map_err(|_| { - anyhow::anyhow!("LIBSQL_CLIENT_URL variable should point to your sqld database") + Error::Misuse("LIBSQL_CLIENT_URL variable should point to your sqld database".into()) })?; let path = match path.strip_prefix("file:///") { Some(path) => path, - None => anyhow::bail!("Local URL needs to start with file:///"), + None => { + return Err(Error::Misuse( + "Local URL needs to start with file:///".into(), + )) + } }; Self::new(path) } - pub async fn sync(&self) -> anyhow::Result { - self.db.sync().await.map_err(|e| anyhow::anyhow!("{}", e)) + pub async fn sync(&self) -> Result { + self.db + .sync() + .await + .map_err(|e| Error::Misuse(e.to_string())) } /// Executes a batch of SQL statements. @@ -93,10 +100,10 @@ impl Client { /// .raw_batch(["CREATE TABLE t(id)", "INSERT INTO t VALUES (42)"]); /// # } /// ``` - pub fn raw_batch( + pub async fn raw_batch( &self, stmts: impl IntoIterator>, - ) -> anyhow::Result { + ) -> Result { let mut step_results = vec![]; let mut step_errors = vec![]; for stmt in stmts { @@ -109,7 +116,7 @@ impl Client { .map(libsql::Value::from) .collect::>() .into(); - let stmt = self.conn.prepare(sql_string)?; + let stmt = self.conn.prepare(sql_string).await?; let cols: Vec = stmt .columns() .into_iter() @@ -118,7 +125,7 @@ impl Client { }) .collect(); let mut rows = Vec::new(); - let input_rows = match stmt.query(¶ms) { + let mut input_rows = match stmt.query(¶ms).await { Ok(rows) => rows, Err(e) => { step_results.push(None); @@ -172,7 +179,7 @@ impl Client { /// /// # Arguments /// * `stmts` - SQL statements - pub fn batch( + pub async fn batch( &self, stmts: impl IntoIterator + Send> + Send, ) -> Result> { @@ -180,7 +187,7 @@ impl Client { std::iter::once(Statement::new("BEGIN")) .chain(stmts.into_iter().map(|s| s.into())) .chain(std::iter::once(Statement::new("END"))), - )?; + ).await?; let step_error: Option = batch_results .step_errors .into_iter() @@ -188,7 +195,7 @@ impl Client { .find(|e| e.is_some()) .flatten(); if let Some(error) = step_error { - return Err(anyhow::anyhow!(error.message)); + return Err(Error::Misuse(error.message)); } let mut step_results: Vec> = batch_results .step_results @@ -197,7 +204,7 @@ impl Client { .map(|maybe_rs| { maybe_rs .map(ResultSet::from) - .ok_or_else(|| anyhow::anyhow!("Unexpected missing result set")) + .ok_or_else(|| Error::Misuse("Unexpected missing result set".into())) }) .collect(); step_results.pop(); // END is not counted in the result, it's implicitly ignored @@ -207,24 +214,24 @@ impl Client { /// # Arguments /// * `stmt` - the SQL statement - pub fn execute(&self, stmt: impl Into + Send) -> Result { - let results = self.raw_batch(std::iter::once(stmt))?; + pub async fn execute(&self, stmt: impl Into + Send) -> Result { + let results = self.raw_batch(std::iter::once(stmt)).await?; match (results.step_results.first(), results.step_errors.first()) { (Some(Some(result)), Some(None)) => Ok(ResultSet::from(result.clone())), - (Some(None), Some(Some(err))) => Err(anyhow::anyhow!(err.message.clone())), + (Some(None), Some(Some(err))) => Err(Error::Misuse(err.message.clone())), _ => unreachable!(), } } - pub fn execute_in_transaction(&self, _tx_id: u64, stmt: Statement) -> Result { - self.execute(stmt) + pub async fn execute_in_transaction(&self, _tx_id: u64, stmt: Statement) -> Result { + self.execute(stmt).await } - pub fn commit_transaction(&self, _tx_id: u64) -> Result<()> { - self.execute("COMMIT").map(|_| ()) + pub async fn commit_transaction(&self, _tx_id: u64) -> Result<()> { + self.execute("COMMIT").await.map(|_| ()) } - pub fn rollback_transaction(&self, _tx_id: u64) -> Result<()> { - self.execute("ROLLBACK").map(|_| ()) + pub async fn rollback_transaction(&self, _tx_id: u64) -> Result<()> { + self.execute("ROLLBACK").await.map(|_| ()) } } diff --git a/src/reqwest.rs b/src/reqwest.rs index 988c1eb..4e9574e 100644 --- a/src/reqwest.rs +++ b/src/reqwest.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use crate::{Error, Result}; use crate::proto::pipeline; @@ -26,14 +26,19 @@ impl HttpClient { .body(body) .header("Authorization", auth) .send() - .await?; + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; if response.status() != reqwest::StatusCode::OK { let status = response.status(); let txt = response.text().await.unwrap_or_default(); - anyhow::bail!("{status}: {txt}"); + return Err(Error::ConnectionFailed(format!("{status}: {txt}"))); } - let resp: String = response.text().await?; - let response: pipeline::ServerMsg = serde_json::from_str(&resp)?; + let resp: String = response + .text() + .await + .map_err(|e| Error::Misuse(e.to_string()))?; + let response: pipeline::ServerMsg = + serde_json::from_str(&resp).map_err(|e| Error::Misuse(e.to_string()))?; Ok(response) } } diff --git a/src/spin.rs b/src/spin.rs index e8f1f2a..62071f6 100644 --- a/src/spin.rs +++ b/src/spin.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use crate::{Error, Result}; use crate::proto::pipeline; @@ -20,12 +20,16 @@ impl HttpClient { .uri(&url) .header("Authorization", &auth) .method("POST") - .body(Some(bytes::Bytes::copy_from_slice(body.as_bytes())))?; + .body(Some(bytes::Bytes::copy_from_slice(body.as_bytes()))) + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; - let response = spin_sdk::outbound_http::send_request(req); - let resp: String = - std::str::from_utf8(&response?.into_body().unwrap_or_default())?.to_string(); - let response: pipeline::ServerMsg = serde_json::from_str(&resp)?; + let response = spin_sdk::outbound_http::send_request(req) + .map_err(|e| Error::ConnectionFailed(e.to_string())); + let resp: String = std::str::from_utf8(&response?.into_body().unwrap_or_default()) + .map_err(|e| Error::ConnectionFailed(e.to_string()))? + .to_string(); + let response: pipeline::ServerMsg = + serde_json::from_str(&resp).map_err(|e| Error::ConnectionFailed(e.to_string()))?; Ok(response) } } diff --git a/src/transaction.rs b/src/transaction.rs index ab348fb..f20baa1 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,7 +1,7 @@ //! `Transaction` is a structure representing an interactive transaction. +use crate::Result; use crate::{Client, ResultSet, Statement, SyncClient}; -use anyhow::Result; pub struct Transaction<'a> { pub(crate) id: u64, @@ -20,7 +20,7 @@ impl<'a> Transaction<'a> { /// # Example /// /// ```rust,no_run - /// # async fn f() -> anyhow::Result<()> { + /// # async fn f() -> libsql_client::Result<()> { /// # use crate::libsql_client::{Statement, args}; /// let mut db = libsql_client::Client::from_env().await?; /// let tx = db.transaction().await?; @@ -66,7 +66,7 @@ impl<'a> SyncTransaction<'a> { /// # Example /// /// ```rust,no_run - /// # fn f() -> anyhow::Result<()> { + /// # fn f() -> libsql_client::Result<()> { /// # use crate::libsql_client::{Statement, args}; /// let mut db = libsql_client::SyncClient::from_env()?; /// let tx = db.transaction()?; diff --git a/src/workers.rs b/src/workers.rs index 514424e..099196c 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use crate::{Error, Result}; use worker::*; use crate::proto::pipeline; @@ -27,18 +27,25 @@ impl HttpClient { method: Method::Post, redirect: RequestRedirect::Follow, }; - let req = - Request::new_with_init(&url, &request_init).map_err(|e| anyhow::anyhow!("{e}"))?; + let req = Request::new_with_init(&url, &request_init) + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; let mut response = Fetch::Request(req) .send() .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; if response.status_code() != 200 { - anyhow::bail!("Status {}", response.status_code()); + return Err(Error::ConnectionFailed(format!( + "Status {}", + response.status_code() + ))); } - let resp: String = response.text().await.map_err(|e| anyhow::anyhow!("{e}"))?; - let response: pipeline::ServerMsg = serde_json::from_str(&resp)?; + let resp: String = response + .text() + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + let response: pipeline::ServerMsg = + serde_json::from_str(&resp).map_err(|e| Error::ConnectionFailed(e.to_string()))?; Ok(response) } }