Skip to content

Commit 18debc9

Browse files
committed
Add test for RequiredHostAbsent error
1 parent d567ace commit 18debc9

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

scylla/tests/integration/session/schema_agreement.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use assert_matches::assert_matches;
45
use scylla::client::PoolSize;
@@ -237,3 +238,86 @@ async fn test_schema_await_with_transient_failure() {
237238
Err(err) => panic!("{}", err),
238239
}
239240
}
241+
242+
// Test that produces SchemaAgreementError::RequiredHostAbsent to prove that
243+
// such condition is possible, and handled correctly.
244+
#[tokio::test]
245+
#[cfg_attr(scylla_cloud_tests, ignore)]
246+
async fn test_schema_await_required_host_absent() {
247+
setup_tracing();
248+
249+
let res = test_with_3_node_cluster(
250+
ShardAwareness::QueryNode,
251+
|proxy_uris, translation_map, mut running_proxy| async move {
252+
// DB preparation phase
253+
let session: Session = SessionBuilder::new()
254+
.known_node(proxy_uris[0].as_str())
255+
.address_translator(Arc::new(translation_map.clone()))
256+
// Needed to have pools filled immediately after session creation.
257+
.pool_size(PoolSize::PerHost(1.try_into().unwrap()))
258+
// Schema agreement will only return error after timeout,
259+
// so without this line the test would take over 60s.
260+
.schema_agreement_timeout(Duration::from_secs(1))
261+
.build()
262+
.await
263+
.unwrap();
264+
265+
let host_ids = calculate_proxy_host_ids(&proxy_uris, &translation_map, &session);
266+
267+
let coordinator = NodeIdentifier::HostId(host_ids[1]);
268+
269+
running_proxy.running_nodes[1].change_request_rules(Some(vec![
270+
// This prevents opening new connections to the node
271+
RequestRule(
272+
Condition::RequestOpcode(RequestOpcode::Startup),
273+
RequestReaction::drop_connection(),
274+
),
275+
// This prevents schema agreement check on this node, and closes connection.
276+
// After some attempts, no connections will be left.
277+
RequestRule(
278+
Condition::not(Condition::ConnectionRegisteredAnyEvent)
279+
.and(Condition::RequestOpcode(RequestOpcode::Query))
280+
.and(Condition::BodyContainsCaseSensitive(Box::new(
281+
*b"system.local",
282+
))),
283+
RequestReaction::drop_connection(),
284+
),
285+
]));
286+
287+
let ks = unique_keyspace_name();
288+
let mut request = Statement::new(format!(
289+
"CREATE KEYSPACE {ks} WITH
290+
REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
291+
));
292+
request.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new(
293+
coordinator,
294+
None,
295+
)));
296+
297+
let result = session.query_unpaged(request, &[]).await;
298+
let Err(ExecutionError::SchemaAgreementError(
299+
SchemaAgreementError::RequiredHostAbsent(_),
300+
)) = result
301+
else {
302+
panic!("Unexpected error type: {:?}", result);
303+
};
304+
305+
// Cleanup
306+
running_proxy.running_nodes[1].change_request_rules(Some(vec![]));
307+
session.await_schema_agreement().await.unwrap();
308+
session
309+
.query_unpaged(format!("DROP KEYSPACE {ks}"), &[])
310+
.await
311+
.unwrap();
312+
313+
running_proxy
314+
},
315+
)
316+
.await;
317+
318+
match res {
319+
Ok(()) => (),
320+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
321+
Err(err) => panic!("{}", err),
322+
}
323+
}

0 commit comments

Comments
 (0)