diff --git a/scylla/tests/integration/query_result.rs b/scylla/tests/integration/query_result.rs index eaeed37291..cf0c0c85b4 100644 --- a/scylla/tests/integration/query_result.rs +++ b/scylla/tests/integration/query_result.rs @@ -1,10 +1,11 @@ use assert_matches::assert_matches; use futures::TryStreamExt; -use scylla::errors::PagerExecutionError; +use scylla::errors::{ExecutionError, PagerExecutionError}; use scylla::{ batch::{Batch, BatchType}, client::session::Session, query::Query, + response::query_result::QueryResult, }; use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; @@ -295,3 +296,166 @@ async fn execute_iter_serialization_error() { Err(PagerExecutionError::SerializationError(_)) ) } + +async fn create_session(table_name: &str) -> Session { + let session: Session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + let cql_create_ks = format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION =\ + {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", + ks + ); + session.ddl(cql_create_ks).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + + let cql_create_table = format!( + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val int)", + table_name, + ); + session.ddl(cql_create_table).await.unwrap(); + + session +} + +async fn check_query_unpaged(insert_num: u64, use_prepared_statements: bool) { + let table_name = if use_prepared_statements { + "execute_unpaged" + } else { + "query_unpaged" + }; + let session: Session = create_session(table_name).await; + + for i in 0..insert_num { + if use_prepared_statements { + let prepared_statement = session + .prepare(format!("INSERT INTO {}(id, val) VALUES (?, ?)", table_name)) + .await + .unwrap(); + session + .execute_unpaged(&prepared_statement, &vec![i as i32, i as i32]) + .await + .unwrap(); + } else { + let cql = format!("INSERT INTO {}(id, val) VALUES ({}, {})", table_name, i, i); + session.query_unpaged(cql, &[]).await.unwrap(); + } + } + + let query_result: QueryResult; + if use_prepared_statements { + let prepared_statement = session + .prepare(format!("SELECT * FROM {}", table_name)) + .await + .unwrap(); + query_result = session + .execute_unpaged(&prepared_statement, &[]) + .await + .unwrap(); + } else { + let select_query = Query::new(format!("SELECT * FROM {}", table_name)).with_page_size(5); + query_result = session.query_unpaged(select_query, &[]).await.unwrap(); + } + let rows = query_result.into_rows_result().unwrap(); + + // NOTE: check rows number using 'rows_num()' method. + assert_eq!(rows.rows_num(), insert_num as usize); + + // NOTE: check actual rows number. + let actual_rows = rows + .rows::<(i32, i32)>() + .unwrap() + .collect::, _>>() + .unwrap(); + assert_eq!(actual_rows.len(), insert_num as usize); +} + +async fn unpaged_error(use_prepared_statements: bool) { + let table_name = if use_prepared_statements { + "execute_unpaged" + } else { + "query_unpaged" + }; + let session: Session = create_session(table_name).await; + + let query_result: Result; + if use_prepared_statements { + let prepared_statement = session + .prepare(format!("SELECT * FROM {}", table_name)) + .await + .unwrap(); + // NOTE: drop table to make the main query return error + session + .ddl(format!("DROP TABLE IF EXISTS {}", table_name)) + .await + .unwrap(); + query_result = session.execute_unpaged(&prepared_statement, &[]).await; + } else { + let select_query = Query::new(format!("SELECT * FROM fake{}", table_name)); + query_result = session.query_unpaged(select_query, &[]).await; + } + match query_result { + Ok(_) => panic!("Unexpected success"), + Err(err) => println!("Table not found as expected: {:?}", err), + } +} + +#[tokio::test] +async fn test_query_unpaged_error() { + unpaged_error(false).await +} + +#[tokio::test] +async fn test_execute_unpaged_error() { + unpaged_error(true).await +} + +#[tokio::test] +async fn test_query_unpaged_no_rows() { + check_query_unpaged(0, false).await; +} + +#[tokio::test] +async fn test_query_unpaged_one_row() { + check_query_unpaged(1, false).await; +} + +#[tokio::test] +async fn test_query_unpaged_ten_rows() { + check_query_unpaged(10, false).await; +} + +#[tokio::test] +async fn test_query_unpaged_hundred_rows() { + check_query_unpaged(100, false).await; +} + +#[tokio::test] +async fn test_query_unpaged_thousand_rows() { + check_query_unpaged(1000, false).await; +} + +#[tokio::test] +async fn test_execute_unpaged_no_rows() { + check_query_unpaged(0, true).await; +} + +#[tokio::test] +async fn test_execute_unpaged_one_row() { + check_query_unpaged(1, true).await; +} + +#[tokio::test] +async fn test_execute_unpaged_ten_rows() { + check_query_unpaged(10, true).await; +} + +#[tokio::test] +async fn test_execute_unpaged_hundred_rows() { + check_query_unpaged(100, true).await; +} + +#[tokio::test] +async fn test_execute_unpaged_thousand_rows() { + check_query_unpaged(1000, true).await; +}