Skip to content

Commit 7729d6a

Browse files
committed
Add subject to slow consumers event
1 parent aff191a commit 7729d6a

File tree

2 files changed

+36
-10
lines changed

2 files changed

+36
-10
lines changed

async-nats/src/lib.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -748,11 +748,14 @@ impl ConnectionHandler {
748748
}
749749
}
750750
}
751-
Err(mpsc::error::TrySendError::Full(_)) => {
751+
Err(mpsc::error::TrySendError::Full(returned_message)) => {
752752
debug!("slow consumer detected for subscription {}", sid);
753753
self.connector
754754
.events_tx
755-
.try_send(Event::SlowConsumer(sid))
755+
.try_send(Event::SlowConsumer(SlowConsumer {
756+
sid,
757+
subject: returned_message.subject,
758+
}))
756759
.ok();
757760
}
758761
Err(mpsc::error::TrySendError::Closed(_)) => {
@@ -1103,7 +1106,7 @@ pub enum Event {
11031106
LameDuckMode,
11041107
Draining,
11051108
Closed,
1106-
SlowConsumer(u64),
1109+
SlowConsumer(SlowConsumer),
11071110
ServerError(ServerError),
11081111
ClientError(ClientError),
11091112
}
@@ -1116,7 +1119,11 @@ impl fmt::Display for Event {
11161119
Event::LameDuckMode => write!(f, "lame duck mode detected"),
11171120
Event::Draining => write!(f, "draining"),
11181121
Event::Closed => write!(f, "closed"),
1119-
Event::SlowConsumer(sid) => write!(f, "slow consumers for subscription {sid}"),
1122+
Event::SlowConsumer(slow_consumer) => write!(
1123+
f,
1124+
"slow consumers for subscription {} on subject {}",
1125+
slow_consumer.sid, slow_consumer.subject
1126+
),
11201127
Event::ServerError(err) => write!(f, "server error: {err}"),
11211128
Event::ClientError(err) => write!(f, "client error: {err}"),
11221129
}
@@ -1437,7 +1444,7 @@ impl From<ClientError> for CallbackError {
14371444
#[derive(Clone, Debug, Eq, PartialEq, Error)]
14381445
pub enum ServerError {
14391446
AuthorizationViolation,
1440-
SlowConsumer(u64),
1447+
SlowConsumer(SlowConsumer),
14411448
Other(String),
14421449
}
14431450

@@ -1469,12 +1476,28 @@ impl std::fmt::Display for ServerError {
14691476
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14701477
match self {
14711478
Self::AuthorizationViolation => write!(f, "nats: authorization violation"),
1472-
Self::SlowConsumer(sid) => write!(f, "nats: subscription {sid} is a slow consumer"),
1479+
Self::SlowConsumer(slow_consumer) => write!(
1480+
f,
1481+
"nats: subscription {} on subject {} is a slow consumer",
1482+
slow_consumer.sid, slow_consumer.subject,
1483+
),
14731484
Self::Other(error) => write!(f, "nats: {error}"),
14741485
}
14751486
}
14761487
}
14771488

1489+
#[derive(Clone, Debug, Eq, PartialEq)]
1490+
pub struct SlowConsumer {
1491+
pub sid: u64,
1492+
pub subject: Subject,
1493+
}
1494+
1495+
impl std::fmt::Display for SlowConsumer {
1496+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1497+
write!(f, "slow consumer {} on subject {}", self.sid, self.subject)
1498+
}
1499+
}
1500+
14781501
/// Info to construct a CONNECT message.
14791502
#[derive(Clone, Debug, Serialize)]
14801503
pub struct ConnectInfo {

async-nats/tests/client_tests.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,8 @@ mod client {
569569
.event_callback(move |event| {
570570
let tx = tx.clone();
571571
async move {
572-
if let Event::SlowConsumer(_) = event {
573-
tx.send(()).await.unwrap()
572+
if let Event::SlowConsumer(sc) = event {
573+
tx.send(sc).await.unwrap()
574574
}
575575
}
576576
})
@@ -587,15 +587,18 @@ mod client {
587587

588588
tokio::time::sleep(Duration::from_secs(1)).await;
589589

590-
tokio::time::timeout(Duration::from_secs(5), rx.recv())
590+
let sc = tokio::time::timeout(Duration::from_secs(5), rx.recv())
591591
.await
592592
.unwrap()
593593
.unwrap();
594-
tokio::time::timeout(Duration::from_secs(5), rx.recv())
594+
assert_eq!(sc.subject.as_str(), "data");
595+
let sc = tokio::time::timeout(Duration::from_secs(5), rx.recv())
595596
.await
596597
.unwrap()
597598
.unwrap();
599+
assert_eq!(sc.subject.as_str(), "data");
598600
}
601+
599602
#[tokio::test]
600603
async fn no_echo() {
601604
// no_echo disabled.

0 commit comments

Comments
 (0)