Skip to content

Commit fc92c9b

Browse files
committed
Add test for RequiredHostAbsent error
1 parent d780890 commit fc92c9b

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

scylla/tests/integration/session/schema_agreement.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use assert_matches::assert_matches;
45
use scylla::client::PoolSize;
@@ -238,3 +239,86 @@ async fn test_schema_await_with_transient_failure() {
238239
Err(err) => panic!("{}", err),
239240
}
240241
}
242+
243+
// Test that produces SchemaAgreementError::RequiredHostAbsent to prove that
244+
// such condition is possible, and handled correctly.
245+
#[tokio::test]
246+
#[cfg_attr(scylla_cloud_tests, ignore)]
247+
async fn test_schema_await_required_host_absent() {
248+
setup_tracing();
249+
250+
let res = test_with_3_node_cluster(
251+
ShardAwareness::QueryNode,
252+
|proxy_uris, translation_map, mut running_proxy| async move {
253+
// DB preparation phase
254+
let session: Session = SessionBuilder::new()
255+
.known_node(proxy_uris[0].as_str())
256+
.address_translator(Arc::new(translation_map.clone()))
257+
// Needed to have pools filled immediately after session creation.
258+
.pool_size(PoolSize::PerHost(1.try_into().unwrap()))
259+
// Schema agreement will only return error after timeout,
260+
// so without this line the test would take over 60s.
261+
.schema_agreement_timeout(Duration::from_secs(1))
262+
.build()
263+
.await
264+
.unwrap();
265+
266+
let host_ids = calculate_proxy_host_ids(&proxy_uris, &translation_map, &session);
267+
268+
let coordinator = NodeIdentifier::HostId(host_ids[1]);
269+
270+
running_proxy.running_nodes[1].change_request_rules(Some(vec![
271+
// This prevents opening new connections to the node
272+
RequestRule(
273+
Condition::RequestOpcode(RequestOpcode::Startup),
274+
RequestReaction::drop_connection(),
275+
),
276+
// This prevents schema agreement check on this node, and closes connection.
277+
// After some attempts, no connections will be left.
278+
RequestRule(
279+
Condition::not(Condition::ConnectionRegisteredAnyEvent)
280+
.and(Condition::RequestOpcode(RequestOpcode::Query))
281+
.and(Condition::BodyContainsCaseSensitive(Box::new(
282+
*b"system.local",
283+
))),
284+
RequestReaction::drop_connection(),
285+
),
286+
]));
287+
288+
let ks = unique_keyspace_name();
289+
let mut request = Statement::new(format!(
290+
"CREATE KEYSPACE {ks} WITH
291+
REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
292+
));
293+
request.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new(
294+
coordinator,
295+
None,
296+
)));
297+
298+
let result = session.query_unpaged(request, &[]).await;
299+
let Err(ExecutionError::SchemaAgreementError(
300+
SchemaAgreementError::RequiredHostAbsent(_),
301+
)) = result
302+
else {
303+
panic!("Unexpected error type: {:?}", result);
304+
};
305+
306+
// Cleanup
307+
running_proxy.running_nodes[1].change_request_rules(Some(vec![]));
308+
session.await_schema_agreement().await.unwrap();
309+
session
310+
.query_unpaged(format!("DROP KEYSPACE {ks}"), &[])
311+
.await
312+
.unwrap();
313+
314+
running_proxy
315+
},
316+
)
317+
.await;
318+
319+
match res {
320+
Ok(()) => (),
321+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
322+
Err(err) => panic!("{}", err),
323+
}
324+
}

0 commit comments

Comments
 (0)