Skip to content

switch to hyper #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ sqlite3-parser = { version = "0.8.0", default-features = false, features = [ "YY
http = { version = "0.2", optional = true }
bytes = { version = "1.4.0", optional = true }
anyhow = "1.0.69"
reqwest = { version = "0.11.14", optional = true, default-features = false, features = ["rustls-tls"] }
hrana-client = { version = "0.3", optional = true }
hrana-client-proto = { version = "0.2" }
hyper = { version = "0.14.27", optional = true, default-features = false }
hyper-rustls = { version = "0.24.1", optional = true, features = ["http2"] }
# hrana-client = { version = "0.3", optional = true }
hrana-client = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1", optional = true }
# hrana-client-proto = { version = "0.2" }
hrana-client-proto = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1" }
futures-util = { version = "0.3.21", optional = true }
serde = "1.0.159"
tracing = "0.1.37"
futures = "0.3.28"
fallible-iterator = "0.2.0"
libsql = { version = "0.1.6", optional = true }
tower = { version = "0.4.13", features = ["make"] }
tokio = { version = "1", default-features = false, optional = true }

[features]
default = ["local_backend", "hrana_backend", "reqwest_backend", "mapping_names_to_values_in_rows"]
workers_backend = ["worker", "futures-util"]
reqwest_backend = ["reqwest"]
reqwest_backend = ["hyper_backend"]
hyper_backend = ["hyper", "hyper-rustls", "tokio"]
local_backend = ["libsql"]
spin_backend = ["spin-sdk", "http", "bytes"]
hrana_backend = ["hrana-client"]
Expand Down
103 changes: 62 additions & 41 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! [Client] is the main structure to interact with the database.
use anyhow::Result;
use hyper::{client::HttpConnector, Uri};
use hyper::client::connect::Connection as HyperConnection;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{make::MakeConnection, Service};

use crate::{proto, BatchResult, ResultSet, Statement, SyncTransaction, Transaction};

Expand All @@ -9,15 +13,15 @@ static TRANSACTION_IDS: std::sync::atomic::AtomicU64 = std::sync::atomic::Atomic
/// It's a convenience struct which allows implementing connect()
/// with backends being passed as env parameters.
#[derive(Debug)]
pub enum Client {
pub enum Client<C = HttpConnector> {
#[cfg(feature = "local_backend")]
Local(crate::local::Client),
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
feature = "spin_backend"
))]
Http(crate::http::Client),
Http(crate::http::Client<C>),
#[cfg(feature = "hrana_backend")]
Hrana(crate::hrana::Client),
Default,
Expand All @@ -29,7 +33,7 @@ pub struct SyncClient {
inner: Client,
}

unsafe impl Send for Client {}
unsafe impl<C: Send> Send for Client<C> {}

impl Client {
/// Executes a batch of independent SQL statements.
Expand Down Expand Up @@ -57,7 +61,7 @@ impl Client {
) -> Result<BatchResult> {
match self {
#[cfg(feature = "local_backend")]
Self::Local(l) => l.raw_batch(stmts),
Self::Local(l) => l.raw_batch(stmts).await,
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
Expand Down Expand Up @@ -179,7 +183,7 @@ impl Client {
pub async fn execute(&self, stmt: impl Into<Statement> + Send) -> Result<ResultSet> {
match self {
#[cfg(feature = "local_backend")]
Self::Local(l) => l.execute(stmt),
Self::Local(l) => l.execute(stmt).await,
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
Expand Down Expand Up @@ -218,7 +222,7 @@ impl Client {
) -> Result<ResultSet> {
match self {
#[cfg(feature = "local_backend")]
Self::Local(l) => l.execute_in_transaction(tx_id, stmt),
Self::Local(l) => l.execute_in_transaction(tx_id, stmt).await,
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
Expand All @@ -235,7 +239,7 @@ impl Client {
pub(crate) async fn commit_transaction(&self, tx_id: u64) -> Result<()> {
match self {
#[cfg(feature = "local_backend")]
Self::Local(l) => l.commit_transaction(tx_id),
Self::Local(l) => l.commit_transaction(tx_id).await,
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
Expand All @@ -252,7 +256,7 @@ impl Client {
pub(crate) async fn rollback_transaction(&self, tx_id: u64) -> Result<()> {
match self {
#[cfg(feature = "local_backend")]
Self::Local(l) => l.rollback_transaction(tx_id),
Self::Local(l) => l.rollback_transaction(tx_id).await,
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
Expand All @@ -267,38 +271,17 @@ impl Client {
}
}

impl Client {
/// Creates an in-memory database
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let db = libsql_client::Client::in_memory().unwrap();
/// # }
/// ```
#[cfg(feature = "local_backend")]
pub fn in_memory() -> anyhow::Result<Client> {
Ok(Client::Local(crate::local::Client::in_memory()?))
}

/// Establishes a database client based on [Config] struct
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let config = Config {
/// url: url::Url::parse("file:////tmp/example.db").unwrap(),
/// auth_token: None
/// };
/// let db = libsql_client::Client::from_config(config).await.unwrap();
/// # }
/// ```
#[allow(unreachable_patterns)]
pub async fn from_config<'a>(mut config: Config) -> anyhow::Result<Client> {
impl<C> Client<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
pub async fn from_config_with_connector(mut config: Config, connector: C) -> anyhow::Result<Client<C>>
where
C: MakeConnection<Uri>,
{
config.url = if config.url.scheme() == "libsql" {
// We cannot use url::Url::set_scheme() because it prevents changing the scheme to http...
// Safe to unwrap, because we know that the scheme is libsql
Expand All @@ -318,7 +301,7 @@ impl Client {
},
#[cfg(feature = "reqwest_backend")]
"http" | "https" => {
let inner = crate::http::InnerClient::Reqwest(crate::reqwest::HttpClient::new());
let inner = crate::http::InnerClient::Reqwest(crate::hyper::HttpClient::with_connector(connector));
Client::Http(crate::http::Client::from_config(inner, config)?)
},
#[cfg(feature = "workers_backend")]
Expand All @@ -335,6 +318,44 @@ impl Client {
})
}

}

impl Client {
/// Creates an in-memory database
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let db = libsql_client::Client::in_memory().unwrap();
/// # }
/// ```
#[cfg(feature = "local_backend")]
pub fn in_memory() -> anyhow::Result<Client> {
Ok(Client::Local(crate::local::Client::in_memory()?))
}

/// Establishes a database client based on [Config] struct
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let config = Config {
/// url: url::Url::parse("file:////tmp/example.db").unwrap(),
/// auth_token: None
/// };
/// let db = libsql_client::Client::from_config(config).await.unwrap();
/// # }
/// ```
#[allow(unreachable_patterns)]
pub async fn from_config(config: Config) -> anyhow::Result<Client> {
let connector = HttpConnector::new();
Self::from_config_with_connector(config, connector).await
}

/// Establishes a database client based on environment variables
///
/// # Env
Expand Down
43 changes: 33 additions & 10 deletions src/hrana.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use crate::client::Config;
use anyhow::Result;
use hyper::Uri;
use hyper::client::HttpConnector;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tower::Service;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
Expand All @@ -8,13 +13,14 @@ use crate::{utils, BatchResult, ResultSet, Statement};

/// Database client. This is the main structure used to
/// communicate with the database.
pub struct Client {
pub struct Client<C = HttpConnector> {
url: String,
token: Option<String>,

client: hrana_client::Client,
client_future: hrana_client::ConnFut,
streams_for_transactions: RwLock<HashMap<u64, Arc<hrana_client::Stream>>>,
connector: C,
}

impl std::fmt::Debug for Client {
Expand All @@ -26,35 +32,52 @@ impl std::fmt::Debug for Client {
}
}

impl Client {
/// Creates a database client with JWT authentication.
///
/// # Arguments
/// * `url` - URL of the database endpoint
/// * `token` - auth token
pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
impl<C> Client<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: std::error::Error + Sync + Send + 'static,
{
/// Same as `new`, but uses `connector` to create connections.
pub async fn new_with_connector(url: impl Into<String>, token: impl Into<String>, connector: C) -> Result<Self>

{
let token = token.into();
let token = if token.is_empty() { None } else { Some(token) };
let url = url.into();

let (client, client_future) = hrana_client::Client::connect(&url, token.clone()).await?;
let (client, client_future) = hrana_client::Client::with_connector(&url, token.clone(), connector.clone()).await?;

Ok(Self {
url,
token,
client,
client_future,
streams_for_transactions: RwLock::new(HashMap::new()),
connector,
})
}

pub async fn reconnect(&mut self) -> Result<()> {
let (client, client_future) =
hrana_client::Client::connect(&self.url, self.token.clone()).await?;
hrana_client::Client::with_connector(&self.url, self.token.clone(), self.connector.clone()).await?;
self.client = client;
self.client_future = client_future;
Ok(())
}
}

impl Client {
/// Creates a database client with JWT authentication.
///
/// # Arguments
/// * `url` - URL of the database endpoint
/// * `token` - auth token
pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
let connector = HttpConnector::new();
Self::new_with_connector(url, token, connector).await
}

/// Creates a database client, given a `Url`
///
Expand Down
Loading