Skip to content

Commit ec03223

Browse files
committed
proxy: pass CompressionReader to frame reading fns
CompressionReader is now passed down to functions that write frames. In the next commits, it will be actually used to compress frames if applicable.
1 parent 74f3ef8 commit ec03223

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

scylla-proxy/src/frame.rs

+5
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ impl RequestFrame {
6262
pub(crate) async fn write(
6363
&self,
6464
writer: &mut (impl AsyncWrite + Unpin),
65+
compression: &CompressionReader,
6566
) -> Result<(), tokio::io::Error> {
6667
write_frame(
6768
self.params,
6869
FrameOpcode::Request(self.opcode),
6970
&self.body,
7071
writer,
72+
compression,
7173
)
7274
.await
7375
}
@@ -136,12 +138,14 @@ impl ResponseFrame {
136138
pub(crate) async fn write(
137139
&self,
138140
writer: &mut (impl AsyncWrite + Unpin),
141+
compression: &CompressionReader,
139142
) -> Result<(), tokio::io::Error> {
140143
write_frame(
141144
self.params,
142145
FrameOpcode::Response(self.opcode),
143146
&self.body,
144147
writer,
148+
compression,
145149
)
146150
.await
147151
}
@@ -235,6 +239,7 @@ pub(crate) async fn write_frame(
235239
opcode: FrameOpcode,
236240
body: &[u8],
237241
writer: &mut (impl AsyncWrite + Unpin),
242+
compression: &CompressionReader,
238243
) -> Result<(), tokio::io::Error> {
239244
let mut header = [0; HEADER_SIZE];
240245

scylla-proxy/src/proxy.rs

+23-18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::frame::{
55
};
66
use crate::{RequestOpcode, TargetShard};
77
use bytes::Bytes;
8+
use compression::no_compression;
89
use scylla_cql::frame::types::read_string_multimap;
910
use std::collections::HashMap;
1011
use std::fmt::Display;
@@ -815,6 +816,7 @@ impl Doorkeeper {
815816
FrameOpcode::Request(RequestOpcode::Options),
816817
&Bytes::new(),
817818
connection,
819+
&no_compression()
818820
)
819821
.await
820822
.map_err(DoorkeeperError::ObtainingShardNumber)?;
@@ -1123,7 +1125,7 @@ impl ProxyWorker {
11231125
driver_addr,
11241126
&response.opcode
11251127
);
1126-
if response.write(&mut write_half).await.is_err() {
1128+
if response.write(&mut write_half, &compression).await.is_err() {
11271129
if terminate_notifier.try_recv().is_err()
11281130
&& connection_close_notifier.try_recv().is_err()
11291131
{
@@ -1172,7 +1174,7 @@ impl ProxyWorker {
11721174
&request.opcode
11731175
);
11741176

1175-
if request.write(&mut write_half).await.is_err() {
1177+
if request.write(&mut write_half, &compression).await.is_err() {
11761178
if terminate_notifier.try_recv().is_err()
11771179
&& connection_close_notifier.try_recv().is_err()
11781180
{
@@ -1486,6 +1488,7 @@ mod tests {
14861488
FrameOpcode::Response(ResponseOpcode::Supported),
14871489
&body,
14881490
conn,
1491+
&no_compression(),
14891492
)
14901493
.await
14911494
.unwrap();
@@ -1552,7 +1555,7 @@ mod tests {
15521555
let send_frame_to_shard = async {
15531556
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
15541557

1555-
write_frame(params, opcode, &body, &mut conn).await.unwrap();
1558+
write_frame(params, opcode, &body, &mut conn, &no_compression()).await.unwrap();
15561559
conn
15571560
};
15581561

@@ -1787,13 +1790,13 @@ mod tests {
17871790
let send_frame_to_shard = async {
17881791
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
17891792

1790-
write_frame(params1, opcode1, &body1, &mut conn)
1793+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
17911794
.await
17921795
.unwrap();
1793-
write_frame(params2, opcode2, &body2, &mut conn)
1796+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
17941797
.await
17951798
.unwrap();
1796-
write_frame(params3, opcode3, &body3, &mut conn)
1799+
write_frame(params3, opcode3, &body3, &mut conn, &no_compression())
17971800
.await
17981801
.unwrap();
17991802

@@ -1889,7 +1892,7 @@ mod tests {
18891892
body: &Bytes,
18901893
) -> Result<RequestFrame, ReadFrameError> {
18911894
let (send_res, recv_res) = join(
1892-
write_frame(params, opcode, &body.clone(), driver),
1895+
write_frame(params, opcode, &body.clone(), driver, &no_compression()),
18931896
read_request_frame(node, &no_compression()),
18941897
)
18951898
.await;
@@ -2004,7 +2007,7 @@ mod tests {
20042007
body: &Bytes,
20052008
) -> Result<RequestFrame, ReadFrameError> {
20062009
let (send_res, recv_res) = join(
2007-
write_frame(params, opcode, &body.clone(), driver),
2010+
write_frame(params, opcode, &body.clone(), driver, &no_compression()),
20082011
read_request_frame(node, &no_compression()),
20092012
)
20102013
.await;
@@ -2081,15 +2084,15 @@ mod tests {
20812084

20822085
let send_frame_to_shard = async {
20832086
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
2084-
write_frame(params, request_opcode, &body, &mut conn)
2087+
write_frame(params, request_opcode, &body, &mut conn, &no_compression())
20852088
.await
20862089
.unwrap();
20872090
conn
20882091
};
20892092

20902093
let mock_node_action = async {
20912094
let (mut conn, _) = mock_node_listener.accept().await.unwrap();
2092-
write_frame(params.for_response(), response_opcode, &body, &mut conn)
2095+
write_frame(params.for_response(), response_opcode, &body, &mut conn, &no_compression())
20932096
.await
20942097
.unwrap();
20952098
conn
@@ -2211,10 +2214,10 @@ mod tests {
22112214
let send_frame_to_shard = async {
22122215
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
22132216

2214-
write_frame(params1, opcode1, &body1, &mut conn)
2217+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
22152218
.await
22162219
.unwrap();
2217-
write_frame(params2, opcode2, &body2, &mut conn)
2220+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
22182221
.await
22192222
.unwrap();
22202223
conn
@@ -2262,7 +2265,7 @@ mod tests {
22622265

22632266
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
22642267

2265-
write_frame(params, opcode, &body, &mut conn).await.unwrap();
2268+
write_frame(params, opcode, &body, &mut conn, &no_compression()).await.unwrap();
22662269
// We assert that after sufficiently long time, no error happens inside proxy.
22672270
tokio::time::sleep(Duration::from_millis(3)).await;
22682271
running_proxy.finish().await.unwrap();
@@ -2340,13 +2343,13 @@ mod tests {
23402343

23412344
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
23422345

2343-
write_frame(params1, opcode1, &body1, &mut conn)
2346+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
23442347
.await
23452348
.unwrap();
2346-
write_frame(params2, opcode2, &body2, &mut conn)
2349+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
23472350
.await
23482351
.unwrap();
2349-
write_frame(params3, opcode3, &body3, &mut conn)
2352+
write_frame(params3, opcode3, &body3, &mut conn, &no_compression())
23502353
.await
23512354
.unwrap();
23522355

@@ -2453,7 +2456,7 @@ mod tests {
24532456
let socket = bind_socket_for_shard(shards_count, driver_shard).await;
24542457
let mut conn = socket.connect(node_proxy_addr).await.unwrap();
24552458

2456-
write_frame(params, request_opcode, body_ref, &mut conn)
2459+
write_frame(params, request_opcode, body_ref, &mut conn, &no_compression())
24572460
.await
24582461
.unwrap();
24592462
conn
@@ -2473,7 +2476,7 @@ mod tests {
24732476
&no_compression(),
24742477
)
24752478
.await;
2476-
write_frame(params.for_response(), response_opcode, body_ref, &mut conn)
2479+
write_frame(params.for_response(), response_opcode, body_ref, &mut conn, &no_compression())
24772480
.await
24782481
.unwrap();
24792482
conn
@@ -2583,6 +2586,7 @@ mod tests {
25832586
FrameOpcode::Request(req_opcode),
25842587
(body_base.to_string() + "|request|").as_bytes(),
25852588
client_socket_ref,
2589+
&no_compression()
25862590
)
25872591
.await
25882592
.unwrap();
@@ -2598,6 +2602,7 @@ mod tests {
25982602
FrameOpcode::Response(resp_opcode),
25992603
(body_base.to_string() + "|response|").as_bytes(),
26002604
server_socket_ref,
2605+
&no_compression()
26012606
)
26022607
.await
26032608
.unwrap();

0 commit comments

Comments
 (0)