diff --git a/Cargo.toml b/Cargo.toml index c2a69c91..63eaf2d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,6 @@ version = "1.6" optional = true [dependencies.clickhouse-rs-cityhash-sys] -path = "clickhouse-rs-cityhash-sys" version = "0.1.2" [dependencies.log] diff --git a/examples/simple.rs b/examples/simple.rs index ca2f4b4d..10866621 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,6 +1,6 @@ -use std::{env, error::Error}; use clickhouse_rs::{row, types::Block, Pool}; use futures_util::StreamExt; +use std::{env, error::Error}; async fn execute(database_url: String) -> Result<(), Box> { env::set_var("RUST_LOG", "clickhouse_rs=debug"); @@ -25,16 +25,20 @@ async fn execute(database_url: String) -> Result<(), Box> { let mut client = pool.get_handle().await?; client.execute(ddl).await?; client.insert("payment", block).await?; - let mut stream = client.query("SELECT * FROM payment").stream(); - while let Some(row) = stream.next().await { - let row = row?; - let id: u32 = row.get("customer_id")?; - let amount: u32 = row.get("amount")?; - let name: Option<&str> = row.get("account_name")?; - println!("Found payment {}: {} {:?}", id, amount, name); + { + let mut stream = client.query("SELECT * FROM payment").stream(); + while let Some(row) = stream.next().await { + let row = row?; + let id: u32 = row.get("customer_id")?; + let amount: u32 = row.get("amount")?; + let name: Option<&str> = row.get("account_name")?; + println!("Found payment {}: {} {:?}", id, amount, name); + } } + let progress = client.progress(); + println!("Progress results: {:?}", progress); Ok(()) } @@ -49,9 +53,8 @@ async fn main() -> Result<(), Box> { #[cfg(all(feature = "tokio_io", feature = "tls"))] #[tokio::main] async fn main() -> Result<(), Box> { - let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| { - "tcp://localhost:9440?secure=true&skip_verify=true".into() - }); + let database_url = env::var("DATABASE_URL") + .unwrap_or_else(|_| "tcp://localhost:9440?secure=true&skip_verify=true".into()); execute(database_url).await } diff --git a/src/lib.rs b/src/lib.rs index 9b79b2a3..b71c724c 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,6 +114,7 @@ use futures_util::{ future, future::BoxFuture, future::FutureExt, stream, stream::BoxStream, StreamExt, }; use log::{info, warn}; +use types::Progress; use crate::{ connecting_stream::ConnectingStream, @@ -238,6 +239,7 @@ pub struct ClientHandle { inner: Option, context: Context, pool: PoolBinding, + progress: Progress, } impl fmt::Debug for ClientHandle { @@ -288,6 +290,7 @@ impl Client { None => PoolBinding::None, Some(p) => PoolBinding::Detached(p), }, + progress: Progress::default(), }; handle.hello().await?; @@ -553,6 +556,10 @@ impl ClientHandle { unreachable!() } } + + pub fn progress(&self) -> &Progress { + &self.progress + } } fn column_name_to_string(name: &str) -> Result { diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 76d4bea1..666c2996 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -1,7 +1,8 @@ use std::{ - fmt, mem, pin::Pin, - sync::Arc, + fmt, mem, + pin::Pin, sync::atomic::{self, Ordering}, + sync::Arc, task::{Context, Poll, Waker}, }; @@ -9,9 +10,9 @@ use futures_util::future::BoxFuture; use log::error; use crate::{ - Client, - ClientHandle, - errors::Result, types::{IntoOptions, OptionsSource}, + errors::Result, + types::{IntoOptions, OptionsSource, Progress}, + Client, ClientHandle, }; pub use self::futures::GetHandle; @@ -215,16 +216,16 @@ impl Pool { match new.poll_unpin(cx) { Poll::Ready(Ok(client)) => { self.inner.idle.push(client).unwrap(); - }, + } Poll::Pending => { // NOTE: it is okay to drop the construction task // because another construction will be attempted // later in Pool::poll let _ = self.inner.new.push(new); - }, + } Poll::Ready(Err(err)) => { return Err(err); - }, + } } } @@ -277,6 +278,7 @@ impl Drop for ClientHandle { let client = Self { inner: Some(inner), pool: pool.clone(), + progress: Progress::default(), context, }; pool.return_conn(client); @@ -294,7 +296,7 @@ mod test { use futures_util::future; - use crate::{Block, errors::Result, Options, test_misc::DATABASE_URL}; + use crate::{errors::Result, test_misc::DATABASE_URL, Block, Options}; use super::Pool; use url::Url; @@ -404,7 +406,8 @@ mod test { #[test] fn test_get_addr() { - let options = Options::from_str("tcp://host1:9000?alt_hosts=host2:9000,host3:9000").unwrap(); + let options = + Options::from_str("tcp://host1:9000?alt_hosts=host2:9000,host3:9000").unwrap(); let pool = Pool::new(options); assert_eq!(pool.get_addr(), &Url::from_str("tcp://host1:9000").unwrap()); diff --git a/src/types/mod.rs b/src/types/mod.rs index 153a6d4c..1d04aeb6 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::HashMap, fmt, mem, pin::Pin, sync::Mutex, str::FromStr}; +use std::{borrow::Cow, collections::HashMap, fmt, mem, pin::Pin, str::FromStr, sync::Mutex}; use chrono::prelude::*; use chrono_tz::Tz; @@ -10,15 +10,15 @@ use crate::errors::ServerError; pub use self::{ block::{Block, RCons, RNil, Row, RowBuilder, Rows}, - column::{Column, ColumnType, Simple, Complex}, + column::{Column, ColumnType, Complex, Simple}, decimal::Decimal, enums::{Enum16, Enum8}, from_sql::{FromSql, FromSqlResult}, - value_ref::ValueRef, options::Options, query::Query, query_result::QueryResult, value::Value, + value_ref::ValueRef, }; pub(crate) use self::{ @@ -51,12 +51,26 @@ mod enums; mod options; #[derive(Copy, Clone, Debug, Default, PartialEq)] -pub(crate) struct Progress { +pub struct Progress { pub rows: u64, pub bytes: u64, pub total_rows: u64, } +impl Progress { + pub(crate) fn update(&mut self, progress: Progress) { + self.rows += progress.rows; + self.bytes += progress.bytes; + self.total_rows += progress.total_rows; + } + + pub(crate) fn reset(&mut self) { + self.rows = 0; + self.bytes = 0; + self.total_rows = 0; + } +} + #[derive(Copy, Clone, Default, Debug, PartialEq)] pub(crate) struct ProfileInfo { pub rows: u64, @@ -200,7 +214,7 @@ has_sql_type! { pub enum DateTimeType { DateTime32, DateTime64(u32, Tz), - Chrono + Chrono, } #[derive(Debug, Copy, Clone, PartialOrd, Eq, PartialEq, Hash)] @@ -351,7 +365,9 @@ impl SqlType { SqlType::Float32 => "Float32".into(), SqlType::Float64 => "Float64".into(), SqlType::Date => "Date".into(), - SqlType::DateTime(DateTimeType::DateTime64(precision, tz)) => format!("DateTime64({}, '{:?}')", precision, tz).into(), + SqlType::DateTime(DateTimeType::DateTime64(precision, tz)) => { + format!("DateTime64({}, '{:?}')", precision, tz).into() + } SqlType::DateTime(_) => "DateTime".into(), SqlType::Ipv4 => "IPv4".into(), SqlType::Ipv6 => "IPv6".into(), diff --git a/src/types/query_result/mod.rs b/src/types/query_result/mod.rs index 32134ef4..7f2635cf 100644 --- a/src/types/query_result/mod.rs +++ b/src/types/query_result/mod.rs @@ -1,4 +1,3 @@ -use std::{marker::PhantomData, sync::Arc}; use futures_util::stream::BoxStream; use futures_util::{ future, @@ -6,10 +5,11 @@ use futures_util::{ TryStreamExt, }; use log::info; +use std::{marker::PhantomData, sync::Arc}; use crate::{ - try_opt, errors::Result, + try_opt, types::{ block::BlockRef, query_result::stream_blocks::BlockStream, Block, Cmd, Complex, Query, Row, Rows, Simple, @@ -93,6 +93,7 @@ impl<'a> QueryResult<'a> { let inner = c.inner.take().unwrap().call(Cmd::SendQuery(query, context)); + c.progress.reset(); BlockStream::<'a>::new(c, inner, skip_first_block) }) } diff --git a/src/types/query_result/stream_blocks.rs b/src/types/query_result/stream_blocks.rs index 706dec69..1c4039ea 100644 --- a/src/types/query_result/stream_blocks.rs +++ b/src/types/query_result/stream_blocks.rs @@ -37,7 +37,11 @@ impl<'a> Drop for BlockStream<'a> { } impl<'a> BlockStream<'a> { - pub(crate) fn new(client: &mut ClientHandle, inner: PacketStream, skip_first_block: bool) -> BlockStream { + pub(crate) fn new( + client: &mut ClientHandle, + inner: PacketStream, + skip_first_block: bool, + ) -> BlockStream { BlockStream { client, inner, @@ -75,10 +79,13 @@ impl<'a> Stream for BlockStream<'a> { } self.eof = true; } - Packet::ProfileInfo(_) | Packet::Progress(_) => {} + Packet::ProfileInfo(_) => {} Packet::Exception(exception) => { self.eof = true; - return Poll::Ready(Some(Err(Error::Server(exception)))) + return Poll::Ready(Some(Err(Error::Server(exception)))); + } + Packet::Progress(progress) => { + self.client.progress.update(progress); } Packet::Block(block) => { self.block_index += 1;