Skip to content

Commit 4a1e59e

Browse files
committed
Add test for RequiredHostAbsent error
1 parent da49156 commit 4a1e59e

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

scylla/tests/integration/session/schema_agreement.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use assert_matches::assert_matches;
45
use scylla::client::PoolSize;
@@ -236,3 +237,86 @@ async fn test_schema_await_with_transient_failure() {
236237
Err(err) => panic!("{}", err),
237238
}
238239
}
240+
241+
// Test that produces SchemaAgreementError::RequiredHostAbsent to prove that
242+
// such condition is possible, and handled correctly.
243+
#[tokio::test]
244+
#[cfg_attr(scylla_cloud_tests, ignore)]
245+
async fn test_schema_await_required_host_absent() {
246+
setup_tracing();
247+
248+
let res = test_with_3_node_cluster(
249+
ShardAwareness::QueryNode,
250+
|proxy_uris, translation_map, mut running_proxy| async move {
251+
// DB preparation phase
252+
let session: Session = SessionBuilder::new()
253+
.known_node(proxy_uris[0].as_str())
254+
.address_translator(Arc::new(translation_map.clone()))
255+
// Needed to have pools filled immediately after session creation.
256+
.pool_size(PoolSize::PerHost(1.try_into().unwrap()))
257+
// Schema agreement will only return error after timeout,
258+
// so without this line the test would take over 60s.
259+
.schema_agreement_timeout(Duration::from_secs(1))
260+
.build()
261+
.await
262+
.unwrap();
263+
264+
let host_ids = calculate_proxy_host_ids(&proxy_uris, &translation_map, &session);
265+
266+
let coordinator = NodeIdentifier::HostId(host_ids[1]);
267+
268+
running_proxy.running_nodes[1].change_request_rules(Some(vec![
269+
// This prevents opening new connections to the node
270+
RequestRule(
271+
Condition::RequestOpcode(RequestOpcode::Startup),
272+
RequestReaction::drop_connection(),
273+
),
274+
// This prevents schema agreement check on this node, and closes connection.
275+
// After some attempts, no connections will be left.
276+
RequestRule(
277+
Condition::not(Condition::ConnectionRegisteredAnyEvent)
278+
.and(Condition::RequestOpcode(RequestOpcode::Query))
279+
.and(Condition::BodyContainsCaseSensitive(Box::new(
280+
*b"system.local",
281+
))),
282+
RequestReaction::drop_connection(),
283+
),
284+
]));
285+
286+
let ks = unique_keyspace_name();
287+
let mut request = Statement::new(format!(
288+
"CREATE KEYSPACE {ks} WITH
289+
REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
290+
));
291+
request.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new(
292+
coordinator,
293+
None,
294+
)));
295+
296+
let result = session.query_unpaged(request, &[]).await;
297+
let Err(ExecutionError::SchemaAgreementError(
298+
SchemaAgreementError::RequiredHostAbsent(_),
299+
)) = result
300+
else {
301+
panic!("Unexpected error type: {:?}", result);
302+
};
303+
304+
// Cleanup
305+
running_proxy.running_nodes[1].change_request_rules(Some(vec![]));
306+
session.await_schema_agreement().await.unwrap();
307+
session
308+
.query_unpaged(format!("DROP KEYSPACE {ks}"), &[])
309+
.await
310+
.unwrap();
311+
312+
running_proxy
313+
},
314+
)
315+
.await;
316+
317+
match res {
318+
Ok(()) => (),
319+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
320+
Err(err) => panic!("{}", err),
321+
}
322+
}

0 commit comments

Comments
 (0)