Skip to content

Commit f46cf41

Browse files
committed
Improve logging
1 parent fa89991 commit f46cf41

File tree

4 files changed

+126
-97
lines changed

4 files changed

+126
-97
lines changed

leaf/src/proxy/amux/mod.rs

Lines changed: 105 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
2323
use tokio::sync::mpsc::{self, Receiver, Sender};
2424
use tokio::sync::Mutex;
2525
use tokio::time::{sleep, Instant};
26-
use tracing::{debug, trace};
26+
use tracing::{debug, trace, Instrument};
2727

2828
#[cfg(feature = "inbound-amux")]
2929
pub mod inbound;
@@ -190,12 +190,15 @@ impl AsyncWrite for MuxStream {
190190
let to_write = min(buf.len(), MAX_STREAM_FRAME_DATA_LEN.into());
191191
let frame = MuxFrame::Stream(self.stream_id, buf[..to_write].to_vec());
192192
let tx = self.frame_write_tx.clone();
193-
let task = Box::pin(async move {
194-
tx.send(frame)
195-
.map_ok(|_| to_write)
196-
.map_err(|_| broken_pipe())
197-
.await
198-
});
193+
let task = Box::pin(
194+
async move {
195+
tx.send(frame)
196+
.map_ok(|_| to_write)
197+
.map_err(|_| broken_pipe())
198+
.await
199+
}
200+
.instrument(tracing::Span::current()),
201+
);
199202
self.write_state = TaskState::Pending(task);
200203
}
201204
TaskState::Pending(ref mut task) => {
@@ -217,12 +220,15 @@ impl AsyncWrite for MuxStream {
217220
TaskState::Idle => {
218221
let frame = MuxFrame::StreamFin(self.stream_id);
219222
let tx = self.frame_write_tx.clone();
220-
let task = Box::pin(async move {
221-
tx.send(frame)
222-
.map_ok(|_| 0) // FIXME temp workaround the signature
223-
.map_err(|_| broken_pipe())
224-
.await
225-
});
223+
let task = Box::pin(
224+
async move {
225+
tx.send(frame)
226+
.map_ok(|_| 0) // FIXME temp workaround the signature
227+
.map_err(|_| broken_pipe())
228+
.await
229+
}
230+
.instrument(tracing::Span::current()),
231+
);
226232
self.shutdown_state = TaskState::Pending(task);
227233
}
228234
TaskState::Pending(ref mut task) => {
@@ -408,76 +414,79 @@ impl MuxSession {
408414
where
409415
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
410416
{
411-
let task = Box::pin(async move {
412-
while let Some(frame) = frame_stream.next().await {
413-
match frame {
414-
Ok(frame) => {
415-
match frame {
416-
MuxFrame::Stream(stream_id, data) => {
417-
// In accept mode.
418-
if let Some(Accept {
419-
session_id,
420-
stream_accept_tx,
421-
frame_write_tx,
422-
}) = accept.as_mut()
423-
{
424-
// Accepts new stream for an unseen stream ID.
425-
if let std::collections::hash_map::Entry::Vacant(e) =
426-
streams.lock().await.entry(stream_id)
417+
let task = Box::pin(
418+
async move {
419+
while let Some(frame) = frame_stream.next().await {
420+
match frame {
421+
Ok(frame) => {
422+
match frame {
423+
MuxFrame::Stream(stream_id, data) => {
424+
// In accept mode.
425+
if let Some(Accept {
426+
session_id,
427+
stream_accept_tx,
428+
frame_write_tx,
429+
}) = accept.as_mut()
427430
{
428-
let (mux_stream, stream_read_tx) = MuxStream::new(
429-
*session_id,
430-
stream_id,
431-
frame_write_tx.clone(),
432-
Arc::new(AtomicBool::new(false)),
433-
);
434-
e.insert(stream_read_tx);
435-
if stream_accept_tx.send(mux_stream).await.is_err() {
436-
// The `Incoming` transport has been dropped.
437-
break;
431+
// Accepts new stream for an unseen stream ID.
432+
if let std::collections::hash_map::Entry::Vacant(e) =
433+
streams.lock().await.entry(stream_id)
434+
{
435+
let (mux_stream, stream_read_tx) = MuxStream::new(
436+
*session_id,
437+
stream_id,
438+
frame_write_tx.clone(),
439+
Arc::new(AtomicBool::new(false)),
440+
);
441+
e.insert(stream_read_tx);
442+
if stream_accept_tx.send(mux_stream).await.is_err() {
443+
// The `Incoming` transport has been dropped.
444+
break;
445+
}
438446
}
439447
}
440-
}
441-
// Sends data to the stream.
442-
if let Some(stream_read_tx) =
443-
streams.lock().await.get(&stream_id).cloned()
444-
{
445-
if let Some(c) = recv_bytes_counter.as_ref() {
446-
c.fetch_add(data.len(), Ordering::Relaxed);
448+
// Sends data to the stream.
449+
if let Some(stream_read_tx) =
450+
streams.lock().await.get(&stream_id).cloned()
451+
{
452+
if let Some(c) = recv_bytes_counter.as_ref() {
453+
c.fetch_add(data.len(), Ordering::Relaxed);
454+
}
455+
// FIXME error
456+
let _ = stream_read_tx.send(data).await;
447457
}
448-
// FIXME error
449-
let _ = stream_read_tx.send(data).await;
450458
}
451-
}
452-
MuxFrame::StreamFin(stream_id) => {
453-
// Send an empty buffer to indicate EOF.
454-
if let Some(stream_read_tx) =
455-
streams.lock().await.get(&stream_id).cloned()
456-
{
457-
// FIXME error
458-
let _ = stream_read_tx.send(Vec::new()).await;
459+
MuxFrame::StreamFin(stream_id) => {
460+
// Send an empty buffer to indicate EOF.
461+
if let Some(stream_read_tx) =
462+
streams.lock().await.get(&stream_id).cloned()
463+
{
464+
// FIXME error
465+
let _ = stream_read_tx.send(Vec::new()).await;
466+
}
467+
let streams2 = streams.clone();
468+
tokio::spawn(async move {
469+
sleep(Duration::from_secs(4)).await;
470+
streams2.lock().await.remove(&stream_id);
471+
});
459472
}
460-
let streams2 = streams.clone();
461-
tokio::spawn(async move {
462-
sleep(Duration::from_secs(4)).await;
463-
streams2.lock().await.remove(&stream_id);
464-
});
465473
}
466474
}
467-
}
468-
// Borken pipe.
469-
Err(e) => {
470-
debug!("receiving frame failed: {}", e);
471-
break;
475+
// Borken pipe.
476+
Err(e) => {
477+
debug!("receiving frame failed: {}", e);
478+
break;
479+
}
472480
}
473481
}
482+
// Stop receving.
483+
if let Some(recv_end) = recv_end {
484+
*recv_end.lock().await = true;
485+
}
486+
streams.lock().await.clear();
474487
}
475-
// Stop receving.
476-
if let Some(recv_end) = recv_end {
477-
*recv_end.lock().await = true;
478-
}
479-
streams.lock().await.clear();
480-
});
488+
.instrument(tracing::Span::current()),
489+
);
481490
let (task, handle) = abortable(task);
482491
tokio::spawn(task);
483492
handle
@@ -492,27 +501,33 @@ impl MuxSession {
492501
where
493502
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
494503
{
495-
let task = Box::pin(async move {
496-
while let Some(frame) = frame_write_rx.recv().await {
497-
// Peek EOF.
498-
if let MuxFrame::StreamFin(ref stream_id) = frame {
499-
let streams2 = streams.clone();
500-
let stream_id2 = *stream_id;
501-
tokio::spawn(async move {
502-
sleep(Duration::from_secs(4)).await;
503-
streams2.lock().await.remove(&stream_id2);
504-
});
504+
let task = Box::pin(
505+
async move {
506+
while let Some(frame) = frame_write_rx.recv().await {
507+
// Peek EOF.
508+
if let MuxFrame::StreamFin(ref stream_id) = frame {
509+
let streams2 = streams.clone();
510+
let stream_id2 = *stream_id;
511+
tokio::spawn(
512+
async move {
513+
sleep(Duration::from_secs(4)).await;
514+
streams2.lock().await.remove(&stream_id2);
515+
}
516+
.instrument(tracing::Span::current()),
517+
);
518+
}
519+
// Send
520+
if frame_sink.send(frame).await.is_err() {
521+
break;
522+
}
505523
}
506-
// Send
507-
if frame_sink.send(frame).await.is_err() {
508-
break;
524+
if let Some(send_end) = send_end {
525+
*send_end.lock().await = true;
509526
}
527+
streams.lock().await.clear();
510528
}
511-
if let Some(send_end) = send_end {
512-
*send_end.lock().await = true;
513-
}
514-
streams.lock().await.clear();
515-
});
529+
.instrument(tracing::Span::current()),
530+
);
516531
let (task, handle) = abortable(task);
517532
tokio::spawn(task);
518533
handle

leaf/src/proxy/amux/outbound/stream.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use rand::prelude::SliceRandom;
1010
use rand::rngs::StdRng;
1111
use rand::SeedableRng;
1212
use tokio::sync::Mutex;
13-
use tracing::debug;
13+
use tracing::{debug, Instrument};
1414

1515
use crate::{
1616
app::SyncDnsClient,
@@ -93,7 +93,7 @@ impl MuxManager {
9393
let mut conns = self.connectors.lock().await;
9494
conns.shuffle(&mut StdRng::from_entropy());
9595
for c in conns.iter_mut() {
96-
if let Some(s) = c.new_stream().await {
96+
if let Some(s) = c.new_stream().instrument(tracing::Span::current()).await {
9797
return Ok(s);
9898
}
9999
}
@@ -104,6 +104,7 @@ impl MuxManager {
104104
// Create the underlying TCP stream.
105105
let mut conn = self
106106
.new_tcp_stream(self.dns_client.clone(), &self.address, &self.port)
107+
.instrument(tracing::Span::current())
107108
.await?;
108109

109110
// Pass the TCP stream through all sub-transports, e.g. TLS, WebSocket.
@@ -113,7 +114,11 @@ impl MuxManager {
113114
sess.http_sniffed_domain = None;
114115
sess.tls_sniffed_domain = None;
115116
for a in self.actors.iter() {
116-
conn = a.stream()?.handle(&sess, None, Some(conn)).await?;
117+
conn = a
118+
.stream()?
119+
.handle(&sess, None, Some(conn))
120+
.instrument(tracing::Span::current())
121+
.await?;
117122
}
118123

119124
// Create the stream over this new connection.
@@ -130,7 +135,11 @@ impl MuxManager {
130135
)
131136
}
132137
};
133-
let s = match connector.new_stream().await {
138+
let s = match connector
139+
.new_stream()
140+
.instrument(tracing::Span::current())
141+
.await
142+
{
134143
Some(s) => s,
135144
None => return Err(io::Error::other("new stream failed")),
136145
};
@@ -186,6 +195,11 @@ impl OutboundStreamHandler for Handler {
186195
_stream: Option<AnyStream>,
187196
) -> io::Result<AnyStream> {
188197
tracing::trace!("handling outbound stream");
189-
Ok(Box::new(self.manager.new_stream(sess).await?))
198+
Ok(Box::new(
199+
self.manager
200+
.new_stream(sess)
201+
.instrument(tracing::Span::current())
202+
.await?,
203+
))
190204
}
191205
}

leaf/src/proxy/mptp/mptp_conn/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MptpStream<S> {
270270
}
271271
}
272272
MTYP_FIN => {
273-
debug!("received FIN from sub {}", i);
273+
debug!("received fin from sub {}", i);
274274
sub.read_buf.advance(1);
275275
this.closed = true;
276276
// Don't return EOF yet, we might have data in read_buffer

leaf/src/proxy/mptp/outbound/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ impl Handler {
3737
let (tx, mut rx) = mpsc::unbounded_channel();
3838
let cid = Uuid::new_v4();
3939

40-
debug!("CID={} CMD={}", &cid, cmd);
40+
debug!("cid={} cmd={}", &cid, cmd);
4141

4242
// Clone session and set destination to MPTP server
4343
let mut server_sess = sess.clone();

0 commit comments

Comments
 (0)