Skip to content

Commit 623b5f4

Browse files
committed
proxy: pass CompressionReader to frame reading fns
CompressionReader is now passed down to functions that write frames. In the next commits, it will be actually used to compress frames if applicable.
1 parent 09e4cfc commit 623b5f4

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

scylla-proxy/src/frame.rs

+5
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ impl RequestFrame {
6262
pub(crate) async fn write(
6363
&self,
6464
writer: &mut (impl AsyncWrite + Unpin),
65+
compression: &CompressionReader,
6566
) -> Result<(), tokio::io::Error> {
6667
write_frame(
6768
self.params,
6869
FrameOpcode::Request(self.opcode),
6970
&self.body,
7071
writer,
72+
compression,
7173
)
7274
.await
7375
}
@@ -136,12 +138,14 @@ impl ResponseFrame {
136138
pub(crate) async fn write(
137139
&self,
138140
writer: &mut (impl AsyncWrite + Unpin),
141+
compression: &CompressionReader,
139142
) -> Result<(), tokio::io::Error> {
140143
write_frame(
141144
self.params,
142145
FrameOpcode::Response(self.opcode),
143146
&self.body,
144147
writer,
148+
compression,
145149
)
146150
.await
147151
}
@@ -235,6 +239,7 @@ pub(crate) async fn write_frame(
235239
opcode: FrameOpcode,
236240
body: &[u8],
237241
writer: &mut (impl AsyncWrite + Unpin),
242+
compression: &CompressionReader,
238243
) -> Result<(), tokio::io::Error> {
239244
let mut header = [0; HEADER_SIZE];
240245

scylla-proxy/src/proxy.rs

+23-18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::frame::{
55
};
66
use crate::{RequestOpcode, TargetShard};
77
use bytes::Bytes;
8+
use compression::no_compression;
89
use scylla_cql::frame::types::read_string_multimap;
910
use std::collections::HashMap;
1011
use std::fmt::Display;
@@ -815,6 +816,7 @@ impl Doorkeeper {
815816
FrameOpcode::Request(RequestOpcode::Options),
816817
&Bytes::new(),
817818
connection,
819+
&no_compression()
818820
)
819821
.await
820822
.map_err(DoorkeeperError::ObtainingShardNumber)?;
@@ -1123,7 +1125,7 @@ impl ProxyWorker {
11231125
driver_addr,
11241126
&response.opcode
11251127
);
1126-
if response.write(&mut write_half).await.is_err() {
1128+
if response.write(&mut write_half, &compression).await.is_err() {
11271129
if terminate_notifier.try_recv().is_err()
11281130
&& connection_close_notifier.try_recv().is_err()
11291131
{
@@ -1172,7 +1174,7 @@ impl ProxyWorker {
11721174
&request.opcode
11731175
);
11741176

1175-
if request.write(&mut write_half).await.is_err() {
1177+
if request.write(&mut write_half, &compression).await.is_err() {
11761178
if terminate_notifier.try_recv().is_err()
11771179
&& connection_close_notifier.try_recv().is_err()
11781180
{
@@ -1487,6 +1489,7 @@ mod tests {
14871489
FrameOpcode::Response(ResponseOpcode::Supported),
14881490
&body,
14891491
conn,
1492+
&no_compression(),
14901493
)
14911494
.await
14921495
.unwrap();
@@ -1553,7 +1556,7 @@ mod tests {
15531556
let send_frame_to_shard = async {
15541557
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
15551558

1556-
write_frame(params, opcode, &body, &mut conn).await.unwrap();
1559+
write_frame(params, opcode, &body, &mut conn, &no_compression()).await.unwrap();
15571560
conn
15581561
};
15591562

@@ -1788,13 +1791,13 @@ mod tests {
17881791
let send_frame_to_shard = async {
17891792
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
17901793

1791-
write_frame(params1, opcode1, &body1, &mut conn)
1794+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
17921795
.await
17931796
.unwrap();
1794-
write_frame(params2, opcode2, &body2, &mut conn)
1797+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
17951798
.await
17961799
.unwrap();
1797-
write_frame(params3, opcode3, &body3, &mut conn)
1800+
write_frame(params3, opcode3, &body3, &mut conn, &no_compression())
17981801
.await
17991802
.unwrap();
18001803

@@ -1890,7 +1893,7 @@ mod tests {
18901893
body: &Bytes,
18911894
) -> Result<RequestFrame, ReadFrameError> {
18921895
let (send_res, recv_res) = join(
1893-
write_frame(params, opcode, &body.clone(), driver),
1896+
write_frame(params, opcode, &body.clone(), driver, &no_compression()),
18941897
read_request_frame(node, &no_compression()),
18951898
)
18961899
.await;
@@ -2005,7 +2008,7 @@ mod tests {
20052008
body: &Bytes,
20062009
) -> Result<RequestFrame, ReadFrameError> {
20072010
let (send_res, recv_res) = join(
2008-
write_frame(params, opcode, &body.clone(), driver),
2011+
write_frame(params, opcode, &body.clone(), driver, &no_compression()),
20092012
read_request_frame(node, &no_compression()),
20102013
)
20112014
.await;
@@ -2082,15 +2085,15 @@ mod tests {
20822085

20832086
let send_frame_to_shard = async {
20842087
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
2085-
write_frame(params, request_opcode, &body, &mut conn)
2088+
write_frame(params, request_opcode, &body, &mut conn, &no_compression())
20862089
.await
20872090
.unwrap();
20882091
conn
20892092
};
20902093

20912094
let mock_node_action = async {
20922095
let (mut conn, _) = mock_node_listener.accept().await.unwrap();
2093-
write_frame(params.for_response(), response_opcode, &body, &mut conn)
2096+
write_frame(params.for_response(), response_opcode, &body, &mut conn, &no_compression())
20942097
.await
20952098
.unwrap();
20962099
conn
@@ -2212,10 +2215,10 @@ mod tests {
22122215
let send_frame_to_shard = async {
22132216
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
22142217

2215-
write_frame(params1, opcode1, &body1, &mut conn)
2218+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
22162219
.await
22172220
.unwrap();
2218-
write_frame(params2, opcode2, &body2, &mut conn)
2221+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
22192222
.await
22202223
.unwrap();
22212224
conn
@@ -2263,7 +2266,7 @@ mod tests {
22632266

22642267
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
22652268

2266-
write_frame(params, opcode, &body, &mut conn).await.unwrap();
2269+
write_frame(params, opcode, &body, &mut conn, &no_compression()).await.unwrap();
22672270
// We assert that after sufficiently long time, no error happens inside proxy.
22682271
tokio::time::sleep(Duration::from_millis(3)).await;
22692272
running_proxy.finish().await.unwrap();
@@ -2341,13 +2344,13 @@ mod tests {
23412344

23422345
let mut conn = TcpStream::connect(node1_proxy_addr).await.unwrap();
23432346

2344-
write_frame(params1, opcode1, &body1, &mut conn)
2347+
write_frame(params1, opcode1, &body1, &mut conn, &no_compression())
23452348
.await
23462349
.unwrap();
2347-
write_frame(params2, opcode2, &body2, &mut conn)
2350+
write_frame(params2, opcode2, &body2, &mut conn, &no_compression())
23482351
.await
23492352
.unwrap();
2350-
write_frame(params3, opcode3, &body3, &mut conn)
2353+
write_frame(params3, opcode3, &body3, &mut conn, &no_compression())
23512354
.await
23522355
.unwrap();
23532356

@@ -2454,7 +2457,7 @@ mod tests {
24542457
let socket = bind_socket_for_shard(shards_count, driver_shard).await;
24552458
let mut conn = socket.connect(node_proxy_addr).await.unwrap();
24562459

2457-
write_frame(params, request_opcode, body_ref, &mut conn)
2460+
write_frame(params, request_opcode, body_ref, &mut conn, &no_compression())
24582461
.await
24592462
.unwrap();
24602463
conn
@@ -2474,7 +2477,7 @@ mod tests {
24742477
&no_compression(),
24752478
)
24762479
.await;
2477-
write_frame(params.for_response(), response_opcode, body_ref, &mut conn)
2480+
write_frame(params.for_response(), response_opcode, body_ref, &mut conn, &no_compression())
24782481
.await
24792482
.unwrap();
24802483
conn
@@ -2584,6 +2587,7 @@ mod tests {
25842587
FrameOpcode::Request(req_opcode),
25852588
(body_base.to_string() + "|request|").as_bytes(),
25862589
client_socket_ref,
2590+
&no_compression()
25872591
)
25882592
.await
25892593
.unwrap();
@@ -2599,6 +2603,7 @@ mod tests {
25992603
FrameOpcode::Response(resp_opcode),
26002604
(body_base.to_string() + "|response|").as_bytes(),
26012605
server_socket_ref,
2606+
&no_compression()
26022607
)
26032608
.await
26042609
.unwrap();

0 commit comments

Comments
 (0)