@@ -1007,6 +1007,12 @@ mod compression {
1007
1007
pub ( crate ) fn no_compression ( ) -> CompressionReader {
1008
1008
mock_compression_reader ( None )
1009
1009
}
1010
+
1011
+ // Compression explicitly turned on.
1012
+ #[ cfg( test) ] // Currently only used for tests.
1013
+ pub ( crate ) fn with_compression ( compression : Compression ) -> CompressionReader {
1014
+ mock_compression_reader ( Some ( compression) )
1015
+ }
1010
1016
}
1011
1017
pub ( crate ) use compression:: { CompressionReader , CompressionWriter } ;
1012
1018
@@ -1475,14 +1481,18 @@ mod tests {
1475
1481
use super :: * ;
1476
1482
use crate :: errors:: ReadFrameError ;
1477
1483
use crate :: frame:: { read_frame, read_request_frame, FrameType } ;
1484
+ use crate :: proxy:: compression:: with_compression;
1478
1485
use crate :: {
1479
1486
setup_tracing, Condition , Reaction as _, RequestReaction , ResponseOpcode , ResponseReaction ,
1480
1487
} ;
1481
1488
use assert_matches:: assert_matches;
1482
1489
use bytes:: { BufMut , BytesMut } ;
1483
1490
use futures:: future:: { join, join3} ;
1484
1491
use rand:: RngCore ;
1492
+ use scylla_cql:: frame:: request:: options:: COMPRESSION ;
1493
+ use scylla_cql:: frame:: request:: { SerializableRequest as _, Startup } ;
1485
1494
use scylla_cql:: frame:: types:: write_string_multimap;
1495
+ use scylla_cql:: frame:: { Compression , FLAG_COMPRESSION } ;
1486
1496
use std:: collections:: HashMap ;
1487
1497
use std:: mem;
1488
1498
use std:: str:: FromStr ;
@@ -2715,4 +2725,193 @@ mod tests {
2715
2725
let _ = request_feedback_rx. try_recv ( ) . unwrap_err ( ) ;
2716
2726
let _ = response_feedback_rx. try_recv ( ) . unwrap_err ( ) ;
2717
2727
}
2728
+
2729
+ #[ tokio:: test]
2730
+ #[ ntest:: timeout( 1000 ) ]
2731
+ async fn proxy_compresses_and_decompresses_frames_iff_compression_negociated ( ) {
2732
+ setup_tracing ( ) ;
2733
+ let node1_real_addr = next_local_address_with_port ( 9876 ) ;
2734
+ let node1_proxy_addr = next_local_address_with_port ( 9876 ) ;
2735
+
2736
+ let ( request_feedback_tx, mut request_feedback_rx) = mpsc:: unbounded_channel ( ) ;
2737
+ let ( response_feedback_tx, mut response_feedback_rx) = mpsc:: unbounded_channel ( ) ;
2738
+ let proxy = Proxy :: builder ( )
2739
+ . with_node (
2740
+ Node :: builder ( )
2741
+ . real_address ( node1_real_addr)
2742
+ . proxy_address ( node1_proxy_addr)
2743
+ . shard_awareness ( ShardAwareness :: Unaware )
2744
+ . request_rules ( vec ! [ RequestRule (
2745
+ Condition :: True ,
2746
+ RequestReaction :: noop( ) . with_feedback_when_performed( request_feedback_tx) ,
2747
+ ) ] )
2748
+ . response_rules ( vec ! [ ResponseRule (
2749
+ Condition :: True ,
2750
+ ResponseReaction :: noop( ) . with_feedback_when_performed( response_feedback_tx) ,
2751
+ ) ] )
2752
+ . build ( ) ,
2753
+ )
2754
+ . build ( ) ;
2755
+ let running_proxy = proxy. run ( ) . await . unwrap ( ) ;
2756
+
2757
+ let mock_node_listener = TcpListener :: bind ( node1_real_addr) . await . unwrap ( ) ;
2758
+
2759
+ const PARAMS_REQUEST_NO_COMPRESSION : FrameParams = FrameParams {
2760
+ flags : 0 ,
2761
+ version : 0x04 ,
2762
+ stream : 0 ,
2763
+ } ;
2764
+ const PARAMS_REQUEST_COMPRESSION : FrameParams = FrameParams {
2765
+ flags : FLAG_COMPRESSION ,
2766
+ ..PARAMS_REQUEST_NO_COMPRESSION
2767
+ } ;
2768
+ const PARAMS_RESPONSE_NO_COMPRESSION : FrameParams =
2769
+ PARAMS_REQUEST_NO_COMPRESSION . for_response ( ) ;
2770
+ const PARAMS_RESPONSE_COMPRESSION : FrameParams =
2771
+ PARAMS_REQUEST_NO_COMPRESSION . for_response ( ) ;
2772
+
2773
+ let make_driver_conn = async { TcpStream :: connect ( node1_proxy_addr) . await . unwrap ( ) } ;
2774
+ let make_node_conn = async { mock_node_listener. accept ( ) . await . unwrap ( ) } ;
2775
+
2776
+ let ( mut driver_conn, ( mut node_conn, _) ) = join ( make_driver_conn, make_node_conn) . await ;
2777
+
2778
+ /* Outline of the test:
2779
+ * 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
2780
+ * and "node" receives the uncompressed frame.
2781
+ * 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
2782
+ * and "driver" receives the uncompressed frame.
2783
+ * 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
2784
+ * and "node" receives the uncompressed frame.
2785
+ * 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
2786
+ * and "node" receives the compressed frame.
2787
+ * 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
2788
+ * and "driver" receives the compressed frame.
2789
+ */
2790
+
2791
+ // 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
2792
+ // and "node" receives the uncompressed frame.
2793
+ {
2794
+ let sent_frame = RequestFrame {
2795
+ params : PARAMS_REQUEST_NO_COMPRESSION ,
2796
+ opcode : RequestOpcode :: Query ,
2797
+ body : random_body ( ) ,
2798
+ } ;
2799
+
2800
+ sent_frame
2801
+ . write ( & mut driver_conn, & no_compression ( ) )
2802
+ . await
2803
+ . unwrap ( ) ;
2804
+
2805
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2806
+ assert_eq ! ( captured_frame, sent_frame) ;
2807
+
2808
+ let received_frame = read_request_frame ( & mut node_conn, & no_compression ( ) )
2809
+ . await
2810
+ . unwrap ( ) ;
2811
+ assert_eq ! ( received_frame, sent_frame) ;
2812
+ }
2813
+
2814
+ // 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
2815
+ // and "driver" receives the uncompressed frame.
2816
+ {
2817
+ let sent_frame = ResponseFrame {
2818
+ params : PARAMS_RESPONSE_NO_COMPRESSION ,
2819
+ opcode : ResponseOpcode :: Result ,
2820
+ body : random_body ( ) ,
2821
+ } ;
2822
+
2823
+ sent_frame
2824
+ . write ( & mut node_conn, & no_compression ( ) )
2825
+ . await
2826
+ . unwrap ( ) ;
2827
+
2828
+ let ( captured_frame, _) = response_feedback_rx. recv ( ) . await . unwrap ( ) ;
2829
+ assert_eq ! ( captured_frame, sent_frame) ;
2830
+
2831
+ let received_frame = read_response_frame ( & mut driver_conn, & no_compression ( ) )
2832
+ . await
2833
+ . unwrap ( ) ;
2834
+ assert_eq ! ( received_frame, sent_frame) ;
2835
+ }
2836
+
2837
+ // 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
2838
+ // and "node" receives the uncompressed frame.
2839
+ {
2840
+ let startup_body = Startup {
2841
+ options : std:: iter:: once ( ( COMPRESSION . into ( ) , Compression :: Lz4 . as_str ( ) . into ( ) ) )
2842
+ . collect ( ) ,
2843
+ }
2844
+ . to_bytes ( )
2845
+ . unwrap ( ) ;
2846
+
2847
+ let sent_frame = RequestFrame {
2848
+ params : PARAMS_REQUEST_NO_COMPRESSION ,
2849
+ opcode : RequestOpcode :: Startup ,
2850
+ body : startup_body,
2851
+ } ;
2852
+
2853
+ sent_frame
2854
+ . write ( & mut driver_conn, & no_compression ( ) )
2855
+ . await
2856
+ . unwrap ( ) ;
2857
+
2858
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2859
+ assert_eq ! ( captured_frame, sent_frame) ;
2860
+
2861
+ let received_frame = read_request_frame ( & mut node_conn, & no_compression ( ) )
2862
+ . await
2863
+ . unwrap ( ) ;
2864
+ assert_eq ! ( received_frame, sent_frame) ;
2865
+ }
2866
+
2867
+ // 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
2868
+ // and "node" receives the compressed frame.
2869
+ {
2870
+ let sent_frame = RequestFrame {
2871
+ params : PARAMS_REQUEST_COMPRESSION ,
2872
+ opcode : RequestOpcode :: Query ,
2873
+ body : random_body ( ) ,
2874
+ } ;
2875
+
2876
+ sent_frame
2877
+ . write ( & mut driver_conn, & with_compression ( Compression :: Lz4 ) )
2878
+ . await
2879
+ . unwrap ( ) ;
2880
+
2881
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2882
+ assert_eq ! ( captured_frame, sent_frame) ;
2883
+
2884
+ let received_frame =
2885
+ read_request_frame ( & mut node_conn, & with_compression ( Compression :: Lz4 ) )
2886
+ . await
2887
+ . unwrap ( ) ;
2888
+ assert_eq ! ( received_frame, sent_frame) ;
2889
+ }
2890
+
2891
+ // 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
2892
+ // and "driver" receives the compressed frame.
2893
+ {
2894
+ let sent_frame = ResponseFrame {
2895
+ params : PARAMS_RESPONSE_COMPRESSION ,
2896
+ opcode : ResponseOpcode :: Result ,
2897
+ body : random_body ( ) ,
2898
+ } ;
2899
+
2900
+ sent_frame
2901
+ . write ( & mut node_conn, & with_compression ( Compression :: Lz4 ) )
2902
+ . await
2903
+ . unwrap ( ) ;
2904
+
2905
+ let ( captured_frame, _) = response_feedback_rx. recv ( ) . await . unwrap ( ) ;
2906
+ assert_eq ! ( captured_frame, sent_frame) ;
2907
+
2908
+ let received_frame =
2909
+ read_response_frame ( & mut driver_conn, & with_compression ( Compression :: Lz4 ) )
2910
+ . await
2911
+ . unwrap ( ) ;
2912
+ assert_eq ! ( received_frame, sent_frame) ;
2913
+ }
2914
+
2915
+ running_proxy. finish ( ) . await . unwrap ( ) ;
2916
+ }
2718
2917
}
0 commit comments