Skip to content

Commit 1800b34

Browse files
committed
Add subject validataion
Signed-off-by: Tomasz Pietrek <tomasz@nats.io> Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
1 parent aff191a commit 1800b34

File tree

11 files changed

+560
-41
lines changed

11 files changed

+560
-41
lines changed

.config/nats.dic

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,4 @@ NUID
202202
performant
203203
thread_rng
204204
NUID's
205+
whitespace

async-nats/benches/main.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,10 @@ use criterion::criterion_main;
33
// Import the benchmark groups from both files
44
mod core_nats;
55
mod jetstream;
6+
mod subject_validation;
67

7-
criterion_main!(core_nats::core_nats, jetstream::jetstream);
8+
criterion_main!(
9+
core_nats::core_nats,
10+
jetstream::jetstream,
11+
subject_validation::subject_validation
12+
);
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use bytes::Bytes;
2+
use criterion::{criterion_group, criterion_main, Criterion};
3+
4+
static PAYLOAD: &[u8] = &[22; 32];
5+
6+
// Pre-defined subjects as static strs to avoid cloning overhead
7+
static SUBJECT_5: &str = "bench";
8+
static SUBJECT_16: &str = "bench.data.testx";
9+
static SUBJECT_32: &str = "events.data.bench.test.messages";
10+
static SUBJECT_128: &str = "segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.segment.x";
11+
12+
async fn publish_with_static_str(
13+
nc: async_nats::Client,
14+
subject: &'static str,
15+
payload: Bytes,
16+
count: u64,
17+
) {
18+
for _ in 0..count {
19+
nc.publish(subject, payload.clone()).await.unwrap();
20+
}
21+
}
22+
23+
pub fn publish_validation_comparison(c: &mut Criterion) {
24+
let messages_per_iter = 500_000;
25+
let server = nats_server::run_basic_server();
26+
27+
let mut validation_group = c.benchmark_group("nats::publish_validation_comparison");
28+
validation_group.sample_size(10);
29+
validation_group.warm_up_time(std::time::Duration::from_secs(1));
30+
31+
// Test different subject lengths: 5, 16, 32, 128 characters
32+
for (subject_len, subject_str) in [
33+
(5, SUBJECT_5),
34+
(16, SUBJECT_16),
35+
(32, SUBJECT_32),
36+
(128, SUBJECT_128),
37+
] {
38+
validation_group.throughput(criterion::Throughput::Elements(messages_per_iter));
39+
40+
// Benchmark 1: With runtime validation (default)
41+
validation_group.bench_with_input(
42+
criterion::BenchmarkId::new("with_validation", subject_len),
43+
&subject_len,
44+
|b, _| {
45+
let rt = tokio::runtime::Runtime::new().unwrap();
46+
let nc = rt.block_on(async {
47+
let nc = async_nats::connect(server.client_url()).await.unwrap();
48+
nc.publish("data", "data".into()).await.unwrap();
49+
nc
50+
});
51+
52+
b.to_async(rt).iter_with_large_drop(move || {
53+
let nc = nc.clone();
54+
async move {
55+
publish_with_static_str(
56+
nc,
57+
subject_str,
58+
Bytes::from_static(PAYLOAD),
59+
messages_per_iter,
60+
)
61+
.await
62+
}
63+
});
64+
},
65+
);
66+
67+
// Benchmark 2: With validation disabled
68+
validation_group.bench_with_input(
69+
criterion::BenchmarkId::new("skip_validation", subject_len),
70+
&subject_len,
71+
|b, _| {
72+
let rt = tokio::runtime::Runtime::new().unwrap();
73+
let nc = rt.block_on(async {
74+
let nc = async_nats::ConnectOptions::new()
75+
.skip_subject_validation(true)
76+
.connect(server.client_url())
77+
.await
78+
.unwrap();
79+
nc.publish("data", "data".into()).await.unwrap();
80+
nc
81+
});
82+
83+
b.to_async(rt).iter_with_large_drop(move || {
84+
let nc = nc.clone();
85+
async move {
86+
publish_with_static_str(
87+
nc,
88+
subject_str,
89+
Bytes::from_static(PAYLOAD),
90+
messages_per_iter,
91+
)
92+
.await
93+
}
94+
});
95+
},
96+
);
97+
}
98+
99+
validation_group.finish();
100+
}
101+
102+
criterion_group!(subject_validation, publish_validation_comparison);
103+
criterion_main!(subject_validation);

async-nats/src/client.rs

Lines changed: 78 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use std::time::Duration;
3535
use thiserror::Error;
3636
use tokio::sync::{mpsc, oneshot};
3737
use tokio_util::sync::PollSender;
38-
use tracing::trace;
3938

4039
static VERSION_RE: Lazy<Regex> =
4140
Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
@@ -88,6 +87,7 @@ pub struct Client {
8887
request_timeout: Option<Duration>,
8988
max_payload: Arc<AtomicUsize>,
9089
connection_stats: Arc<Statistics>,
90+
skip_subject_validation: bool,
9191
}
9292

9393
pub mod traits {
@@ -100,30 +100,37 @@ pub mod traits {
100100
use super::{PublishError, Request, RequestError, SubscribeError};
101101

102102
pub trait Publisher {
103-
fn publish_with_reply<S: ToSubject, R: ToSubject>(
103+
fn publish_with_reply<S, R>(
104104
&self,
105105
subject: S,
106106
reply: R,
107107
payload: Bytes,
108-
) -> impl Future<Output = Result<(), PublishError>>;
108+
) -> impl Future<Output = Result<(), PublishError>>
109+
where
110+
S: ToSubject,
111+
R: ToSubject;
109112

110113
fn publish_message(
111114
&self,
112115
msg: message::OutboundMessage,
113116
) -> impl Future<Output = Result<(), PublishError>>;
114117
}
115118
pub trait Subscriber {
116-
fn subscribe<S: ToSubject>(
119+
fn subscribe<S>(
117120
&self,
118121
subject: S,
119-
) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
122+
) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
123+
where
124+
S: ToSubject;
120125
}
121126
pub trait Requester {
122-
fn send_request<S: ToSubject>(
127+
fn send_request<S>(
123128
&self,
124129
subject: S,
125130
request: Request,
126-
) -> impl Future<Output = Result<Message, RequestError>>;
131+
) -> impl Future<Output = Result<Message, RequestError>>
132+
where
133+
S: ToSubject;
127134
}
128135
pub trait TimeoutProvider {
129136
fn timeout(&self) -> Option<Duration>;
@@ -147,12 +154,16 @@ impl traits::TimeoutProvider for Client {
147154
}
148155

149156
impl traits::Publisher for Client {
150-
fn publish_with_reply<S: ToSubject, R: ToSubject>(
157+
fn publish_with_reply<S, R>(
151158
&self,
152159
subject: S,
153160
reply: R,
154161
payload: Bytes,
155-
) -> impl Future<Output = Result<(), PublishError>> {
162+
) -> impl Future<Output = Result<(), PublishError>>
163+
where
164+
S: ToSubject,
165+
R: ToSubject,
166+
{
156167
self.publish_with_reply(subject, reply, payload)
157168
}
158169

@@ -165,10 +176,10 @@ impl traits::Publisher for Client {
165176
}
166177

167178
impl traits::Subscriber for Client {
168-
fn subscribe<S: ToSubject>(
169-
&self,
170-
subject: S,
171-
) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
179+
fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
180+
where
181+
S: ToSubject,
182+
{
172183
self.subscribe(subject)
173184
}
174185
}
@@ -206,6 +217,7 @@ impl Client {
206217
request_timeout: Option<Duration>,
207218
max_payload: Arc<AtomicUsize>,
208219
statistics: Arc<Statistics>,
220+
skip_subject_validation: bool,
209221
) -> Client {
210222
let poll_sender = PollSender::new(sender.clone());
211223
Client {
@@ -219,7 +231,25 @@ impl Client {
219231
request_timeout,
220232
max_payload,
221233
connection_stats: statistics,
234+
skip_subject_validation,
235+
}
236+
}
237+
238+
/// Converts a subject to a [`Subject`], optionally validating it.
239+
///
240+
/// If subject validation is enabled (the default), the subject is validated
241+
/// before conversion. If validation is disabled via
242+
/// [`ConnectOptions::skip_subject_validation`], the subject is converted
243+
/// without validation.
244+
fn maybe_validate_subject<S: ToSubject>(
245+
&self,
246+
subject: S,
247+
) -> Result<crate::Subject, crate::subject::SubjectError> {
248+
let subject = subject.to_subject();
249+
if !self.skip_subject_validation && !subject.is_valid() {
250+
return Err(crate::subject::SubjectError::InvalidFormat);
222251
}
252+
Ok(subject)
223253
}
224254

225255
/// Returns the default timeout for requests set when creating the client.
@@ -318,7 +348,10 @@ impl Client {
318348
subject: S,
319349
payload: Bytes,
320350
) -> Result<(), PublishError> {
321-
let subject = subject.to_subject();
351+
let subject = self
352+
.maybe_validate_subject(subject)
353+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
354+
322355
let max_payload = self.max_payload.load(Ordering::Relaxed);
323356
if payload.len() > max_payload {
324357
return Err(PublishError::with_source(
@@ -367,7 +400,9 @@ impl Client {
367400
headers: HeaderMap,
368401
payload: Bytes,
369402
) -> Result<(), PublishError> {
370-
let subject = subject.to_subject();
403+
let subject = self
404+
.maybe_validate_subject(subject)
405+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
371406

372407
self.sender
373408
.send(Command::Publish(OutboundMessage {
@@ -402,8 +437,12 @@ impl Client {
402437
reply: R,
403438
payload: Bytes,
404439
) -> Result<(), PublishError> {
405-
let subject = subject.to_subject();
406-
let reply = reply.to_subject();
440+
let subject = self
441+
.maybe_validate_subject(subject)
442+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
443+
let reply = self
444+
.maybe_validate_subject(reply)
445+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
407446

408447
self.sender
409448
.send(Command::Publish(OutboundMessage {
@@ -441,8 +480,12 @@ impl Client {
441480
headers: HeaderMap,
442481
payload: Bytes,
443482
) -> Result<(), PublishError> {
444-
let subject = subject.to_subject();
445-
let reply = reply.to_subject();
483+
let subject = self
484+
.maybe_validate_subject(subject)
485+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
486+
let reply = self
487+
.maybe_validate_subject(reply)
488+
.map_err(|e| PublishError::with_source(PublishErrorKind::BadSubject, e))?;
446489

447490
self.sender
448491
.send(Command::Publish(OutboundMessage {
@@ -471,13 +514,6 @@ impl Client {
471514
subject: S,
472515
payload: Bytes,
473516
) -> Result<Message, RequestError> {
474-
let subject = subject.to_subject();
475-
476-
trace!(
477-
"request sent to subject: {} ({})",
478-
subject.as_ref(),
479-
payload.len()
480-
);
481517
let request = Request::new().payload(payload);
482518
self.send_request(subject, request).await
483519
}
@@ -503,8 +539,6 @@ impl Client {
503539
headers: HeaderMap,
504540
payload: Bytes,
505541
) -> Result<Message, RequestError> {
506-
let subject = subject.to_subject();
507-
508542
let request = Request::new().headers(headers).payload(payload);
509543
self.send_request(subject, request).await
510544
}
@@ -527,7 +561,9 @@ impl Client {
527561
subject: S,
528562
request: Request,
529563
) -> Result<Message, RequestError> {
530-
let subject = subject.to_subject();
564+
let subject = self
565+
.maybe_validate_subject(subject)
566+
.map_err(|e| RequestError::with_source(RequestErrorKind::Other, e))?;
531567

532568
if let Some(inbox) = request.inbox {
533569
let timeout = request.timeout.unwrap_or(self.request_timeout);
@@ -640,7 +676,13 @@ impl Client {
640676
/// # }
641677
/// ```
642678
pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
643-
let subject = subject.to_subject();
679+
let subject = self.maybe_validate_subject(subject).map_err(|e| {
680+
SubscribeError(Box::new(std::io::Error::new(
681+
std::io::ErrorKind::InvalidInput,
682+
e,
683+
)))
684+
})?;
685+
644686
let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
645687
let (sender, receiver) = mpsc::channel(self.subscription_capacity);
646688

@@ -677,7 +719,12 @@ impl Client {
677719
subject: S,
678720
queue_group: String,
679721
) -> Result<Subscriber, SubscribeError> {
680-
let subject = subject.to_subject();
722+
let subject = self.maybe_validate_subject(subject).map_err(|e| {
723+
SubscribeError(Box::new(std::io::Error::new(
724+
std::io::ErrorKind::InvalidInput,
725+
e,
726+
)))
727+
})?;
681728

682729
let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
683730
let (sender, receiver) = mpsc::channel(self.subscription_capacity);

async-nats/src/jetstream/consumer/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ impl<T: IntoConsumerConfig> Consumer<T> {
5858
/// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
5959
/// [Consumer] and returns it.
6060
///
61+
/// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead —
62+
/// message metadata often already contains the needed information and does not require a server call.
63+
///
6164
/// # Examples
6265
///
6366
/// ```no_run
@@ -85,6 +88,9 @@ impl<T: IntoConsumerConfig> Consumer<T> {
8588

8689
/// Retrieves `info` about [Consumer] from the server. Does not update the cache.
8790
///
91+
/// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead —
92+
/// message metadata often already contains the needed information and does not require a server call.
93+
///
8894
/// # Examples
8995
///
9096
/// ```no_run

0 commit comments

Comments
 (0)