Skip to content
This repository was archived by the owner on Jan 2, 2025. It is now read-only.

Implements FromStr for Consistency #357

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ e2e-tests = []
unstable-dynamic-cluster = []

[dependencies]
serde = "1.0.*"
byteorder = "1"
log = "0.4.1"
lz4-compress = "=0.1.0"
Expand Down
88 changes: 87 additions & 1 deletion examples/paged_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use cdrs::query::*;
use cdrs::frame::IntoBytes;
use cdrs::types::from_cdrs::FromCDRSByName;
use cdrs::types::prelude::*;
use cdrs::consistency::Consistency;

type CurrentSession = Session<RoundRobin<TcpConnectionPool<NoneAuthenticator>>>;

Expand All @@ -26,6 +27,21 @@ impl RowStruct {
}
}

#[derive(Clone, Debug, IntoCDRSValue, TryFromRow, PartialEq)]
struct AnotherTestTable {
a: i32,
b: i32,
c: i32,
d: i32,
e: i32,
}

impl AnotherTestTable {
fn into_query_values(self) -> QueryValues {
query_values!("a" => self.a, "b" => self.b, "c" => self.c, "d" => self.d, "e" => self.e)
}
}

fn main() {
let node = NodeTcpConfigBuilder::new("127.0.0.1:9042", NoneAuthenticator {}).build();
let cluster_config = ClusterTcpConfig(vec![node]);
Expand All @@ -39,6 +55,12 @@ fn main() {
paged_selection_query(&no_compression);
println!("\n\nExternal pager state for stateless executions\n");
paged_selection_query_with_state(&no_compression, PagerState::new());
println!("\n\nPager with query values (list)\n");
// TODO: Why does this method throws an error?
//paged_with_values_list(&no_compression);
println!("\n\nPager with query value (no list)\n");
paged_with_value(&no_compression);
println!("\n\nFinished paged query tests\n");
}

fn create_keyspace(session: &CurrentSession) {
Expand Down Expand Up @@ -68,6 +90,70 @@ fn fill_table(session: &CurrentSession) {
}
}

fn paged_with_value(session: &CurrentSession) {
let create_table_cql =
"CREATE TABLE IF NOT EXISTS test_ks.another_test_table (a int, b int, c int, d int, e int, primary key((a, b), c, d));";
session
.query(create_table_cql)
.expect("Table creation error");

for v in 1..=10 {
session
.query_with_values("INSERT INTO test_ks.another_test_table (a, b, c, d, e) VALUES (?, ?, ?, ?, ?)",
AnotherTestTable {
a: 1,
b: 1,
c: 2,
d: v,
e: v,
}.into_query_values(),
).unwrap();
}


let q = "SELECT * FROM test_ks.another_test_table where a = ? and b = 1 and c = ?";
let mut pager = session.paged(3);
let mut query_pager = pager.query_with_param(q, QueryParamsBuilder::new().values(query_values!(1, 2)).finalize());

// Oddly enough, this returns false the first time...
assert!(!query_pager.has_more());

let mut assert_amount = |a| {
let rows = query_pager.next().expect("pager next");

assert_eq!(a, rows.len());
};

assert_amount(3);
assert_amount(3);
assert_amount(3);
assert_amount(1);

assert!(!query_pager.has_more());
}

// TODO: Why does this throw 'Expected 4 or 0 byte int (52)'
fn paged_with_values_list(session: &CurrentSession) {
let q = "SELECT * FROM test_ks.my_test_table where key in (?)";
let mut pager = session.paged(2);
let mut query_pager = pager.query_with_param(q, QueryParamsBuilder::new()
.values(query_values!(vec![100, 101, 102, 103, 104, 105]))
.finalize());

let mut assert_amount = |a| {
let rows = query_pager.next().expect("pager next");

assert_eq!(a, rows.len());
};

assert_amount(2);
assert_amount(2);
assert_amount(1);
assert_amount(0);

assert!(!query_pager.has_more())
}

fn paged_selection_query(session: &CurrentSession) {
let q = "SELECT * FROM test_ks.my_test_table;";
let mut pager = session.paged(2);
Expand All @@ -92,7 +178,7 @@ fn paged_selection_query_with_state(session: &CurrentSession, state: PagerState)
loop {
let q = "SELECT * FROM test_ks.my_test_table;";
let mut pager = session.paged(2);
let mut query_pager = pager.query_with_pager_state(q, st);
let mut query_pager = pager.query_with_pager_state(q, st, QueryParams::default());

let rows = query_pager.next().expect("pager next");
for row in rows {
Expand Down
33 changes: 27 additions & 6 deletions src/cluster/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::marker::PhantomData;
use crate::cluster::CDRSSession;
use crate::error;
use crate::frame::frame_result::{RowsMetadata, RowsMetadataFlag};
use crate::query::{PreparedQuery, QueryParamsBuilder};
use crate::query::{PreparedQuery, QueryParamsBuilder, QueryValues, QueryParams};
use crate::transport::CDRSTransport;
use crate::types::rows::Row;
use crate::types::CBytes;
use crate::consistency::Consistency;

pub struct SessionPager<
'a,
Expand Down Expand Up @@ -43,6 +44,7 @@ impl<
&'a mut self,
query: Q,
state: PagerState,
qp: QueryParams,
) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where
Q: ToString,
Expand All @@ -51,14 +53,25 @@ impl<
pager: self,
pager_state: state,
query,
qv: qp.values,
consistency: qp.consistency
}
}

pub fn query<Q>(&'a mut self, query: Q) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where
Q: ToString,
{
self.query_with_pager_state(query, PagerState::new())
self.query_with_param(query,QueryParamsBuilder::new()
.consistency(Consistency::One)
.finalize())
}

pub fn query_with_param<Q>(&'a mut self, query: Q, qp: QueryParams) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where
Q: ToString,
{
self.query_with_pager_state(query, PagerState::new(), qp)
}

pub fn exec_with_pager_state(
Expand All @@ -83,8 +96,10 @@ impl<

pub struct QueryPager<'a, Q: ToString, P: 'a> {
pager: &'a mut P,
pager_state: PagerState,
pub pager_state: PagerState,
query: Q,
qv: Option<QueryValues>,
consistency: Consistency
}

impl<
Expand All @@ -96,9 +111,15 @@ impl<
> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
{
pub fn next(&mut self) -> error::Result<Vec<Row>> {
let mut params = QueryParamsBuilder::new().page_size(self.pager.page_size);
if self.pager_state.cursor.is_some() {
params = params.paging_state(self.pager_state.cursor.clone().unwrap());
let mut params = QueryParamsBuilder::new()
.consistency(self.consistency)
.page_size(self.pager.page_size);

if let Some(qv) = &self.qv {
params = params.values(qv.clone());
}
if let Some(cursor) = &self.pager_state.cursor {
params = params.paging_state(cursor.clone());
}

let body = self
Expand Down
38 changes: 34 additions & 4 deletions src/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ use std::io;
use crate::error;
use crate::frame::{FromBytes, FromCursor, IntoBytes};
use crate::types::*;
use std::str::FromStr;

/// `Consistency` is an enum which represents Cassandra's consistency levels.
/// To find more details about each consistency level please refer to Cassandra official docs.
/// To find more details about each consistency level please refer to the following documentation:
/// https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlshConsistency.html
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Consistency {
/// A write must be written to the commit log and memtable on all replica nodes in the cluster
/// for that partition key. Provides the highest consistency
/// and the lowest availability of any other level.
/// Closest replica, as determined by the snitch.
/// If all replica nodes are down, write succeeds after a hinted handoff.
/// Provides low latency, guarantees writes never fail.
/// Note: this consistency level can only be used for writes.
/// It provides the lowest consistency and the highest availability.
Any,
///
/// A write must be written to the commit log and memtable of at least one replica node.
Expand Down Expand Up @@ -97,6 +101,32 @@ impl IntoBytes for Consistency {
}
}

impl FromStr for Consistency {
type Err = error::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let consistency = match s {
"Any" => Consistency::Any,
"One" => Consistency::One,
"Two" => Consistency::Two,
"Three" => Consistency::Three,
"Quorum" => Consistency::Quorum,
"All" => Consistency::All,
"LocalQuorum" => Consistency::LocalQuorum,
"EachQuorum" => Consistency::EachQuorum,
"Serial" => Consistency::Serial,
"LocalSerial" => Consistency::LocalSerial,
"LocalOne" => Consistency::LocalOne,
_ => Err(error::Error::General(format!(
"Invalid consistency provided: {}",
s
)))?,
};

Ok(consistency)
}
}

impl From<i32> for Consistency {
fn from(bytes: i32) -> Consistency {
match bytes {
Expand Down
4 changes: 2 additions & 2 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl FromCursor for CStringList {
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
/// The structure that represents Cassandra byte type.
pub struct CBytes {
bytes: Option<Vec<u8>>,
pub bytes: Option<Vec<u8>>,
}

impl CBytes {
Expand Down Expand Up @@ -546,7 +546,7 @@ impl CBytes {
// self.bytes.map(|v| v.as_slice())
}
pub fn is_empty(&self) -> bool {
self.bytes.is_some()
self.bytes.is_none()
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ impl<T: Into<Bytes>> From<T> for Value {
}
}

impl<T: Into<Bytes>> From<Option<T>> for Value {
fn from(b: Option<T>) -> Value {
match b {
Some(b) => Value::new_normal(b.into()),
None => Value::new_null(),
impl <T: serde::Serialize + Into<Bytes>> Into<Bytes> for Option<T> {
fn into(self) -> Bytes {
match self {
// Create empty bytes
None => "".to_string().into(),
Some(t) => t.into(),
}
}
}
Expand Down