Skip to content

RPC TTFB On RPC Response #5769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ tempfile = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-channel = { workspace = true }
logging = { workspace = true }

[features]
libp2p-websocket = []
Expand Down
18 changes: 16 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ where
inner: TOutboundCodec,
/// Keeps track of the current response code for a chunk.
current_response_code: Option<u8>,
/// Whether the first byte has been received or not.
received_first_byte: bool,
phantom: PhantomData<E>,
}

Expand All @@ -66,6 +68,7 @@ where
BaseOutboundCodec {
inner: codec,
current_response_code: None,
received_first_byte: false,
phantom: PhantomData,
}
}
Expand All @@ -88,7 +91,7 @@ where
dst.reserve(1);
dst.put_u8(
item.as_u8()
.expect("Should never encode a stream termination"),
.expect("Should never encode a stream termination or first byte"),
);
self.inner.encode(item, dst)
}
Expand Down Expand Up @@ -134,10 +137,21 @@ where
type Error = <TCodec as Decoder>::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() == 0 {
return Ok(None);
}

if !self.received_first_byte {
// Notify the handler that the first byte has been received.
self.received_first_byte = true;
return Ok(Some(RPCCodedResponse::<E>::FirstByte));
}

// if we have only received the response code, wait for more bytes
if src.len() <= 1 {
if src.len() == 1 {
return Ok(None);
}

// using the response code determine which kind of payload needs to be decoded.
let response_code = self.current_response_code.unwrap_or_else(|| {
let resp_code = src.split_to(1)[0];
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
RPCCodedResponse::FirstByte => {
unreachable!("Code error - attempting to encode first byte")
}
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
Expand Down
31 changes: 29 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ where
/// Logger for handling RPC streams
log: slog::Logger,

/// Timeout that will me used for inbound and outbound responses.
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,

/// Timeout for the first byte of a response on an outbound substream. If the first byte does not arrive within this duration, the substream will be terminated.
ttfb_timeout: Duration,
}

enum HandlerState {
Expand Down Expand Up @@ -222,6 +225,7 @@ where
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
ttfb_timeout: Duration,
) -> Self {
RPCHandler {
listen_protocol,
Expand All @@ -241,6 +245,7 @@ where
waker: None,
log: log.clone(),
resp_timeout,
ttfb_timeout,
}
}

Expand Down Expand Up @@ -690,6 +695,25 @@ where
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if matches!(response, RPCCodedResponse::FirstByte) {
// Reset the timeout from ttfb_timeout to resp_timeout.
self.outbound_substreams_delay
.reset(&entry.get().delay_key, self.resp_timeout);

// Since the second and subsequent bytes might have already been received, we wake the task to process them.
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}

// No processing is required for the first byte.
entry.get_mut().state =
OutboundSubstreamState::RequestPendingResponse {
substream,
request,
};
continue;
}

if request.expect_exactly_one_response() || response.close_after() {
// either this is a single response request or this response closes the
// stream
Expand Down Expand Up @@ -736,6 +760,9 @@ where
error: RPCError::ErrorResponse(*code, r.to_string()),
})
}
RPCCodedResponse::FirstByte => {
unreachable!("No processing is required for the first byte.")
}
};

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received));
Expand Down Expand Up @@ -983,7 +1010,7 @@ where
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(self.current_outbound_substream_id, self.resp_timeout);
.insert(self.current_outbound_substream_id, self.ttfb_timeout);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream),
request,
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ pub enum RPCCodedResponse<E: EthSpec> {

/// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination),

/// Emitted by the outbound codec to notify the handler that the first byte of a response has been received.
FirstByte,
}

/// Request a light_client_bootstrap for light_clients peers.
Expand Down Expand Up @@ -462,6 +465,7 @@ impl<E: EthSpec> RPCCodedResponse<E> {
RPCCodedResponse::Success(_) => Some(0),
RPCCodedResponse::Error(code, _) => Some(code.as_u8()),
RPCCodedResponse::StreamTermination(_) => None,
RPCCodedResponse::FirstByte => None,
}
}

Expand All @@ -486,6 +490,7 @@ impl<E: EthSpec> RPCCodedResponse<E> {
/// Returns true if this response always terminates the stream.
pub fn close_after(&self) -> bool {
!matches!(self, RPCCodedResponse::Success(_))
&& !matches!(self, RPCCodedResponse::FirstByte)
}
}

Expand Down Expand Up @@ -585,6 +590,7 @@ impl<E: EthSpec> std::fmt::Display for RPCCodedResponse<E> {
RPCCodedResponse::Success(res) => write!(f, "{}", res),
RPCCodedResponse::Error(code, err) => write!(f, "{}: {}", code, err),
RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"),
RPCCodedResponse::FirstByte => write!(f, "First Byte"),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ where
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
self.network_params.ttfb_timeout,
);

Ok(handler)
Expand Down Expand Up @@ -278,6 +279,7 @@ where
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
self.network_params.ttfb_timeout,
);

Ok(handler)
Expand Down
94 changes: 94 additions & 0 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,3 +1162,97 @@ fn quic_test_goodbye_rpc() {
let enable_logging = false;
goodbye_test(log_level, enable_logging, Protocol::Quic);
}

// Tests the TTFB (time-to-first-byte) timeout on an outbound RPC request.
//
// The sender issues a STATUS request; the receiver deliberately waits longer than
// `spec.ttfb_timeout` before responding. The test passes when the sender observes
// `RPCFailed` with `RPCError::StreamTimeout`, and panics if nothing happens within 30 seconds.
#[test]
fn test_ttfb_timeout() {
let rt = Arc::new(Runtime::new().unwrap());
let log = logging::test_logger();
let spec = E::default_spec();

rt.block_on(async {
// Build a connected sender/receiver node pair over TCP.
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// Dummy STATUS RPC request
let rpc_request = Request::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});

// Dummy STATUS RPC response (same contents as the request)
let rpc_response = Response::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});

// Captured up front so each future can assert on the other side's peer id.
let sender_peer_id = sender.local_peer_id;
let receiver_peer_id = receiver.local_peer_id;

// Sender future: sends the request once connected, then waits for the failure event.
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
}
NetworkEvent::RPCFailed { peer_id, error, .. } => {
debug!(log, "The RPC failed as expected"; "error" => ?error);
assert_eq!(peer_id, receiver_peer_id);
// The failure must be a stream timeout, not any other RPC error.
assert!(matches!(error, RPCError::StreamTimeout));
// The test has been successfully completed; exiting the loop.
return;
}
_ => {}
}
}
};

// Receiver future: on receiving the request, delays past the TTFB timeout before responding.
// This loop never returns on its own; the test ends via the sender future or the guard below.
let receiver_future = async {
loop {
match receiver.next_event().await {
NetworkEvent::RequestReceived {
peer_id,
id,
request,
} => {
assert_eq!(peer_id, sender_peer_id);
assert_eq!(request, rpc_request);

debug!(log, "Receiver received");
debug!(log, "Waiting until the ttfb_timeout is reached"; "ttfb_timeout" => spec.ttfb_timeout);
// `spec.ttfb_timeout` is interpreted as seconds here; sleep one second longer than it.
tokio::time::sleep(Duration::from_secs(spec.ttfb_timeout + 1)).await;

receiver.send_response(peer_id, id, rpc_response.clone());
debug!(log, "A dummy response was sent, which should not be received by the sender due to the timeout");
}
_ => {}
}
}
};

// Run both sides concurrently; the 30-second branch is a guard against the test hanging.
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
});
}
Loading