Skip to content

Commit b0a8be8

Browse files
committed
move statement type specific request code to their module
this is a no-op the goal is just to make the code less tangled/nested
1 parent 9a1d5c5 commit b0a8be8

File tree

5 files changed

+360
-358
lines changed

5 files changed

+360
-358
lines changed

scylla/src/client/session.rs

+6-294
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::errors::{
1515
ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
1616
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
1717
};
18-
use crate::frame::response::result;
1918
use crate::network::tls::TlsProvider;
2019
use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
2120
use crate::observability::driver_tracing::RequestSpan;
@@ -30,21 +29,20 @@ use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3029
use crate::policies::speculative_execution;
3130
use crate::policies::timestamp_generator::TimestampGenerator;
3231
use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
33-
use crate::response::{NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse};
32+
use crate::response::{NonErrorQueryResponse, PagingState, PagingStateResponse};
3433
use crate::routing::partitioner::PartitionerName;
3534
use crate::routing::{Shard, ShardAwarePortRange};
3635
use crate::statement::batch::BoundBatch;
3736
use crate::statement::batch::{Batch, BatchStatement};
3837
use crate::statement::bound::BoundStatement;
3938
use crate::statement::execute::{Execute, ExecutePageable};
40-
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
39+
use crate::statement::prepared::PreparedStatement;
4140
use crate::statement::unprepared::Statement;
42-
use crate::statement::{Consistency, PageSize, StatementConfig};
41+
use crate::statement::{Consistency, StatementConfig};
4342
use arc_swap::ArcSwapOption;
4443
use futures::future::join_all;
4544
use futures::future::try_join_all;
4645
use itertools::Itertools;
47-
use scylla_cql::frame::response::NonErrorResponse;
4846
use scylla_cql::serialize::batch::{BatchValues, BatchValuesIterator};
4947
use scylla_cql::serialize::row::SerializeRow;
5048
use std::borrow::{Borrow, Cow};
@@ -993,113 +991,7 @@ impl Session {
993991
Ok(session)
994992
}
995993

996-
/// Sends a request to the database.
997-
/// Optionally continues fetching results from a saved point.
998-
///
999-
/// This is now an internal method only.
1000-
///
1001-
/// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
1002-
///
1003-
/// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1004-
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1005-
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1006-
/// should be made.
1007-
pub(crate) async fn query(
1008-
&self,
1009-
statement: &Statement,
1010-
values: impl SerializeRow,
1011-
page_size: Option<PageSize>,
1012-
paging_state: PagingState,
1013-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1014-
let execution_profile = statement
1015-
.get_execution_profile_handle()
1016-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1017-
.access();
1018-
1019-
let statement_info = RoutingInfo {
1020-
consistency: statement
1021-
.config
1022-
.consistency
1023-
.unwrap_or(execution_profile.consistency),
1024-
serial_consistency: statement
1025-
.config
1026-
.serial_consistency
1027-
.unwrap_or(execution_profile.serial_consistency),
1028-
..Default::default()
1029-
};
1030-
1031-
let span = RequestSpan::new_query(&statement.contents);
1032-
let span_ref = &span;
1033-
let run_request_result = self
1034-
.run_request(
1035-
statement_info,
1036-
&statement.config,
1037-
execution_profile,
1038-
|connection: Arc<Connection>,
1039-
consistency: Consistency,
1040-
execution_profile: &ExecutionProfileInner| {
1041-
let serial_consistency = statement
1042-
.config
1043-
.serial_consistency
1044-
.unwrap_or(execution_profile.serial_consistency);
1045-
// Needed to avoid moving query and values into async move block
1046-
let statement_ref = &statement;
1047-
let values_ref = &values;
1048-
let paging_state_ref = &paging_state;
1049-
async move {
1050-
if values_ref.is_empty() {
1051-
span_ref.record_request_size(0);
1052-
connection
1053-
.query_raw_with_consistency(
1054-
statement_ref,
1055-
consistency,
1056-
serial_consistency,
1057-
page_size,
1058-
paging_state_ref.clone(),
1059-
)
1060-
.await
1061-
.and_then(QueryResponse::into_non_error_query_response)
1062-
} else {
1063-
let statement =
1064-
connection.prepare(statement_ref).await?.bind(values_ref)?;
1065-
span_ref.record_request_size(statement.values.buffer_size());
1066-
connection
1067-
.execute_raw_with_consistency(
1068-
&statement,
1069-
consistency,
1070-
serial_consistency,
1071-
page_size,
1072-
paging_state_ref.clone(),
1073-
)
1074-
.await
1075-
.and_then(QueryResponse::into_non_error_query_response)
1076-
}
1077-
}
1078-
},
1079-
&span,
1080-
)
1081-
.instrument(span.span().clone())
1082-
.await?;
1083-
1084-
let response = match run_request_result {
1085-
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1086-
response: NonErrorResponse::Result(result::Result::Void),
1087-
tracing_id: None,
1088-
warnings: Vec::new(),
1089-
},
1090-
RunRequestResult::Completed(response) => response,
1091-
};
1092-
1093-
self.handle_set_keyspace_response(&response).await?;
1094-
self.handle_auto_await_schema_agreement(&response).await?;
1095-
1096-
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1097-
span.record_result_fields(&result);
1098-
1099-
Ok((result, paging_state_response))
1100-
}
1101-
1102-
async fn handle_set_keyspace_response(
994+
pub(crate) async fn handle_set_keyspace_response(
1103995
&self,
1104996
response: &NonErrorQueryResponse,
1105997
) -> Result<(), UseKeyspaceError> {
@@ -1115,7 +1007,7 @@ impl Session {
11151007
Ok(())
11161008
}
11171009

1118-
async fn handle_auto_await_schema_agreement(
1010+
pub(crate) async fn handle_auto_await_schema_agreement(
11191011
&self,
11201012
response: &NonErrorQueryResponse,
11211013
) -> Result<(), ExecutionError> {
@@ -1282,116 +1174,6 @@ impl Session {
12821174
statement.execute_pageable::<true>(self, paging_state).await
12831175
}
12841176

1285-
/// Sends a prepared request to the database, optionally continuing from a saved point.
1286-
///
1287-
/// This is now an internal method only.
1288-
///
1289-
/// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
1290-
///
1291-
/// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1292-
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1293-
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1294-
/// should be made.
1295-
pub(crate) async fn execute_bound_statement(
1296-
&self,
1297-
statement: &BoundStatement,
1298-
page_size: Option<PageSize>,
1299-
paging_state: PagingState,
1300-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1301-
let paging_state_ref = &paging_state;
1302-
1303-
let (partition_key, token) = statement
1304-
.pk_and_token()
1305-
.map_err(PartitionKeyError::into_execution_error)?
1306-
.unzip();
1307-
1308-
let execution_profile = statement
1309-
.prepared
1310-
.get_execution_profile_handle()
1311-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1312-
.access();
1313-
1314-
let table_spec = statement.prepared.get_table_spec();
1315-
1316-
let statement_info = RoutingInfo {
1317-
consistency: statement
1318-
.prepared
1319-
.config
1320-
.consistency
1321-
.unwrap_or(execution_profile.consistency),
1322-
serial_consistency: statement
1323-
.prepared
1324-
.config
1325-
.serial_consistency
1326-
.unwrap_or(execution_profile.serial_consistency),
1327-
token,
1328-
table: table_spec,
1329-
is_confirmed_lwt: statement.prepared.is_confirmed_lwt(),
1330-
};
1331-
1332-
let span = RequestSpan::new_prepared(
1333-
partition_key.as_ref().map(|pk| pk.iter()),
1334-
token,
1335-
statement.values.buffer_size(),
1336-
);
1337-
1338-
if !span.span().is_disabled() {
1339-
if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
1340-
let cluster_state = self.get_cluster_state();
1341-
let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
1342-
span.record_replicas(replicas)
1343-
}
1344-
}
1345-
1346-
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1347-
.run_request(
1348-
statement_info,
1349-
&statement.prepared.config,
1350-
execution_profile,
1351-
|connection: Arc<Connection>,
1352-
consistency: Consistency,
1353-
execution_profile: &ExecutionProfileInner| {
1354-
let serial_consistency = statement
1355-
.prepared
1356-
.config
1357-
.serial_consistency
1358-
.unwrap_or(execution_profile.serial_consistency);
1359-
async move {
1360-
connection
1361-
.execute_raw_with_consistency(
1362-
statement,
1363-
consistency,
1364-
serial_consistency,
1365-
page_size,
1366-
paging_state_ref.clone(),
1367-
)
1368-
.await
1369-
.and_then(QueryResponse::into_non_error_query_response)
1370-
}
1371-
},
1372-
&span,
1373-
)
1374-
.instrument(span.span().clone())
1375-
.await?;
1376-
1377-
let response = match run_request_result {
1378-
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1379-
response: NonErrorResponse::Result(result::Result::Void),
1380-
tracing_id: None,
1381-
warnings: Vec::new(),
1382-
},
1383-
RunRequestResult::Completed(response) => response,
1384-
};
1385-
1386-
self.handle_set_keyspace_response(&response).await?;
1387-
self.handle_auto_await_schema_agreement(&response).await?;
1388-
1389-
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1390-
span.record_result_fields(&result);
1391-
1392-
Ok((result, paging_state_response))
1393-
}
1394-
13951177
async fn do_execute_iter(
13961178
&self,
13971179
statement: BoundStatement,
@@ -1413,76 +1195,6 @@ impl Session {
14131195
.map_err(PagerExecutionError::NextPageError)
14141196
}
14151197

1416-
pub(crate) async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
1417-
// Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
1418-
// If users batch statements by shard, they will be rewarded with full shard awareness
1419-
let execution_profile = batch
1420-
.get_execution_profile_handle()
1421-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1422-
.access();
1423-
1424-
let consistency = batch
1425-
.config
1426-
.consistency
1427-
.unwrap_or(execution_profile.consistency);
1428-
1429-
let serial_consistency = batch
1430-
.config
1431-
.serial_consistency
1432-
.unwrap_or(execution_profile.serial_consistency);
1433-
1434-
let (table, token) = batch
1435-
.first_prepared
1436-
.as_ref()
1437-
.and_then(|(ps, token)| ps.get_table_spec().map(|table| (table, *token)))
1438-
.unzip();
1439-
1440-
let statement_info = RoutingInfo {
1441-
consistency,
1442-
serial_consistency,
1443-
token,
1444-
table,
1445-
is_confirmed_lwt: false,
1446-
};
1447-
1448-
let span = RequestSpan::new_batch();
1449-
1450-
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1451-
.run_request(
1452-
statement_info,
1453-
&batch.config,
1454-
execution_profile,
1455-
|connection: Arc<Connection>,
1456-
consistency: Consistency,
1457-
execution_profile: &ExecutionProfileInner| {
1458-
let serial_consistency = batch
1459-
.config
1460-
.serial_consistency
1461-
.unwrap_or(execution_profile.serial_consistency);
1462-
async move {
1463-
connection
1464-
.batch_with_consistency(batch, consistency, serial_consistency)
1465-
.await
1466-
.and_then(QueryResponse::into_non_error_query_response)
1467-
}
1468-
},
1469-
&span,
1470-
)
1471-
.instrument(span.span().clone())
1472-
.await?;
1473-
1474-
let result = match run_request_result {
1475-
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
1476-
RunRequestResult::Completed(non_error_query_response) => {
1477-
let result = non_error_query_response.into_query_result()?;
1478-
span.record_result_fields(&result);
1479-
result
1480-
}
1481-
};
1482-
1483-
Ok(result)
1484-
}
1485-
14861198
/// Prepares all statements within the batch and returns a new batch where every
14871199
/// statement is prepared.
14881200
/// /// # Example
@@ -1767,7 +1479,7 @@ impl Session {
17671479
/// On success, this request's result is returned.
17681480
// I tried to make this closures take a reference instead of an Arc but failed
17691481
// maybe once async closures get stabilized this can be fixed
1770-
async fn run_request<'a, QueryFut, ResT>(
1482+
pub(crate) async fn run_request<'a, QueryFut, ResT>(
17711483
&'a self,
17721484
statement_info: RoutingInfo<'a>,
17731485
statement_config: &'a StatementConfig,

0 commit comments

Comments
 (0)