@@ -1007,6 +1007,12 @@ mod compression {
1007
1007
pub ( crate ) fn no_compression ( ) -> CompressionReader {
1008
1008
mock_compression_reader ( None )
1009
1009
}
1010
+
1011
+ // Compression explicitly turned on.
1012
+ #[ cfg( test) ] // Currently only used for tests.
1013
+ pub ( crate ) fn with_compression ( compression : Compression ) -> CompressionReader {
1014
+ mock_compression_reader ( Some ( compression) )
1015
+ }
1010
1016
}
1011
1017
pub ( crate ) use compression:: { CompressionReader , CompressionWriter } ;
1012
1018
@@ -1476,14 +1482,18 @@ mod tests {
1476
1482
use super :: * ;
1477
1483
use crate :: errors:: ReadFrameError ;
1478
1484
use crate :: frame:: { read_frame, read_request_frame, FrameType } ;
1485
+ use crate :: proxy:: compression:: with_compression;
1479
1486
use crate :: {
1480
1487
setup_tracing, Condition , Reaction as _, RequestReaction , ResponseOpcode , ResponseReaction ,
1481
1488
} ;
1482
1489
use assert_matches:: assert_matches;
1483
1490
use bytes:: { BufMut , BytesMut } ;
1484
1491
use futures:: future:: { join, join3} ;
1485
1492
use rand:: RngCore ;
1493
+ use scylla_cql:: frame:: request:: options:: COMPRESSION ;
1494
+ use scylla_cql:: frame:: request:: { SerializableRequest as _, Startup } ;
1486
1495
use scylla_cql:: frame:: types:: write_string_multimap;
1496
+ use scylla_cql:: frame:: { Compression , FLAG_COMPRESSION } ;
1487
1497
use std:: collections:: HashMap ;
1488
1498
use std:: mem;
1489
1499
use std:: str:: FromStr ;
@@ -2716,4 +2726,193 @@ mod tests {
2716
2726
let _ = request_feedback_rx. try_recv ( ) . unwrap_err ( ) ;
2717
2727
let _ = response_feedback_rx. try_recv ( ) . unwrap_err ( ) ;
2718
2728
}
2729
+
2730
+ #[ tokio:: test]
2731
+ #[ ntest:: timeout( 1000 ) ]
2732
+ async fn proxy_compresses_and_decompresses_frames_iff_compression_negociated ( ) {
2733
+ setup_tracing ( ) ;
2734
+ let node1_real_addr = next_local_address_with_port ( 9876 ) ;
2735
+ let node1_proxy_addr = next_local_address_with_port ( 9876 ) ;
2736
+
2737
+ let ( request_feedback_tx, mut request_feedback_rx) = mpsc:: unbounded_channel ( ) ;
2738
+ let ( response_feedback_tx, mut response_feedback_rx) = mpsc:: unbounded_channel ( ) ;
2739
+ let proxy = Proxy :: builder ( )
2740
+ . with_node (
2741
+ Node :: builder ( )
2742
+ . real_address ( node1_real_addr)
2743
+ . proxy_address ( node1_proxy_addr)
2744
+ . shard_awareness ( ShardAwareness :: Unaware )
2745
+ . request_rules ( vec ! [ RequestRule (
2746
+ Condition :: True ,
2747
+ RequestReaction :: noop( ) . with_feedback_when_performed( request_feedback_tx) ,
2748
+ ) ] )
2749
+ . response_rules ( vec ! [ ResponseRule (
2750
+ Condition :: True ,
2751
+ ResponseReaction :: noop( ) . with_feedback_when_performed( response_feedback_tx) ,
2752
+ ) ] )
2753
+ . build ( ) ,
2754
+ )
2755
+ . build ( ) ;
2756
+ let running_proxy = proxy. run ( ) . await . unwrap ( ) ;
2757
+
2758
+ let mock_node_listener = TcpListener :: bind ( node1_real_addr) . await . unwrap ( ) ;
2759
+
2760
+ const PARAMS_REQUEST_NO_COMPRESSION : FrameParams = FrameParams {
2761
+ flags : 0 ,
2762
+ version : 0x04 ,
2763
+ stream : 0 ,
2764
+ } ;
2765
+ const PARAMS_REQUEST_COMPRESSION : FrameParams = FrameParams {
2766
+ flags : FLAG_COMPRESSION ,
2767
+ ..PARAMS_REQUEST_NO_COMPRESSION
2768
+ } ;
2769
+ const PARAMS_RESPONSE_NO_COMPRESSION : FrameParams =
2770
+ PARAMS_REQUEST_NO_COMPRESSION . for_response ( ) ;
2771
+ const PARAMS_RESPONSE_COMPRESSION : FrameParams =
2772
+ PARAMS_REQUEST_NO_COMPRESSION . for_response ( ) ;
2773
+
2774
+ let make_driver_conn = async { TcpStream :: connect ( node1_proxy_addr) . await . unwrap ( ) } ;
2775
+ let make_node_conn = async { mock_node_listener. accept ( ) . await . unwrap ( ) } ;
2776
+
2777
+ let ( mut driver_conn, ( mut node_conn, _) ) = join ( make_driver_conn, make_node_conn) . await ;
2778
+
2779
+ /* Outline of the test:
2780
+ * 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
2781
+ * and "node" receives the uncompressed frame.
2782
+ * 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
2783
+ * and "driver" receives the uncompressed frame.
2784
+ * 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
2785
+ * and "node" receives the uncompressed frame.
2786
+ * 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
2787
+ * and "node" receives the compressed frame.
2788
+ * 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
2789
+ * and "driver" receives the compressed frame.
2790
+ */
2791
+
2792
+ // 1. "driver" sends an, uncompressed, e.g., QUERY frame, feedback returns its uncompressed body,
2793
+ // and "node" receives the uncompressed frame.
2794
+ {
2795
+ let sent_frame = RequestFrame {
2796
+ params : PARAMS_REQUEST_NO_COMPRESSION ,
2797
+ opcode : RequestOpcode :: Query ,
2798
+ body : random_body ( ) ,
2799
+ } ;
2800
+
2801
+ sent_frame
2802
+ . write ( & mut driver_conn, & no_compression ( ) )
2803
+ . await
2804
+ . unwrap ( ) ;
2805
+
2806
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2807
+ assert_eq ! ( captured_frame, sent_frame) ;
2808
+
2809
+ let received_frame = read_request_frame ( & mut node_conn, & no_compression ( ) )
2810
+ . await
2811
+ . unwrap ( ) ;
2812
+ assert_eq ! ( received_frame, sent_frame) ;
2813
+ }
2814
+
2815
+ // 2. "node" responds with an uncompressed RESULT frame, feedback returns its uncompressed body,
2816
+ // and "driver" receives the uncompressed frame.
2817
+ {
2818
+ let sent_frame = ResponseFrame {
2819
+ params : PARAMS_RESPONSE_NO_COMPRESSION ,
2820
+ opcode : ResponseOpcode :: Result ,
2821
+ body : random_body ( ) ,
2822
+ } ;
2823
+
2824
+ sent_frame
2825
+ . write ( & mut node_conn, & no_compression ( ) )
2826
+ . await
2827
+ . unwrap ( ) ;
2828
+
2829
+ let ( captured_frame, _) = response_feedback_rx. recv ( ) . await . unwrap ( ) ;
2830
+ assert_eq ! ( captured_frame, sent_frame) ;
2831
+
2832
+ let received_frame = read_response_frame ( & mut driver_conn, & no_compression ( ) )
2833
+ . await
2834
+ . unwrap ( ) ;
2835
+ assert_eq ! ( received_frame, sent_frame) ;
2836
+ }
2837
+
2838
+ // 3. "driver" sends an uncompressed STARTUP frame, feedback returns its uncompressed body,
2839
+ // and "node" receives the uncompressed frame.
2840
+ {
2841
+ let startup_body = Startup {
2842
+ options : std:: iter:: once ( ( COMPRESSION . into ( ) , Compression :: Lz4 . as_str ( ) . into ( ) ) )
2843
+ . collect ( ) ,
2844
+ }
2845
+ . to_bytes ( )
2846
+ . unwrap ( ) ;
2847
+
2848
+ let sent_frame = RequestFrame {
2849
+ params : PARAMS_REQUEST_NO_COMPRESSION ,
2850
+ opcode : RequestOpcode :: Startup ,
2851
+ body : startup_body,
2852
+ } ;
2853
+
2854
+ sent_frame
2855
+ . write ( & mut driver_conn, & no_compression ( ) )
2856
+ . await
2857
+ . unwrap ( ) ;
2858
+
2859
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2860
+ assert_eq ! ( captured_frame, sent_frame) ;
2861
+
2862
+ let received_frame = read_request_frame ( & mut node_conn, & no_compression ( ) )
2863
+ . await
2864
+ . unwrap ( ) ;
2865
+ assert_eq ! ( received_frame, sent_frame) ;
2866
+ }
2867
+
2868
+ // 4. "driver" sends a compressed, e.g., QUERY frame, feedback returns its uncompressed body,
2869
+ // and "node" receives the compressed frame.
2870
+ {
2871
+ let sent_frame = RequestFrame {
2872
+ params : PARAMS_REQUEST_COMPRESSION ,
2873
+ opcode : RequestOpcode :: Query ,
2874
+ body : random_body ( ) ,
2875
+ } ;
2876
+
2877
+ sent_frame
2878
+ . write ( & mut driver_conn, & with_compression ( Compression :: Lz4 ) )
2879
+ . await
2880
+ . unwrap ( ) ;
2881
+
2882
+ let ( captured_frame, _) = request_feedback_rx. recv ( ) . await . unwrap ( ) ;
2883
+ assert_eq ! ( captured_frame, sent_frame) ;
2884
+
2885
+ let received_frame =
2886
+ read_request_frame ( & mut node_conn, & with_compression ( Compression :: Lz4 ) )
2887
+ . await
2888
+ . unwrap ( ) ;
2889
+ assert_eq ! ( received_frame, sent_frame) ;
2890
+ }
2891
+
2892
+ // 5. "node" responds with a compressed RESULT frame, feedback returns its uncompressed body,
2893
+ // and "driver" receives the compressed frame.
2894
+ {
2895
+ let sent_frame = ResponseFrame {
2896
+ params : PARAMS_RESPONSE_COMPRESSION ,
2897
+ opcode : ResponseOpcode :: Result ,
2898
+ body : random_body ( ) ,
2899
+ } ;
2900
+
2901
+ sent_frame
2902
+ . write ( & mut node_conn, & with_compression ( Compression :: Lz4 ) )
2903
+ . await
2904
+ . unwrap ( ) ;
2905
+
2906
+ let ( captured_frame, _) = response_feedback_rx. recv ( ) . await . unwrap ( ) ;
2907
+ assert_eq ! ( captured_frame, sent_frame) ;
2908
+
2909
+ let received_frame =
2910
+ read_response_frame ( & mut driver_conn, & with_compression ( Compression :: Lz4 ) )
2911
+ . await
2912
+ . unwrap ( ) ;
2913
+ assert_eq ! ( received_frame, sent_frame) ;
2914
+ }
2915
+
2916
+ running_proxy. finish ( ) . await . unwrap ( ) ;
2917
+ }
2719
2918
}
0 commit comments