Skip to content

Commit 487f8bc

Browse files
aschranclaude
andauthored
anemo: allow separate max_frame_size for requests and responses (#71)
* anemo: allow separate max_frame_size for requests and responses Adds `max_request_frame_size` and `max_response_frame_size` Config options. Each falls back to `max_frame_size` when unset, preserving existing behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix rustdoc intra-doc link to public Config fields Rename the private accessors (`max_request_frame_size`, `max_response_frame_size`) to `request_frame_size` / `response_frame_size` so the public field names referenced in the `max_frame_size` doc comment resolve to the public fields rather than the private methods. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 23eea2b commit 487f8bc

5 files changed

Lines changed: 146 additions & 14 deletions

File tree

crates/anemo/src/config.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,26 @@ pub struct Config {
9191

9292
/// Set the maximum frame size in bytes.
9393
///
94-
/// This controls the maximum size of a request or response.
94+
/// This controls the maximum size of a request or response. Acts as the default
95+
/// for both directions when [`Config::max_request_frame_size`] or
96+
/// [`Config::max_response_frame_size`] are not set.
9597
///
9698
/// If unspecified, there will be no limit.
9799
#[serde(skip_serializing_if = "Option::is_none")]
98100
pub max_frame_size: Option<usize>,
99101

102+
/// Set the maximum frame size in bytes for request messages.
103+
///
104+
/// If unspecified, falls back to [`Config::max_frame_size`].
105+
#[serde(skip_serializing_if = "Option::is_none")]
106+
pub max_request_frame_size: Option<usize>,
107+
108+
/// Set the maximum frame size in bytes for response messages.
109+
///
110+
/// If unspecified, falls back to [`Config::max_frame_size`].
111+
#[serde(skip_serializing_if = "Option::is_none")]
112+
pub max_response_frame_size: Option<usize>,
113+
100114
/// Set a timeout, in milliseconds, for all inbound requests.
101115
///
102116
/// When an inbound timeout is hit when processing a request a Response is sent to the
@@ -271,8 +285,12 @@ impl Config {
271285
.unwrap_or(PEER_EVENT_BROADCAST_CHANNEL_CAPACITY)
272286
}
273287

274-
pub(crate) fn max_frame_size(&self) -> Option<usize> {
275-
self.max_frame_size
288+
pub(crate) fn request_frame_size(&self) -> Option<usize> {
289+
self.max_request_frame_size.or(self.max_frame_size)
290+
}
291+
292+
pub(crate) fn response_frame_size(&self) -> Option<usize> {
293+
self.max_response_frame_size.or(self.max_frame_size)
276294
}
277295

278296
pub(crate) fn inbound_request_timeout(&self) -> Option<Duration> {

crates/anemo/src/network/peer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::{
2-
wire::{network_message_frame_codec, read_response, write_request},
2+
wire::{
3+
read_response, request_message_frame_codec, response_message_frame_codec, write_request,
4+
},
35
OutboundRequestLayer,
46
};
57
use crate::{connection::Connection, Config, PeerId, Request, Response, Result};
@@ -50,9 +52,9 @@ impl Peer {
5052
async fn do_rpc(&self, request: Request<Bytes>) -> Result<Response<Bytes>> {
5153
let (send_stream, recv_stream) = self.connection.open_bi().await?;
5254
let mut send_stream =
53-
FramedWrite::new(send_stream, network_message_frame_codec(&self.config));
55+
FramedWrite::new(send_stream, request_message_frame_codec(&self.config));
5456
let mut recv_stream =
55-
FramedRead::new(recv_stream, network_message_frame_codec(&self.config));
57+
FramedRead::new(recv_stream, response_message_frame_codec(&self.config));
5658

5759
//
5860
// Write Request

crates/anemo/src/network/request_handler.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::wire::MessageFrameCodec;
22
use super::{
3-
wire::{network_message_frame_codec, read_request, write_response},
3+
wire::{
4+
read_request, request_message_frame_codec, response_message_frame_codec, write_response,
5+
},
46
ActivePeers,
57
};
68
use crate::{
@@ -137,8 +139,8 @@ impl BiStreamRequestHandler {
137139
Self {
138140
connection,
139141
service,
140-
send_stream: FramedWrite::new(send_stream, network_message_frame_codec(config)),
141-
recv_stream: FramedRead::new(recv_stream, network_message_frame_codec(config)),
142+
send_stream: FramedWrite::new(send_stream, response_message_frame_codec(config)),
143+
recv_stream: FramedRead::new(recv_stream, request_message_frame_codec(config)),
142144
}
143145
}
144146

crates/anemo/src/network/tests.rs

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{types::PeerEvent, Network, NetworkRef, Request, Response, Result};
1+
use crate::{types::PeerEvent, Config, Network, NetworkRef, Request, Response, Result};
22
use bytes::{Buf, BufMut, Bytes, BytesMut};
33
use futures::FutureExt;
44
use std::{convert::Infallible, time::Duration};
@@ -868,3 +868,104 @@ async fn network_ref_via_extension() -> Result<()> {
868868

869869
Ok(())
870870
}
871+
872+
fn build_network_with_config(config: Config) -> Result<Network> {
873+
let network = Network::bind("localhost:0")
874+
.random_private_key()
875+
.server_name("test")
876+
.config(config)
877+
.start(echo_service())?;
878+
879+
trace!(
880+
address =% network.local_addr(),
881+
peer_id =% network.peer_id(),
882+
"starting network"
883+
);
884+
885+
Ok(network)
886+
}
887+
888+
#[tokio::test]
889+
async fn server_max_request_frame_size_rejects_large_requests() -> Result<()> {
890+
let _guard = crate::init_tracing_for_testing();
891+
892+
let server = build_network_with_config(Config {
893+
max_request_frame_size: Some(128),
894+
..Default::default()
895+
})?;
896+
let client = build_network()?;
897+
898+
let peer = client.connect(server.local_addr()).await?;
899+
900+
// Request body within the server's request limit succeeds.
901+
let small = vec![0u8; 32];
902+
let response = client.rpc(peer, Request::new(small.clone().into())).await?;
903+
assert_eq!(response.into_body().as_ref(), small.as_slice());
904+
905+
// Request body exceeding the server's request limit is rejected by the server,
906+
// and the client sees the RPC fail.
907+
let big = vec![0u8; 4096];
908+
client
909+
.rpc(peer, Request::new(big.into()))
910+
.await
911+
.unwrap_err();
912+
913+
Ok(())
914+
}
915+
916+
#[tokio::test]
917+
async fn client_max_response_frame_size_rejects_large_responses() -> Result<()> {
918+
let _guard = crate::init_tracing_for_testing();
919+
920+
let server = build_network()?;
921+
let client = build_network_with_config(Config {
922+
max_response_frame_size: Some(128),
923+
..Default::default()
924+
})?;
925+
926+
let peer = client.connect(server.local_addr()).await?;
927+
928+
// Response body within the client's response limit succeeds.
929+
let small = vec![0u8; 32];
930+
let response = client.rpc(peer, Request::new(small.clone().into())).await?;
931+
assert_eq!(response.into_body().as_ref(), small.as_slice());
932+
933+
// The echoed response exceeds the client's response limit. The request itself
934+
// is still within the (default) request limit, so it leaves the client and the
935+
// server processes it; the failure happens when the client tries to decode the
936+
// response.
937+
let big = vec![0u8; 4096];
938+
client
939+
.rpc(peer, Request::new(big.into()))
940+
.await
941+
.unwrap_err();
942+
943+
Ok(())
944+
}
945+
946+
#[tokio::test]
947+
async fn max_frame_size_falls_back_for_both_directions() -> Result<()> {
948+
let _guard = crate::init_tracing_for_testing();
949+
950+
// Setting only the legacy `max_frame_size` should constrain both directions,
951+
// matching pre-existing behavior.
952+
let server = build_network_with_config(Config {
953+
max_frame_size: Some(128),
954+
..Default::default()
955+
})?;
956+
let client = build_network()?;
957+
958+
let peer = client.connect(server.local_addr()).await?;
959+
960+
let small = vec![0u8; 32];
961+
let response = client.rpc(peer, Request::new(small.clone().into())).await?;
962+
assert_eq!(response.into_body().as_ref(), small.as_slice());
963+
964+
let big = vec![0u8; 4096];
965+
client
966+
.rpc(peer, Request::new(big.into()))
967+
.await
968+
.unwrap_err();
969+
970+
Ok(())
971+
}

crates/anemo/src/network/wire.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,19 @@ impl Encoder<Bytes> for MessageFrameCodec {
122122
}
123123
}
124124

125-
/// Returns a fully configured message frame codec for writing/reading
126-
/// serialized frames to/from a socket.
127-
pub(crate) fn network_message_frame_codec(config: &Config) -> MessageFrameCodec {
128-
let max_frame_length = config.max_frame_size().unwrap_or(DEFAULT_MAX_FRAME_LENGTH);
125+
/// Returns a message frame codec sized for request messages.
126+
pub(crate) fn request_message_frame_codec(config: &Config) -> MessageFrameCodec {
127+
let max_frame_length = config
128+
.request_frame_size()
129+
.unwrap_or(DEFAULT_MAX_FRAME_LENGTH);
130+
MessageFrameCodec::new(max_frame_length)
131+
}
132+
133+
/// Returns a message frame codec sized for response messages.
134+
pub(crate) fn response_message_frame_codec(config: &Config) -> MessageFrameCodec {
135+
let max_frame_length = config
136+
.response_frame_size()
137+
.unwrap_or(DEFAULT_MAX_FRAME_LENGTH);
129138
MessageFrameCodec::new(max_frame_length)
130139
}
131140

0 commit comments

Comments
 (0)