Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl TcpConnection {
async fn handle_negotiated_substream(
&mut self,
result: Result<NegotiatedSubstream, ConnectionError>,
) {
) -> crate::Result<()> {
match result {
Err(error) => {
tracing::debug!(
Expand All @@ -603,19 +603,18 @@ impl TcpConnection {

match (protocol, substream_id) {
(Some(protocol), Some(substream_id)) => {
if let Err(error) = self
.protocol_set
self.protocol_set
.report_substream_open_failure(protocol.clone(), substream_id, error)
.await
{
tracing::error!(
target: LOG_TARGET,
?protocol,
endpoint = ?self.endpoint,
?error,
"failed to register substream open failure to protocol"
);
}
.inspect_err(|error| {
tracing::error!(
target: LOG_TARGET,
?protocol,
endpoint = ?self.endpoint,
?error,
"failed to register substream open failure to protocol"
);
})?;
}
_ => {}
}
Expand All @@ -634,22 +633,23 @@ impl TcpConnection {
self.protocol_set.protocol_codec(&protocol),
);

if let Err(error) = self
.protocol_set
self.protocol_set
.report_substream_open(self.peer, protocol.clone(), direction, substream)
.await
{
tracing::error!(
target: LOG_TARGET,
?protocol,
peer = ?self.peer,
endpoint = ?self.endpoint,
?error,
"failed to register opened substream to protocol",
);
}
.inspect_err(|error| {
tracing::error!(
target: LOG_TARGET,
?protocol,
peer = ?self.peer,
endpoint = ?self.endpoint,
?error,
"failed to register opened substream to protocol",
);
})?;
}
}

Ok(())
}

/// Handles protocol command.
Expand Down Expand Up @@ -744,7 +744,7 @@ impl TcpConnection {
}
},
substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => {
self.handle_negotiated_substream(substream).await;
self.handle_negotiated_substream(substream).await?;
}
protocol = self.protocol_set.next() => {
if self.handle_protocol_command(protocol).await? {
Expand Down
Loading