@@ -255,3 +255,89 @@ async fn test_schema_await_with_transient_failure() {
255255 Err ( err) => panic ! ( "{}" , err) ,
256256 }
257257}
258+
259+ // Test that produces SchemaAgreementError::RequiredHostAbsent to prove that
260+ // such condition is possible, and handled correctly.
261+ #[ tokio:: test]
262+ #[ cfg_attr( scylla_cloud_tests, ignore) ]
263+ async fn test_schema_await_required_host_absent ( ) {
264+ setup_tracing ( ) ;
265+
266+ let res = test_with_3_node_cluster (
267+ ShardAwareness :: QueryNode ,
268+ |proxy_uris, translation_map, mut running_proxy| async move {
269+ // DB preparation phase
270+ let session: Session = SessionBuilder :: new ( )
271+ . known_node ( proxy_uris[ 0 ] . as_str ( ) )
272+ . address_translator ( Arc :: new ( translation_map. clone ( ) ) )
273+ // Needed to have pools filled immediately after session creation.
274+ . pool_size ( PoolSize :: PerHost ( 1 . try_into ( ) . unwrap ( ) ) )
275+ // Schema agreement will only return error after timeout,
276+ // so without this line the test would take over 60s.
277+ . schema_agreement_timeout ( Duration :: from_secs ( 1 ) )
278+ . schema_agreement_interval ( Duration :: from_millis ( 50 ) )
279+ . build ( )
280+ . await
281+ . unwrap ( ) ;
282+
283+ let host_ids = calculate_proxy_host_ids ( & proxy_uris, & translation_map, & session) ;
284+
285+ let coordinator = NodeIdentifier :: HostId ( host_ids[ 1 ] ) ;
286+
287+ running_proxy. running_nodes [ 1 ] . change_request_rules ( Some ( vec ! [
288+ // This prevents opening new connections to the node
289+ RequestRule (
290+ Condition :: RequestOpcode ( RequestOpcode :: Startup ) ,
291+ RequestReaction :: drop_connection( ) ,
292+ ) ,
293+ // This prevents schema agreement check on this node, and closes connection.
294+ // After some attempts, no connections will be left.
295+ RequestRule (
296+ Condition :: not( Condition :: ConnectionRegisteredAnyEvent )
297+ . and( Condition :: RequestOpcode ( RequestOpcode :: Query ) )
298+ . and( Condition :: BodyContainsCaseSensitive ( Box :: new(
299+ * b"system.local" ,
300+ ) ) ) ,
301+ RequestReaction :: drop_connection( ) ,
302+ ) ,
303+ ] ) ) ;
304+
305+ let ks = unique_keyspace_name ( ) ;
306+ let mut request = Statement :: new ( format ! (
307+ "CREATE KEYSPACE {ks} WITH
308+ REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
309+ ) ) ;
310+ request. set_load_balancing_policy ( Some ( SingleTargetLoadBalancingPolicy :: new (
311+ coordinator,
312+ None ,
313+ ) ) ) ;
314+
315+ let result = session. query_unpaged ( request, & [ ] ) . await ;
316+ let Err ( ExecutionError :: SchemaAgreementError (
317+ SchemaAgreementError :: RequiredHostAbsent ( host) ,
318+ ) ) = result
319+ else {
320+ panic ! ( "Unexpected error type: {:?}" , result) ;
321+ } ;
322+
323+ assert_eq ! ( host, host_ids[ 1 ] ) ;
324+
325+ // Cleanup
326+ running_proxy. running_nodes [ 1 ] . change_request_rules ( Some ( vec ! [ ] ) ) ;
327+ session. await_schema_agreement ( ) . await . unwrap ( ) ;
328+ session
329+ . query_unpaged ( format ! ( "DROP KEYSPACE {ks}" ) , & [ ] )
330+ . await
331+ . unwrap ( ) ;
332+
333+ running_proxy
334+ } ,
335+ )
336+ . await ;
337+
338+ match res {
339+ Ok ( ( ) ) => ( ) ,
340+ Err ( ProxyError :: Worker ( WorkerError :: DriverDisconnected ( _) ) ) => ( ) ,
341+ Err ( err) => panic ! ( "{}" , err) ,
342+ }
343+ }
0 commit comments