Skip to content

Commit 2cadb50

Browse files
authored
feat(session layer): send rtx error if the packet is not in the producer buffer (#166)
Signed-off-by: Michele Papalini <[email protected]>
1 parent 9db06b4 commit 2cadb50

File tree

4 files changed

+106
-74
lines changed

4 files changed

+106
-74
lines changed

data-plane/gateway/datapath/src/messages/utils.rs

+4
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,10 @@ impl ProtoMessage {
625625
self.get_agp_header().get_forward_to()
626626
}
627627

628+
pub fn get_error(&self) -> Option<bool> {
629+
self.get_agp_header().get_error()
630+
}
631+
628632
pub fn get_incoming_conn(&self) -> u64 {
629633
self.get_agp_header().get_incoming_conn().unwrap()
630634
}

data-plane/gateway/service/src/streaming.rs

+82-67
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ impl Streaming {
270270
SessionHeaderType::PubSub => {
271271
// send the packet to the application
272272
process_message_from_gw(msg, session_id, &mut state.receiver, &source, max_retries, timeout, &timer_tx, &send_gw, &send_app).await;
273-
274273
}
275274
SessionHeaderType::RtxRequest => {
276275
// handle RTX request
@@ -353,15 +352,17 @@ async fn process_incoming_rtx_request(
353352
msg_rtx_id, session_id
354353
);
355354
// search the packet in the producer buffer
355+
let pkt_src = msg.get_source();
356+
let incoming_conn = msg.get_incoming_conn();
357+
356358
let rtx_pub = match producer.buffer.get(msg_rtx_id as usize) {
357359
Some(packet) => {
358360
trace!(
359361
"packet {} exists in the producer buffer, create rtx reply",
360362
msg_rtx_id
361363
);
362-
// the packet exists, send it to the source of the RTX
363-
let pkt_src = msg.get_source();
364364

365+
// the packet exists, send it to the source of the RTX
365366
let payload = match packet.get_payload() {
366367
Some(p) => p,
367368
None => {
@@ -370,8 +371,6 @@ async fn process_incoming_rtx_request(
370371
}
371372
};
372373

373-
let incoming_conn = msg.get_incoming_conn();
374-
375374
let agp_header = Some(AgpHeader::new(
376375
source,
377376
pkt_src.agent_type(),
@@ -392,21 +391,37 @@ async fn process_incoming_rtx_request(
392391
Message::new_publish_with_headers(agp_header, session_header, "", payload.blob.to_vec())
393392
}
394393
None => {
395-
// the packet does not exist so do nothing
396-
// TODO(micpapal): improve by returning a rtx nack so that the remote app does not
397-
// wait too long for all the retransmissions
394+
// the packet does not exist return an empty RtxReply with the error flag set
398395
debug!(
399-
"received and RTX messages for an old packet on producer session {}",
396+
"received an RTX messages for an old packet on session {}",
400397
session_id
401398
);
402-
return;
399+
400+
let flags = AgpHeaderFlags::default()
401+
.with_forward_to(incoming_conn)
402+
.with_error(true);
403+
404+
let agp_header = Some(AgpHeader::new(
405+
source,
406+
pkt_src.agent_type(),
407+
Some(pkt_src.agent_id()),
408+
Some(flags),
409+
));
410+
411+
let session_header = Some(SessionHeader::new(
412+
SessionHeaderType::RtxReply.into(),
413+
session_id,
414+
msg_rtx_id,
415+
));
416+
417+
Message::new_publish_with_headers(agp_header, session_header, "", vec![])
403418
}
404419
};
405420

406421
trace!("send rtx reply for message {}", msg_rtx_id);
407422
if send_gw.send(Ok(rtx_pub)).await.is_err() {
408423
error!(
409-
"error sending RTX packet to the gateway on producer session {}",
424+
"error sending RTX packet to the gateway on session {}",
410425
session_id
411426
);
412427
}
@@ -421,10 +436,7 @@ async fn process_message_from_app(
421436
send_app: &mpsc::Sender<Result<SessionMessage, SessionError>>,
422437
) {
423438
// set the session header, add the message to the buffer and send it
424-
trace!(
425-
"received message from the app on producer session {}",
426-
session_id
427-
);
439+
trace!("received message from the app on session {}", session_id);
428440

429441
if is_bidirectional {
430442
msg.set_header_type(SessionHeaderType::PubSub);
@@ -450,7 +462,7 @@ async fn process_message_from_app(
450462

451463
if send_gw.send(Ok(msg)).await.is_err() {
452464
error!(
453-
"error sending publication packet to the gateway on producer session {}",
465+
"error sending publication packet to the gateway on session {}",
454466
session_id
455467
);
456468
send_app
@@ -500,8 +512,12 @@ async fn process_message_from_gw(
500512
};
501513

502514
let header_type = msg.get_header_type();
515+
let mut error_rtx = false;
503516
if header_type == SessionHeaderType::RtxReply {
504517
let rtx_msg_id = msg.get_id();
518+
if msg.get_error().is_some() && msg.get_error().unwrap() {
519+
error_rtx = true;
520+
}
505521

506522
// try to clean local state
507523
match receiver.timers_map.get(&rtx_msg_id) {
@@ -518,43 +534,34 @@ async fn process_message_from_gw(
518534
}
519535
} else if header_type != SessionHeaderType::Stream && header_type != SessionHeaderType::PubSub {
520536
error!(
521-
"received packet with invalid header type {} on receiver session {}",
537+
"received packet with invalid header type {} on session {}",
522538
i32::from(header_type),
523539
session_id
524540
);
525541
return;
526542
}
527543

544+
if error_rtx {
545+
// a message cannot be recovered
546+
let msg_id = msg.get_id();
547+
debug!("received an error RTX reply for message {}", msg_id);
548+
match receiver.buffer.on_lost_message(msg_id) {
549+
Ok(recv) => {
550+
send_message_to_app(recv, session_id, send_app).await;
551+
}
552+
Err(e) => {
553+
error!("error adding message lost to the buffer: {}", e.to_string());
554+
}
555+
}
556+
return;
557+
}
558+
528559
match receiver.buffer.on_received_message(msg) {
529560
Ok((recv, rtx)) => {
530-
for opt in recv {
531-
trace!(
532-
"send recv packet to the application on receiver session {}",
533-
session_id
534-
);
535-
match opt {
536-
Some(m) => {
537-
// send message to the app
538-
let info = Info::from(&m);
539-
let session_msg = SessionMessage::new(m, info);
540-
if send_app.send(Ok(session_msg)).await.is_err() {
541-
error!(
542-
"error sending packet to the application on receiver session {}",
543-
session_id
544-
);
545-
}
546-
}
547-
None => {
548-
warn!(
549-
"a message was definitely lost in receiver session {}",
550-
session_id
551-
);
552-
let _ = send_app
553-
.send(Err(SessionError::MessageLost(session_id.to_string())))
554-
.await;
555-
}
556-
}
557-
}
561+
// send packets to the app
562+
send_message_to_app(recv, session_id, send_app).await;
563+
564+
// send RTX
558565
for r in rtx {
559566
debug!(
560567
"packet loss detected on session {}, send RTX for id {}",
@@ -635,7 +642,7 @@ async fn handle_timeout(
635642

636643
if send_gw.send(Ok(rtx.clone())).await.is_err() {
637644
error!(
638-
"error sending RTX for id {} on receiver session {}",
645+
"error sending RTX for id {} on session {}",
639646
msg_id, session_id
640647
);
641648
}
@@ -663,34 +670,42 @@ async fn handle_failure(
663670

664671
match receiver.buffer.on_lost_message(msg_id) {
665672
Ok(recv) => {
666-
for opt in recv {
667-
match opt {
668-
Some(m) => {
669-
let info = Info::from(&m);
670-
let session_msg = SessionMessage::new(m, info);
671-
// send message to the app
672-
if send_app.send(Ok(session_msg)).await.is_err() {
673-
error!(
674-
"error sending packet to the gateway on session {}",
675-
session_id
676-
);
677-
}
678-
}
679-
None => {
680-
warn!("a message was definitely lost in session {}", session_id);
681-
let _ = send_app
682-
.send(Err(SessionError::MessageLost(session_id.to_string())))
683-
.await;
684-
}
685-
}
686-
}
673+
send_message_to_app(recv, session_id, send_app).await;
687674
}
688675
Err(e) => {
689676
error!("error adding message lost to the buffer: {}", e.to_string());
690677
}
691678
};
692679
}
693680

681+
async fn send_message_to_app(
682+
messages: Vec<Option<Message>>,
683+
session_id: u32,
684+
send_app: &mpsc::Sender<Result<SessionMessage, SessionError>>,
685+
) {
686+
for opt in messages {
687+
match opt {
688+
Some(m) => {
689+
let info = Info::from(&m);
690+
let session_msg = SessionMessage::new(m, info);
691+
// send message to the app
692+
if send_app.send(Ok(session_msg)).await.is_err() {
693+
error!(
694+
"error sending packet to the gateway on session {}",
695+
session_id
696+
);
697+
}
698+
}
699+
None => {
700+
warn!("a message was definitely lost in session {}", session_id);
701+
let _ = send_app
702+
.send(Err(SessionError::MessageLost(session_id.to_string())))
703+
.await;
704+
}
705+
}
706+
}
707+
}
708+
694709
#[async_trait]
695710
impl Session for Streaming {
696711
async fn on_message(

data-plane/python-bindings/uv.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-plane/testing/src/bin/subscriber.rs

+19-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use testing::parse_line;
1111
use agp_gw::config;
1212
use clap::Parser;
1313
use indicatif::ProgressBar;
14-
use tracing::{debug, info};
14+
use tracing::{debug, error, info};
1515

1616
#[derive(Parser, Debug)]
1717
#[command(version, about, long_about = None)]
@@ -120,12 +120,24 @@ async fn main() {
120120

121121
info!("waiting for incoming messages");
122122
loop {
123-
let recv_msg = rx.recv().await.unwrap().expect("error");
124-
info!(
125-
"received message {} from session {}",
126-
recv_msg.info.message_id.unwrap(),
127-
recv_msg.info.id
128-
);
123+
match rx.recv().await {
124+
Some(res) => match res {
125+
Ok(recv_msg) => {
126+
info!(
127+
"received message {} from session {}",
128+
recv_msg.info.message_id.unwrap(),
129+
recv_msg.info.id
130+
);
131+
}
132+
Err(e) => {
133+
error!("received error {}", e)
134+
}
135+
},
136+
None => {
137+
error!("stream close");
138+
return;
139+
}
140+
};
129141
}
130142
}
131143

0 commit comments

Comments
 (0)