Skip to content

integration: direct node connectivity test #1189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/serverless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:

- name: Create serverless cluster
run: |
ccm create serverless -i 127.0.1. -n 1 --scylla -v release:5.1.6
ccm create serverless -i 127.0.1. -n 1 --scylla -v release:6.2
ccm start --sni-proxy --sni-port 7777
- name: Update rust toolchain
run: rustup update
Expand Down
1 change: 1 addition & 0 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod hygiene;
mod large_batch_statements;
mod lwt_optimisation;
mod new_session;
mod reachability;
mod retries;
mod self_identity;
mod shards;
Expand Down
60 changes: 60 additions & 0 deletions scylla/tests/integration/reachability.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! These tests check that all nodes are reachable, i.e. they can serve requests.

use scylla::client::session::Session;
use scylla::serialize::row::SerializeRow;

use crate::utils::{
create_new_session_builder, execute_prepared_statement_everywhere, setup_tracing,
unique_keyspace_name, PerformDDL as _,
};

/// Tests that all nodes are reachable and can serve requests.
#[tokio::test]
#[ntest::timeout(30000)]
async fn test_all_nodes_are_reachable_and_serving() {
    setup_tracing();

    let session = create_new_session_builder().build().await.unwrap();
    let keyspace = unique_keyspace_name();

    // Set up a fresh keyspace with a small test table.
    prepare_schema(&session, &keyspace, "t").await;

    let insert = session
        .prepare(format!(
            "INSERT INTO {}.t (a, b, c) VALUES (?, ?, 'abc')",
            keyspace
        ))
        .await
        .unwrap();

    // Executing the prepared insert against every (node, shard) target
    // proves that each of them is reachable and serving requests.
    // FIXME: rename to get_cluster_state
    let cluster_state = session.get_cluster_data();
    execute_prepared_statement_everywhere(
        &session,
        &cluster_state,
        &insert,
        &(1, 2) as &dyn SerializeRow,
    )
    .await
    .unwrap();
}

/// Creates the test keyspace `ks` (NetworkTopologyStrategy, RF = 1) and a
/// table `ks.table` with columns (a int, b int, c text) and primary key
/// ((a), b). Both statements use `IF NOT EXISTS`, so the setup is idempotent.
///
/// Panics if either DDL statement fails — acceptable in test setup code.
async fn prepare_schema(session: &Session, ks: &str, table: &str) {
    session
        .ddl(format!(
            "CREATE KEYSPACE IF NOT EXISTS {}
            WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}",
            ks
        ))
        .await
        .unwrap();
    session
        .ddl(format!(
            "CREATE TABLE IF NOT EXISTS {}.{} (a int, b int, c text, primary key (a, b))",
            ks, table
        ))
        .await
        .unwrap();
}
94 changes: 5 additions & 89 deletions scylla/tests/integration/tablets.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
use std::sync::Arc;

use crate::utils::{
execute_prepared_statement_everywhere, execute_unprepared_statement_everywhere,
scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name,
PerformDDL,
};

use futures::future::try_join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session::Session;
use scylla::cluster::ClusterState;
use scylla::cluster::Node;
use scylla::cluster::NodeRef;
use scylla::policies::load_balancing::FallbackPlan;
use scylla::policies::load_balancing::LoadBalancingPolicy;
use scylla::policies::load_balancing::RoutingInfo;
use scylla::prepared_statement::PreparedStatement;
use scylla::query::Query;
use scylla::response::query_result::QueryResult;
use scylla::serialize::row::SerializeRow;

use scylla::errors::ExecutionError;
use scylla_proxy::{
Condition, ProxyError, Reaction, ResponseFrame, ResponseOpcode, ResponseReaction, ResponseRule,
ShardAwareness, TargetShard, WorkerError,
Expand Down Expand Up @@ -156,83 +148,6 @@ fn calculate_key_per_tablet(tablets: &[Tablet], prepared: &PreparedStatement) ->
value_lists
}

#[derive(Debug)]
struct SingleTargetLBP {
target: (Arc<Node>, Option<u32>),
}

impl LoadBalancingPolicy for SingleTargetLBP {
fn pick<'a>(
&'a self,
_query: &'a RoutingInfo,
_cluster: &'a ClusterState,
) -> Option<(NodeRef<'a>, Option<u32>)> {
Some((&self.target.0, self.target.1))
}

fn fallback<'a>(
&'a self,
_query: &'a RoutingInfo,
_cluster: &'a ClusterState,
) -> FallbackPlan<'a> {
Box::new(std::iter::empty())
}

fn name(&self) -> String {
"SingleTargetLBP".to_owned()
}
}

async fn send_statement_everywhere(
session: &Session,
cluster: &ClusterState,
statement: &PreparedStatement,
values: &dyn SerializeRow,
) -> Result<Vec<QueryResult>, ExecutionError> {
let tasks = cluster.get_nodes_info().iter().flat_map(|node| {
let shard_count: u16 = node.sharder().unwrap().nr_shards.into();
(0..shard_count).map(|shard| {
let mut stmt = statement.clone();
let values_ref = &values;
let policy = SingleTargetLBP {
target: (node.clone(), Some(shard as u32)),
};
let execution_profile = ExecutionProfile::builder()
.load_balancing_policy(Arc::new(policy))
.build();
stmt.set_execution_profile_handle(Some(execution_profile.into_handle()));

async move { session.execute_unpaged(&stmt, values_ref).await }
})
});

try_join_all(tasks).await
}

async fn send_unprepared_query_everywhere(
session: &Session,
cluster: &ClusterState,
query: &Query,
) -> Result<Vec<QueryResult>, ExecutionError> {
let tasks = cluster.get_nodes_info().iter().flat_map(|node| {
let shard_count: u16 = node.sharder().unwrap().nr_shards.into();
(0..shard_count).map(|shard| {
let mut stmt = query.clone();
let policy = SingleTargetLBP {
target: (node.clone(), Some(shard as u32)),
};
let execution_profile = ExecutionProfile::builder()
.load_balancing_policy(Arc::new(policy))
.build();
stmt.set_execution_profile_handle(Some(execution_profile.into_handle()));

async move { session.query_unpaged(stmt, &()).await }
})
});

try_join_all(tasks).await
}

fn frame_has_tablet_feedback(frame: ResponseFrame) -> bool {
let response =
scylla_cql::frame::parse_response_body_extensions(frame.params.flags, None, frame.body)
Expand Down Expand Up @@ -451,16 +366,17 @@ async fn test_tablet_feedback_not_sent_for_unprepared_queries() {

// I expect Scylla to not send feedback for unprepared queries,
// as such queries cannot be token-aware anyway
send_unprepared_query_everywhere(
execute_unprepared_statement_everywhere(
&session,
session.get_cluster_data().as_ref(),
&Query::new(format!("INSERT INTO {ks}.t (a, b, c) VALUES (1, 1, 'abc')")),
Comment on lines 366 to 372
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I don't really like "execute" used with unprepared statements, because EXECUTE is a CQL command to execute prepared statements. Would "send_unprepared_statement_everywhere" be an acceptable name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking that once we do the execution API refactor for 2.0, there will be only one method for all kinds of statements: execute. Do you agree? If so, then why not use it here?

Copy link
Collaborator

@Lorak-mmk Lorak-mmk Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about it lately, and I came to the conclusion that even in new execution API we should keep separate methods for unprepared statements, prepared statements, and batches.
Why? The choice of the query type should be more conscious; a single method makes this less obvious.
Also, the interface would be simpler to learn: the user would still have methods on the struct that accept simple types, rather than traits that the user then needs to research in order to learn what actually implements them.

What I think request execution refactor should be mostly about is enabling configuration of the request, meaning we can do session.execute(something).with_timestamp(....).paging_iter().await instead of having an exponential amount of methods.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTOH, the fact that the CQL protocol has some specific names for execution of prepared and unprepared statements does not imply that those names are a good fit for the names of high-level user-facing API functions. It's not intuitive at all for anyone not well-versed in the CQL protocol that "query" is related to unprepared statements and "execute" to prepared statements.

If in the future we were to have distinct names for execution of different types of statements, then I'd go for:

  • execute_unprepared(),
  • execute_prepared(),
  • execute_batch().

The names then make it perfectly clear that the action is execution and the object is a particular kind of a statement.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the names do not need to be the same (but in that case the documentation should clearly state which CQL command it corresponds to).
The proposed names are a bit too long for my taste.

&(),
)
.await
.unwrap();

let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum();
assert!(feedbacks == 0);
assert_eq!(feedbacks, 0);

running_proxy
},
Expand Down Expand Up @@ -553,7 +469,7 @@ async fn test_lwt_optimization_works_with_tablets() {
.unwrap()
.value()
);
send_statement_everywhere(
execute_prepared_statement_everywhere(
&session,
session.get_cluster_data().as_ref(),
&prepared_insert,
Expand Down
115 changes: 115 additions & 0 deletions scylla/tests/integration/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::future::try_join_all;
use futures::Future;
use itertools::Either;
use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session::Session;
use scylla::client::session_builder::{GenericSessionBuilder, SessionBuilderKind};
Expand All @@ -7,8 +9,11 @@ use scylla::cluster::NodeRef;
use scylla::deserialize::DeserializeValue;
use scylla::errors::ExecutionError;
use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use scylla::prepared_statement::PreparedStatement;
use scylla::query::Query;
use scylla::response::query_result::QueryResult;
use scylla::routing::Shard;
use scylla::serialize::row::SerializeRow;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
Expand Down Expand Up @@ -233,3 +238,113 @@ impl PerformDDL for Session {
self.query_unpaged(query, &[]).await.map(|_| ())
}
}

/// A load-balancing policy that always routes to one fixed target:
/// a single node and, optionally, a specific shard on that node.
///
/// Used by the `execute_*_everywhere` helpers to force a statement onto
/// each (node, shard) pair of the cluster, one target at a time.
#[derive(Debug)]
pub(crate) struct SingleTargetLBP {
    // (node, optional shard); `None` means no particular shard is requested.
    target: (Arc<scylla::cluster::Node>, Option<u32>),
}

impl LoadBalancingPolicy for SingleTargetLBP {
    // Always picks the configured target, ignoring both the statement's
    // routing info and the current cluster state.
    fn pick<'a>(
        &'a self,
        _query: &'a RoutingInfo,
        _cluster: &'a ClusterState,
    ) -> Option<(NodeRef<'a>, Option<u32>)> {
        Some((&self.target.0, self.target.1))
    }

    // No fallback targets: if the single target cannot serve the request,
    // the request fails instead of being retried elsewhere.
    fn fallback<'a>(
        &'a self,
        _query: &'a RoutingInfo,
        _cluster: &'a ClusterState,
    ) -> FallbackPlan<'a> {
        Box::new(std::iter::empty())
    }

    fn name(&self) -> String {
        "SingleTargetLBP".to_owned()
    }
}

/// Invokes `execute` concurrently for every target in the cluster: once per
/// (node, shard) pair for nodes that expose a sharder, and once with `None`
/// as the shard for nodes that do not.
///
/// All executions run concurrently via `try_join_all`, so the first error
/// aborts waiting for the remaining targets.
async fn for_each_target_execute<ExecuteFn, ExecuteFut>(
    cluster: &ClusterState,
    execute: ExecuteFn,
) -> Result<Vec<QueryResult>, ExecutionError>
where
    ExecuteFn: Fn(Arc<scylla::cluster::Node>, Option<Shard>) -> ExecuteFut,
    ExecuteFut: Future<Output = Result<QueryResult, ExecutionError>>,
{
    let targets = cluster.get_nodes_info().iter().flat_map(|node| {
        match node.sharder() {
            // Shard-aware node: address each of its shards explicitly.
            Some(sharder) => {
                let shard_count: u16 = sharder.nr_shards.into();
                Either::Left(
                    (0..shard_count).map(|shard| execute(node.clone(), Some(u32::from(shard)))),
                )
            }
            // Shard-unaware node: a single execution targeting the node.
            None => Either::Right(std::iter::once(execute(node.clone(), None))),
        }
    });

    try_join_all(targets).await
}

/// Executes a prepared statement once on every (node, shard) target of the
/// cluster, pinning each execution to its target with a [`SingleTargetLBP`]
/// execution profile. Returns the results of all executions, or the first
/// error encountered.
pub(crate) async fn execute_prepared_statement_everywhere(
    session: &Session,
    cluster: &ClusterState,
    statement: &PreparedStatement,
    values: &dyn SerializeRow,
) -> Result<Vec<QueryResult>, ExecutionError> {
    for_each_target_execute(cluster, |node, shard| {
        // Route this clone of the statement to exactly one (node, shard).
        let profile = ExecutionProfile::builder()
            .load_balancing_policy(Arc::new(SingleTargetLBP {
                target: (node, shard),
            }))
            .build();
        let mut stmt = statement.clone();
        stmt.set_execution_profile_handle(Some(profile.into_handle()));

        async move { session.execute_unpaged(&stmt, values).await }
    })
    .await
}

/// Executes an unprepared statement once on every (node, shard) target of
/// the cluster, pinning each execution to its target with a
/// [`SingleTargetLBP`] execution profile. Returns the results of all
/// executions, or the first error encountered.
pub(crate) async fn execute_unprepared_statement_everywhere(
    session: &Session,
    cluster: &ClusterState,
    query: &Query,
    values: &dyn SerializeRow,
) -> Result<Vec<QueryResult>, ExecutionError> {
    for_each_target_execute(cluster, |node, shard| {
        // Route this clone of the query to exactly one (node, shard).
        let profile = ExecutionProfile::builder()
            .load_balancing_policy(Arc::new(SingleTargetLBP {
                target: (node, shard),
            }))
            .build();
        let mut stmt = query.clone();
        stmt.set_execution_profile_handle(Some(profile.into_handle()));

        async move { session.query_unpaged(stmt, values).await }
    })
    .await
}