Skip to content

RPC TTFB On RPC Response #5769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ tempfile = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-channel = { workspace = true }
logging = { workspace = true }

[features]
libp2p-websocket = []
18 changes: 16 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
@@ -54,6 +54,8 @@ where
inner: TOutboundCodec,
/// Keeps track of the current response code for a chunk.
current_response_code: Option<u8>,
/// Whether the first byte has been received or not.
received_first_byte: bool,
phantom: PhantomData<E>,
}

@@ -66,6 +68,7 @@ where
BaseOutboundCodec {
inner: codec,
current_response_code: None,
received_first_byte: false,
phantom: PhantomData,
}
}
@@ -88,7 +91,7 @@ where
dst.reserve(1);
dst.put_u8(
item.as_u8()
.expect("Should never encode a stream termination"),
.expect("Should never encode a stream termination or first byte"),
);
self.inner.encode(item, dst)
}
@@ -134,10 +137,21 @@ where
type Error = <TCodec as Decoder>::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() == 0 {
return Ok(None);
}

if !self.received_first_byte {
// Notify the handler that the first byte has been received.
self.received_first_byte = true;
return Ok(Some(RPCCodedResponse::<E>::FirstByte));
}

// if we have only received the response code, wait for more bytes
if src.len() <= 1 {
if src.len() == 1 {
return Ok(None);
}

// using the response code determine which kind of payload needs to be decoded.
let response_code = self.current_response_code.unwrap_or_else(|| {
let resp_code = src.split_to(1)[0];
3 changes: 3 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
@@ -92,6 +92,9 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
RPCCodedResponse::FirstByte => {
unreachable!("Code error - attempting to encode first byte")
}
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
31 changes: 29 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
@@ -137,8 +137,11 @@ where
/// Logger for handling RPC streams
log: slog::Logger,

/// Timeout that will me used for inbound and outbound responses.
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,

/// Timeout that will be used for outbound response. If the first byte does not arrive within this, substreams will be terminated.
ttfb_timeout: Duration,
}

enum HandlerState {
@@ -222,6 +225,7 @@ where
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
ttfb_timeout: Duration,
) -> Self {
RPCHandler {
listen_protocol,
@@ -241,6 +245,7 @@ where
waker: None,
log: log.clone(),
resp_timeout,
ttfb_timeout,
}
}

@@ -690,6 +695,25 @@ where
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if matches!(response, RPCCodedResponse::FirstByte) {
// Reset the timeout from ttfb_timeout to resp_timeout.
self.outbound_substreams_delay
.reset(&entry.get().delay_key, self.resp_timeout);

// Since the second and subsequent bytes might have already been received, we wake the task to process them.
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}

// No processing is required for the first byte.
entry.get_mut().state =
OutboundSubstreamState::RequestPendingResponse {
substream,
request,
};
continue;
}

if request.expect_exactly_one_response() || response.close_after() {
// either this is a single response request or this response closes the
// stream
@@ -736,6 +760,9 @@ where
error: RPCError::ErrorResponse(*code, r.to_string()),
})
}
RPCCodedResponse::FirstByte => {
unreachable!("No processing is required for the first byte.")
}
};

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received));
@@ -983,7 +1010,7 @@ where
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(self.current_outbound_substream_id, self.resp_timeout);
.insert(self.current_outbound_substream_id, self.ttfb_timeout);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream),
request,
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
@@ -434,6 +434,9 @@ pub enum RPCCodedResponse<E: EthSpec> {

/// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination),

/// This is used to notify the handler by the outbound codec that the first byte has been received.
FirstByte,
}

/// Request a light_client_bootstrap for light_clients peers.
@@ -462,6 +465,7 @@ impl<E: EthSpec> RPCCodedResponse<E> {
RPCCodedResponse::Success(_) => Some(0),
RPCCodedResponse::Error(code, _) => Some(code.as_u8()),
RPCCodedResponse::StreamTermination(_) => None,
RPCCodedResponse::FirstByte => None,
}
}

@@ -486,6 +490,7 @@ impl<E: EthSpec> RPCCodedResponse<E> {
/// Returns true if this response always terminates the stream.
pub fn close_after(&self) -> bool {
!matches!(self, RPCCodedResponse::Success(_))
&& !matches!(self, RPCCodedResponse::FirstByte)
}
}

@@ -585,6 +590,7 @@ impl<E: EthSpec> std::fmt::Display for RPCCodedResponse<E> {
RPCCodedResponse::Success(res) => write!(f, "{}", res),
RPCCodedResponse::Error(code, err) => write!(f, "{}: {}", code, err),
RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"),
RPCCodedResponse::FirstByte => write!(f, "First Byte"),
}
}
}
2 changes: 2 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -246,6 +246,7 @@ where
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
self.network_params.ttfb_timeout,
);

Ok(handler)
@@ -278,6 +279,7 @@ where
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
self.network_params.ttfb_timeout,
);

Ok(handler)
94 changes: 94 additions & 0 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
@@ -1162,3 +1162,97 @@ fn quic_test_goodbye_rpc() {
let enable_logging = false;
goodbye_test(log_level, enable_logging, Protocol::Quic);
}

// Tests TTFB timeout
#[test]
fn test_ttfb_timeout() {
let rt = Arc::new(Runtime::new().unwrap());
let log = logging::test_logger();
let spec = E::default_spec();

rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// Dummy STATUS RPC message
let rpc_request = Request::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});

// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});

let sender_peer_id = sender.local_peer_id;
let receiver_peer_id = receiver.local_peer_id;

// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
}
NetworkEvent::RPCFailed { peer_id, error, .. } => {
debug!(log, "The RPC failed as expected"; "error" => ?error);
assert_eq!(peer_id, receiver_peer_id);
assert!(matches!(error, RPCError::StreamTimeout));
// The test has been successfully completed; exiting the loop.
return;
}
_ => {}
}
}
};

// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
NetworkEvent::RequestReceived {
peer_id,
id,
request,
} => {
assert_eq!(peer_id, sender_peer_id);
assert_eq!(request, rpc_request);

debug!(log, "Receiver received");
debug!(log, "Waiting until the ttfb_timeout is reached"; "ttfb_timeout" => spec.ttfb_timeout);
tokio::time::sleep(Duration::from_secs(spec.ttfb_timeout + 1)).await;

receiver.send_response(peer_id, id, rpc_response.clone());
debug!(log, "A dummy response was sent, which should not be received by the sender due to the timeout");
}
_ => {}
}
}
};

tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
});
}