Skip to content

Commit 9a1d5c5

Browse files
committed
introduce Execute trait for statements that don't need values
For now it is a sealed trait implemented only for BoundStatement and BoundBatch
1 parent 3a619ae commit 9a1d5c5

File tree

3 files changed

+128
-55
lines changed

3 files changed

+128
-55
lines changed

scylla/src/client/session.rs

+19-55
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::routing::{Shard, ShardAwarePortRange};
3636
use crate::statement::batch::BoundBatch;
3737
use crate::statement::batch::{Batch, BatchStatement};
3838
use crate::statement::bound::BoundStatement;
39+
use crate::statement::execute::{Execute, ExecutePageable};
3940
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
4041
use crate::statement::unprepared::Statement;
4142
use crate::statement::{Consistency, PageSize, StatementConfig};
@@ -56,7 +57,7 @@ use std::time::Duration;
5657
use tokio::time::timeout;
5758
#[cfg(feature = "unstable-cloud")]
5859
use tracing::warn;
59-
use tracing::{debug, error, trace, trace_span, Instrument};
60+
use tracing::{debug, trace, trace_span, Instrument};
6061
use uuid::Uuid;
6162

6263
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
@@ -481,7 +482,8 @@ impl Session {
481482
statement: impl Into<Statement>,
482483
values: impl SerializeRow,
483484
) -> Result<QueryResult, ExecutionError> {
484-
self.do_query_unpaged(&statement.into(), values).await
485+
let statement = statement.into();
486+
(&statement, values).execute(self).await
485487
}
486488

487489
/// Queries a single page from the database, optionally continuing from a saved point.
@@ -541,7 +543,9 @@ impl Session {
541543
values: impl SerializeRow,
542544
paging_state: PagingState,
543545
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
544-
self.do_query_single_page(&statement.into(), values, paging_state)
546+
let statement = statement.into();
547+
(&statement, values)
548+
.execute_pageable::<true>(self, paging_state)
545549
.await
546550
}
547551

@@ -806,9 +810,9 @@ impl Session {
806810
values: impl BatchValues,
807811
) -> Result<QueryResult, ExecutionError> {
808812
let batch = self.last_minute_prepare_batch(batch, &values).await?;
809-
let batch = BoundBatch::from_batch(batch.as_ref(), values)?;
810-
811-
self.do_batch(&batch).await
813+
BoundBatch::from_batch(batch.as_ref(), values)?
814+
.execute(self)
815+
.await
812816
}
813817
}
814818

@@ -989,38 +993,6 @@ impl Session {
989993
Ok(session)
990994
}
991995

992-
async fn do_query_unpaged(
993-
&self,
994-
statement: &Statement,
995-
values: impl SerializeRow,
996-
) -> Result<QueryResult, ExecutionError> {
997-
let (result, paging_state_response) = self
998-
.query(statement, values, None, PagingState::start())
999-
.await?;
1000-
if !paging_state_response.finished() {
1001-
error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1002-
return Err(ExecutionError::LastAttemptError(
1003-
RequestAttemptError::NonfinishedPagingState,
1004-
));
1005-
}
1006-
Ok(result)
1007-
}
1008-
1009-
async fn do_query_single_page(
1010-
&self,
1011-
statement: &Statement,
1012-
values: impl SerializeRow,
1013-
paging_state: PagingState,
1014-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1015-
self.query(
1016-
statement,
1017-
values,
1018-
Some(statement.get_validated_page_size()),
1019-
paging_state,
1020-
)
1021-
.await
1022-
}
1023-
1024996
/// Sends a request to the database.
1025997
/// Optionally continues fetching results from a saved point.
1026998
///
@@ -1032,7 +1004,7 @@ impl Session {
10321004
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
10331005
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
10341006
/// should be made.
1035-
async fn query(
1007+
pub(crate) async fn query(
10361008
&self,
10371009
statement: &Statement,
10381010
values: impl SerializeRow,
@@ -1299,23 +1271,15 @@ impl Session {
12991271
&self,
13001272
statement: &BoundStatement,
13011273
) -> Result<QueryResult, ExecutionError> {
1302-
let (result, paging_state) = self.execute(statement, None, PagingState::start()).await?;
1303-
if !paging_state.finished() {
1304-
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1305-
return Err(ExecutionError::LastAttemptError(
1306-
RequestAttemptError::NonfinishedPagingState,
1307-
));
1308-
}
1309-
Ok(result)
1274+
statement.execute(self).await
13101275
}
13111276

13121277
async fn do_execute_single_page(
13131278
&self,
13141279
statement: &BoundStatement,
13151280
paging_state: PagingState,
13161281
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1317-
let page_size = statement.prepared.get_validated_page_size();
1318-
self.execute(statement, Some(page_size), paging_state).await
1282+
statement.execute_pageable::<true>(self, paging_state).await
13191283
}
13201284

13211285
/// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1328,7 +1292,7 @@ impl Session {
13281292
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
13291293
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
13301294
/// should be made.
1331-
async fn execute(
1295+
pub(crate) async fn execute_bound_statement(
13321296
&self,
13331297
statement: &BoundStatement,
13341298
page_size: Option<PageSize>,
@@ -1449,7 +1413,7 @@ impl Session {
14491413
.map_err(PagerExecutionError::NextPageError)
14501414
}
14511415

1452-
async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
1416+
pub(crate) async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
14531417
// Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
14541418
// If users batch statements by shard, they will be rewarded with full shard awareness
14551419
let execution_profile = batch
@@ -1750,10 +1714,10 @@ impl Session {
17501714
traces_events_query.config.consistency = consistency;
17511715
traces_events_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
17521716

1753-
let (traces_session_res, traces_events_res) = tokio::try_join!(
1754-
self.do_query_unpaged(&traces_session_query, (tracing_id,)),
1755-
self.do_query_unpaged(&traces_events_query, (tracing_id,))
1756-
)?;
1717+
let session_query = (&traces_session_query, (tracing_id,));
1718+
let events_query = (&traces_events_query, (tracing_id,));
1719+
let (traces_session_res, traces_events_res) =
1720+
tokio::try_join!(session_query.execute(self), events_query.execute(self))?;
17571721

17581722
// Get tracing info
17591723
let maybe_tracing_info: Option<TracingInfo> = traces_session_res

scylla/src/statement/execute.rs

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use scylla_cql::{
2+
frame::request::query::{PagingState, PagingStateResponse},
3+
serialize::row::SerializeRow,
4+
};
5+
use tracing::error;
6+
7+
use crate::{
8+
client::session::Session,
9+
errors::{ExecutionError, RequestAttemptError},
10+
response::query_result::QueryResult,
11+
};
12+
13+
use super::{batch::BoundBatch, bound::BoundStatement, Statement};
14+
15+
// seals the trait to foreign implementations
16+
mod private {
17+
#[allow(unnameable_types)]
18+
pub trait Sealed {}
19+
}
20+
21+
/// A type that can be executed on a [`Session`] without any additional values
22+
///
23+
/// In practice this means that the statement(s) all already had their values bound.
24+
pub trait Execute: private::Sealed {
25+
/// Executes on the session
26+
fn execute(
27+
&self,
28+
session: &Session,
29+
) -> impl std::future::Future<Output = Result<QueryResult, ExecutionError>>;
30+
}
31+
32+
/// A type that can be executed on a [`Session`] and is aware of pagination
33+
pub trait ExecutePageable {
34+
/// Executes a command with the `paging_state` determining where the results should start
35+
///
36+
/// If SINGLE_PAGE is set to true then a single page is returned. If SINGLE_PAGE is set to
37+
/// false, then all pages (starting at `paging_state`) are returned
38+
fn execute_pageable<const SINGLE_PAGE: bool>(
39+
&self,
40+
session: &Session,
41+
paging_state: PagingState,
42+
) -> impl std::future::Future<Output = Result<(QueryResult, PagingStateResponse), ExecutionError>>;
43+
}
44+
45+
impl<T: ExecutePageable + private::Sealed> Execute for T {
46+
/// Executes the pageable type but getting all pages from the start
47+
async fn execute(&self, session: &Session) -> Result<QueryResult, ExecutionError> {
48+
let (result, paging_state) = self
49+
.execute_pageable::<false>(session, PagingState::start())
50+
.await?;
51+
52+
if !paging_state.finished() {
53+
error!("Unpaged query returned a non-empty paging state! This is a driver-side or server-side bug.");
54+
return Err(ExecutionError::LastAttemptError(
55+
RequestAttemptError::NonfinishedPagingState,
56+
));
57+
}
58+
59+
Ok(result)
60+
}
61+
}
62+
63+
impl private::Sealed for BoundBatch {}
64+
65+
impl Execute for BoundBatch {
66+
async fn execute(&self, session: &Session) -> Result<QueryResult, ExecutionError> {
67+
session.do_batch(self).await
68+
}
69+
}
70+
71+
impl private::Sealed for BoundStatement {}
72+
73+
impl ExecutePageable for BoundStatement {
74+
async fn execute_pageable<const SINGLE_PAGE: bool>(
75+
&self,
76+
session: &Session,
77+
paging_state: PagingState,
78+
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
79+
let page_size = if SINGLE_PAGE {
80+
Some(self.prepared.get_validated_page_size())
81+
} else {
82+
None
83+
};
84+
session
85+
.execute_bound_statement(self, page_size, paging_state)
86+
.await
87+
}
88+
}
89+
90+
impl<V: SerializeRow> private::Sealed for (&Statement, V) {}
91+
92+
impl<V: SerializeRow> ExecutePageable for (&Statement, V) {
93+
async fn execute_pageable<const SINGLE_PAGE: bool>(
94+
&self,
95+
session: &Session,
96+
paging_state: PagingState,
97+
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
98+
let page_size = if SINGLE_PAGE {
99+
Some(self.0.get_validated_page_size())
100+
} else {
101+
None
102+
};
103+
104+
session
105+
.query(self.0, &self.1, page_size, paging_state)
106+
.await
107+
}
108+
}

scylla/src/statement/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::policies::retry::RetryPolicy;
1515

1616
pub mod batch;
1717
pub mod bound;
18+
pub mod execute;
1819
pub mod prepared;
1920
pub mod unprepared;
2021

0 commit comments

Comments
 (0)