diff --git a/Cargo.toml b/Cargo.toml index bafa8987..e95b48a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ e2e-tests = [] unstable-dynamic-cluster = [] [dependencies] +serde = "1.0.*" byteorder = "1" log = "0.4.1" lz4-compress = "=0.1.0" diff --git a/examples/paged_query.rs b/examples/paged_query.rs index 141d2b4d..8e27c0af 100644 --- a/examples/paged_query.rs +++ b/examples/paged_query.rs @@ -12,6 +12,7 @@ use cdrs::query::*; use cdrs::frame::IntoBytes; use cdrs::types::from_cdrs::FromCDRSByName; use cdrs::types::prelude::*; +use cdrs::consistency::Consistency; type CurrentSession = Session>>; @@ -26,6 +27,21 @@ impl RowStruct { } } +#[derive(Clone, Debug, IntoCDRSValue, TryFromRow, PartialEq)] +struct AnotherTestTable { + a: i32, + b: i32, + c: i32, + d: i32, + e: i32, +} + +impl AnotherTestTable { + fn into_query_values(self) -> QueryValues { + query_values!("a" => self.a, "b" => self.b, "c" => self.c, "d" => self.d, "e" => self.e) + } +} + fn main() { let node = NodeTcpConfigBuilder::new("127.0.0.1:9042", NoneAuthenticator {}).build(); let cluster_config = ClusterTcpConfig(vec![node]); @@ -39,6 +55,12 @@ fn main() { paged_selection_query(&no_compression); println!("\n\nExternal pager state for stateless executions\n"); paged_selection_query_with_state(&no_compression, PagerState::new()); + println!("\n\nPager with query values (list)\n"); + // TODO: Why does this method throws an error? + //paged_with_values_list(&no_compression); + println!("\n\nPager with query value (no list)\n"); + paged_with_value(&no_compression); + println!("\n\nFinished paged query tests\n"); } fn create_keyspace(session: &CurrentSession) { @@ -68,6 +90,70 @@ fn fill_table(session: &CurrentSession) { } } +fn paged_with_value(session: &CurrentSession) { + let create_table_cql = + "CREATE TABLE IF NOT EXISTS test_ks.another_test_table (a int, b int, c int, d int, e int, primary key((a, b), c, d));"; + session + .query(create_table_cql) + .expect("Table creation error"); + + for v in 1..=10 { + session + .query_with_values("INSERT INTO test_ks.another_test_table (a, b, c, d, e) VALUES (?, ?, ?, ?, ?)", + AnotherTestTable { + a: 1, + b: 1, + c: 2, + d: v, + e: v, + }.into_query_values(), + ).unwrap(); + } + + + let q = "SELECT * FROM test_ks.another_test_table where a = ? and b = 1 and c = ?"; + let mut pager = session.paged(3); + let mut query_pager = pager.query_with_param(q, QueryParamsBuilder::new().values(query_values!(1, 2)).finalize()); + + // Oddly enough, this returns false the first time... + assert!(!query_pager.has_more()); + + let mut assert_amount = |a| { + let rows = query_pager.next().expect("pager next"); + + assert_eq!(a, rows.len()); + }; + + assert_amount(3); + assert_amount(3); + assert_amount(3); + assert_amount(1); + + assert!(!query_pager.has_more()); +} + +// TODO: Why does this throw 'Expected 4 or 0 byte int (52)' +fn paged_with_values_list(session: &CurrentSession) { + let q = "SELECT * FROM test_ks.my_test_table where key in (?)"; + let mut pager = session.paged(2); + let mut query_pager = pager.query_with_param(q, QueryParamsBuilder::new() + .values(query_values!(vec![100, 101, 102, 103, 104, 105])) + .finalize()); + + let mut assert_amount = |a| { + let rows = query_pager.next().expect("pager next"); + + assert_eq!(a, rows.len()); + }; + + assert_amount(2); + assert_amount(2); + assert_amount(1); + assert_amount(0); + + assert!(!query_pager.has_more()) +} + fn paged_selection_query(session: &CurrentSession) { let q = "SELECT * FROM test_ks.my_test_table;"; let mut pager = session.paged(2); @@ -92,7 +178,7 @@ fn paged_selection_query_with_state(session: &CurrentSession, state: PagerState) loop { let q = "SELECT * FROM test_ks.my_test_table;"; let mut pager = session.paged(2); - let mut query_pager = pager.query_with_pager_state(q, st); + let mut query_pager = pager.query_with_pager_state(q, st, QueryParams::default()); let rows = query_pager.next().expect("pager next"); for row in rows { diff --git a/src/cluster/pager.rs b/src/cluster/pager.rs index d47949ca..3448e860 100644 --- a/src/cluster/pager.rs +++ b/src/cluster/pager.rs @@ -5,10 +5,11 @@ use std::marker::PhantomData; use crate::cluster::CDRSSession; use crate::error; use crate::frame::frame_result::{RowsMetadata, RowsMetadataFlag}; -use crate::query::{PreparedQuery, QueryParamsBuilder}; +use crate::query::{PreparedQuery, QueryParamsBuilder, QueryValues, QueryParams}; use crate::transport::CDRSTransport; use crate::types::rows::Row; use crate::types::CBytes; +use crate::consistency::Consistency; pub struct SessionPager< 'a, @@ -43,6 +44,7 @@ impl< &'a mut self, query: Q, state: PagerState, + qp: QueryParams, ) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>> where Q: ToString, @@ -51,6 +53,8 @@ impl< pager: self, pager_state: state, query, + qv: qp.values, + consistency: qp.consistency } } @@ -58,7 +62,16 @@ impl< where Q: ToString, { - self.query_with_pager_state(query, PagerState::new()) + self.query_with_param(query,QueryParamsBuilder::new() + .consistency(Consistency::One) + .finalize()) + } + + pub fn query_with_param(&'a mut self, query: Q, qp: QueryParams) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>> + where + Q: ToString, + { + self.query_with_pager_state(query, PagerState::new(), qp) } pub fn exec_with_pager_state( @@ -83,8 +96,10 @@ impl< pub struct QueryPager<'a, Q: ToString, P: 'a> { pager: &'a mut P, - pager_state: PagerState, + pub pager_state: PagerState, query: Q, + qv: Option, + consistency: Consistency } impl< @@ -96,9 +111,15 @@ impl< > QueryPager<'a, Q, SessionPager<'a, M, S, T>> { pub fn next(&mut self) -> error::Result> { - let mut params = QueryParamsBuilder::new().page_size(self.pager.page_size); - if self.pager_state.cursor.is_some() { - params = params.paging_state(self.pager_state.cursor.clone().unwrap()); + let mut params = QueryParamsBuilder::new() + .consistency(self.consistency) + .page_size(self.pager.page_size); + + if let Some(qv) = &self.qv { + params = params.values(qv.clone()); + } + if let Some(cursor) = &self.pager_state.cursor { + params = params.paging_state(cursor.clone()); } let body = self diff --git a/src/consistency.rs b/src/consistency.rs index 8c71082d..bff1535b 100644 --- a/src/consistency.rs +++ b/src/consistency.rs @@ -7,14 +7,18 @@ use std::io; use crate::error; use crate::frame::{FromBytes, FromCursor, IntoBytes}; use crate::types::*; +use std::str::FromStr; /// `Consistency` is an enum which represents Cassandra's consistency levels. -/// To find more details about each consistency level please refer to Cassandra official docs. +/// To find more details about each consistency level please refer to the following documentation: +/// https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlshConsistency.html #[derive(Debug, PartialEq, Clone, Copy)] pub enum Consistency { - /// A write must be written to the commit log and memtable on all replica nodes in the cluster - /// for that partition key. Provides the highest consistency - /// and the lowest availability of any other level. + /// Closest replica, as determined by the snitch. + /// If all replica nodes are down, write succeeds after a hinted handoff. + /// Provides low latency, guarantees writes never fail. + /// Note: this consistency level can only be used for writes. + /// It provides the lowest consistency and the highest availability. Any, /// /// A write must be written to the commit log and memtable of at least one replica node. @@ -97,6 +101,32 @@ impl IntoBytes for Consistency { } } +impl FromStr for Consistency { + type Err = error::Error; + + fn from_str(s: &str) -> Result { + let consistency = match s { + "Any" => Consistency::Any, + "One" => Consistency::One, + "Two" => Consistency::Two, + "Three" => Consistency::Three, + "Quorum" => Consistency::Quorum, + "All" => Consistency::All, + "LocalQuorum" => Consistency::LocalQuorum, + "EachQuorum" => Consistency::EachQuorum, + "Serial" => Consistency::Serial, + "LocalSerial" => Consistency::LocalSerial, + "LocalOne" => Consistency::LocalOne, + _ => Err(error::Error::General(format!( + "Invalid consistency provided: {}", + s + )))?, + }; + + Ok(consistency) + } +} + impl From for Consistency { fn from(bytes: i32) -> Consistency { match bytes { diff --git a/src/types/mod.rs b/src/types/mod.rs index f53a165a..04f0ba83 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -516,7 +516,7 @@ impl FromCursor for CStringList { #[derive(Debug, Clone, Hash, PartialEq, Eq)] /// The structure that represents Cassandra byte type. pub struct CBytes { - bytes: Option>, + pub bytes: Option>, } impl CBytes { @@ -546,7 +546,7 @@ impl CBytes { // self.bytes.map(|v| v.as_slice()) } pub fn is_empty(&self) -> bool { - self.bytes.is_some() + self.bytes.is_none() } } diff --git a/src/types/value.rs b/src/types/value.rs index 11b1cc75..aefa007b 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -84,11 +84,12 @@ impl> From for Value { } } -impl> From> for Value { - fn from(b: Option) -> Value { - match b { - Some(b) => Value::new_normal(b.into()), - None => Value::new_null(), +impl > Into for Option { + fn into(self) -> Bytes { + match self { + // Create empty bytes + None => "".to_string().into(), + Some(t) => t.into(), } } }