@@ -247,3 +247,89 @@ async fn test_schema_await_with_transient_failure() {
247247 Err ( err) => panic ! ( "{}" , err) ,
248248 }
249249}
250+
251+ // Test that produces SchemaAgreementError::RequiredHostAbsent to prove that
252+ // such condition is possible, and handled correctly.
253+ #[ tokio:: test]
254+ #[ cfg_attr( scylla_cloud_tests, ignore) ]
255+ async fn test_schema_await_required_host_absent ( ) {
256+ setup_tracing ( ) ;
257+
258+ let res = test_with_3_node_cluster (
259+ ShardAwareness :: QueryNode ,
260+ |proxy_uris, translation_map, mut running_proxy| async move {
261+ // DB preparation phase
262+ let session: Session = SessionBuilder :: new ( )
263+ . known_node ( proxy_uris[ 0 ] . as_str ( ) )
264+ . address_translator ( Arc :: new ( translation_map. clone ( ) ) )
265+ // Needed to have pools filled immediately after session creation.
266+ . pool_size ( PoolSize :: PerHost ( 1 . try_into ( ) . unwrap ( ) ) )
267+ // Schema agreement will only return error after timeout,
268+ // so without this line the test would take over 60s.
269+ . schema_agreement_timeout ( Duration :: from_secs ( 1 ) )
270+ . schema_agreement_interval ( Duration :: from_millis ( 50 ) )
271+ . build ( )
272+ . await
273+ . unwrap ( ) ;
274+
275+ let host_ids = calculate_proxy_host_ids ( & proxy_uris, & translation_map, & session) ;
276+
277+ let coordinator = NodeIdentifier :: HostId ( host_ids[ 1 ] ) ;
278+
279+ running_proxy. running_nodes [ 1 ] . change_request_rules ( Some ( vec ! [
280+ // This prevents opening new connections to the node
281+ RequestRule (
282+ Condition :: RequestOpcode ( RequestOpcode :: Startup ) ,
283+ RequestReaction :: drop_connection( ) ,
284+ ) ,
285+ // This prevents schema agreement check on this node, and closes connection.
286+ // After some attempts, no connections will be left.
287+ RequestRule (
288+ Condition :: not( Condition :: ConnectionRegisteredAnyEvent )
289+ . and( Condition :: RequestOpcode ( RequestOpcode :: Query ) )
290+ . and( Condition :: BodyContainsCaseSensitive ( Box :: new(
291+ * b"system.local" ,
292+ ) ) ) ,
293+ RequestReaction :: drop_connection( ) ,
294+ ) ,
295+ ] ) ) ;
296+
297+ let ks = unique_keyspace_name ( ) ;
298+ let mut request = Statement :: new ( format ! (
299+ "CREATE KEYSPACE {ks} WITH
300+ REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
301+ ) ) ;
302+ request. set_load_balancing_policy ( Some ( SingleTargetLoadBalancingPolicy :: new (
303+ coordinator,
304+ None ,
305+ ) ) ) ;
306+
307+ let result = session. query_unpaged ( request, & [ ] ) . await ;
308+ let Err ( ExecutionError :: SchemaAgreementError (
309+ SchemaAgreementError :: RequiredHostAbsent ( host) ,
310+ ) ) = result
311+ else {
312+ panic ! ( "Unexpected error type: {:?}" , result) ;
313+ } ;
314+
315+ assert_eq ! ( host, host_ids[ 1 ] ) ;
316+
317+ // Cleanup
318+ running_proxy. running_nodes [ 1 ] . change_request_rules ( Some ( vec ! [ ] ) ) ;
319+ session. await_schema_agreement ( ) . await . unwrap ( ) ;
320+ session
321+ . query_unpaged ( format ! ( "DROP KEYSPACE {ks}" ) , & [ ] )
322+ . await
323+ . unwrap ( ) ;
324+
325+ running_proxy
326+ } ,
327+ )
328+ . await ;
329+
330+ match res {
331+ Ok ( ( ) ) => ( ) ,
332+ Err ( ProxyError :: Worker ( WorkerError :: DriverDisconnected ( _) ) ) => ( ) ,
333+ Err ( err) => panic ! ( "{}" , err) ,
334+ }
335+ }
0 commit comments