Skip to content

Commit 4e8e72b

Browse files
authored
refactor: write and resolve should return the same type and Orchestrator to orchestrate retries (#3190)
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent 9adab30 commit 4e8e72b

3 files changed

Lines changed: 147 additions & 108 deletions

File tree

rust/numaflow-core/src/pipeline/isb.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use std::time::Duration;
77

88
use async_trait::async_trait;
9-
use tokio_util::sync::CancellationToken;
109

1110
use crate::error::Result;
1211
use crate::message::{Message, Offset};
@@ -92,28 +91,28 @@ impl std::fmt::Display for WriteError {
9291

9392
impl std::error::Error for WriteError {}
9493

95-
/// Result of resolving a pending write operation.
94+
/// Result of a write operation.
9695
///
9796
/// Contains the offset of the written message along with additional metadata
9897
/// that may be useful for logging, metrics, or debugging.
9998
#[derive(Debug, Clone)]
100-
pub struct ResolveResult {
99+
pub struct WriteResult {
101100
/// The offset of the written message
102101
pub offset: Offset,
103102
/// Whether this was a duplicate message (already existed in the buffer)
104103
pub is_duplicate: bool,
105104
}
106105

107-
impl ResolveResult {
108-
/// Creates a new ResolveResult with the given offset and no duplicate flag.
106+
impl WriteResult {
107+
/// Creates a new WriteResult with the given offset and no duplicate flag.
109108
pub fn new(offset: Offset) -> Self {
110109
Self {
111110
offset,
112111
is_duplicate: false,
113112
}
114113
}
115114

116-
/// Creates a new ResolveResult marked as a duplicate.
115+
/// Creates a new WriteResult marked as a duplicate.
117116
pub fn duplicate(offset: Offset) -> Self {
118117
Self {
119118
offset,
@@ -128,7 +127,11 @@ impl ResolveResult {
128127
/// 1. **High-performance async pattern**: Use `async_write()` to get a `PendingWrite` handle
129128
/// immediately, then resolve it later with `resolve()`. This allows batching acknowledgments
130129
/// for higher throughput.
131-
/// 2. **Simple blocking pattern**: Use `blocking_write()` which blocks until the write is confirmed.
130+
/// 2. **Simple synchronous pattern**: Use `write()` which writes and waits for confirmation
131+
/// in a single call. This is useful as a fallback when async writes fail.
132+
///
133+
/// Both `resolve()` and `write()` return `Result<WriteResult, WriteError>` for consistency.
134+
/// The orchestrator is responsible for retry logic and handling cancellation.
132135
///
133136
/// Implementations must be cheaply cloneable (e.g., using Arc internally).
134137
#[async_trait]
@@ -156,7 +159,7 @@ pub(crate) trait ISBWriter: Send + Sync + Clone {
156159

157160
/// Resolves a pending write to get the result.
158161
///
159-
/// This waits for the write acknowledgment and returns a `ResolveResult` containing
162+
/// This waits for the write acknowledgment and returns a `WriteResult` containing
160163
/// the offset of the written message along with additional metadata (e.g., whether
161164
/// the message was a duplicate).
162165
///
@@ -165,21 +168,19 @@ pub(crate) trait ISBWriter: Send + Sync + Clone {
165168
async fn resolve(
166169
&self,
167170
pending: Self::PendingWrite,
168-
) -> std::result::Result<ResolveResult, WriteError>;
171+
) -> std::result::Result<WriteResult, WriteError>;
169172

170-
/// Writes a message and blocks until confirmed, returning the offset.
173+
/// Writes a message and waits for confirmation, returning the result.
174+
///
175+
/// This is a single-attempt write that returns immediately after the write
176+
/// completes or fails. The orchestrator is responsible for retry logic.
171177
///
172-
/// This is the simple blocking write method with infinite retries until success
173-
/// or cancellation. Use this when you don't need the high-performance async pattern.
178+
/// Returns `Err(WriteError::BufferFull)` if the buffer is full.
179+
/// Returns `Err(WriteError::WriteFailed)` if the write operation fails.
174180
///
175181
/// # Arguments
176182
/// * `message` - The message to write
177-
/// * `cln_token` - Cancellation token for graceful shutdown
178-
async fn blocking_write(
179-
&self,
180-
message: Message,
181-
cln_token: CancellationToken,
182-
) -> std::result::Result<Offset, WriteError>;
183+
async fn write(&self, message: Message) -> std::result::Result<WriteResult, WriteError>;
183184

184185
/// Returns the name/identifier of this writer (e.g., stream name).
185186
#[allow(dead_code)]

rust/numaflow-core/src/pipeline/isb/jetstream/js_writer.rs

Lines changed: 81 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ use async_nats::jetstream::Context;
66
use async_nats::jetstream::consumer::PullConsumer;
77
use async_nats::jetstream::context::{PublishAckFuture, PublishErrorKind};
88
use async_nats::jetstream::message::PublishMessage;
9-
use async_nats::jetstream::publish::PublishAck;
109
use async_nats::jetstream::stream::RetentionPolicy::Limits;
1110
use bytes::{Bytes, BytesMut};
12-
use tokio::time::{Instant, sleep};
11+
use tokio::time::Instant;
1312
use tokio_util::sync::CancellationToken;
1413
use tracing::{debug, error};
1514

@@ -44,7 +43,6 @@ struct BufferInfo {
4443
num_ack_pending: usize,
4544
}
4645

47-
const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10;
4846
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1;
4947

5048
/// Lightweight JetStream Writer for a single stream.
@@ -212,64 +210,82 @@ impl JetStreamWriter {
212210
}
213211
}
214212

215-
/// Writes the message to the JetStream ISB and returns the PublishAck. It will do
216-
/// infinite retries until the message gets published successfully. If it returns
217-
/// an error it means it is fatal non-retryable error.
218-
pub(crate) async fn blocking_write(
213+
/// Writes a message to the JetStream ISB and waits for confirmation.
214+
///
215+
/// This is a single-attempt write that returns immediately after the write
216+
/// completes or fails. The orchestrator is responsible for retry logic.
217+
///
218+
/// Returns `Err(WriteError::BufferFull)` if the buffer is full.
219+
/// Returns `Err(WriteError::WriteFailed)` if the write operation fails.
220+
pub(crate) async fn write(
219221
&self,
220222
message: Message,
221-
cln_token: CancellationToken,
222-
) -> Result<PublishAck> {
223+
) -> std::result::Result<crate::pipeline::isb::WriteResult, WriteError> {
223224
let start_time = Instant::now();
224225

226+
// Check if buffer is full
227+
if self.is_full() {
228+
pipeline_metrics()
229+
.jetstream_isb
230+
.isfull_total
231+
.get_or_create(&self.buffer_labels)
232+
.inc();
233+
return Err(WriteError::BufferFull);
234+
}
235+
225236
// Compress the message value if compression is enabled
226237
let mut message = message;
227-
message.value = match self.compression_type {
228-
Some(compression_type) => bytes::Bytes::from(
229-
compression::compress(compression_type, &message.value).map_err(|e| {
230-
Error::ISB(ISBError::Encode(format!("Compression failed: {}", e)))
231-
})?,
232-
),
233-
None => message.value,
234-
};
238+
if let Some(compression_type) = self.compression_type {
239+
message.value = bytes::Bytes::from(
240+
compression::compress(compression_type, &message.value)
241+
.map_err(|e| WriteError::WriteFailed(format!("Compression failed: {}", e)))?,
242+
);
243+
}
235244

236245
let payload: Bytes = message
237246
.try_into()
238247
.expect("message serialization should not fail");
239248

240-
loop {
241-
match self.js_ctx.publish(self.stream.name, payload.clone()).await {
242-
Ok(paf) => match paf.await {
243-
Ok(ack) => {
244-
debug!(
245-
elapsed_ms = start_time.elapsed().as_millis(),
246-
"Blocking write successful in",
247-
);
248-
pipeline_metrics()
249-
.jetstream_isb
250-
.write_time_total
251-
.get_or_create(&self.buffer_labels)
252-
.observe(start_time.elapsed().as_micros() as f64);
253-
return Ok(ack);
254-
}
255-
Err(e) => {
256-
error!(?e, "awaiting publish ack failed, retrying");
257-
sleep(Duration::from_millis(10)).await;
258-
}
259-
},
260-
Err(e) => {
249+
// Publish and await acknowledgment
250+
match self.js_ctx.publish(self.stream.name, payload).await {
251+
Ok(paf) => match paf.await {
252+
Ok(ack) => {
253+
debug!(
254+
elapsed_ms = start_time.elapsed().as_millis(),
255+
"Write successful",
256+
);
261257
pipeline_metrics()
262258
.jetstream_isb
263-
.write_error_total
259+
.write_time_total
264260
.get_or_create(&self.buffer_labels)
265-
.inc();
266-
error!(?e, "publishing failed, retrying");
267-
sleep(Duration::from_millis(DEFAULT_RETRY_INTERVAL_MILLIS)).await;
261+
.observe(start_time.elapsed().as_micros() as f64);
262+
263+
// Convert sequence number to Offset using the stream's partition
264+
let offset = crate::message::Offset::Int(crate::message::IntOffset::new(
265+
ack.sequence as i64,
266+
self.stream.partition,
267+
));
268+
269+
// Check if this was a duplicate message
270+
if ack.duplicate {
271+
Ok(crate::pipeline::isb::WriteResult::duplicate(offset))
272+
} else {
273+
Ok(crate::pipeline::isb::WriteResult::new(offset))
274+
}
268275
}
269-
}
270-
271-
if cln_token.is_cancelled() {
272-
return Err(Error::Cancelled());
276+
Err(e) => {
277+
error!(?e, "awaiting publish ack failed");
278+
Err(WriteError::WriteFailed(e.to_string()))
279+
}
280+
},
281+
Err(e) => {
282+
pipeline_metrics()
283+
.jetstream_isb
284+
.write_error_total
285+
.get_or_create(&self.buffer_labels)
286+
.inc();
287+
error!(?e, "publishing failed");
288+
Err(WriteError::WriteFailed(e.to_string()))
273289
}
274290
}
275291
}
@@ -395,7 +411,7 @@ impl crate::pipeline::isb::ISBWriter for JetStreamWriter {
395411
async fn resolve(
396412
&self,
397413
pending: Self::PendingWrite,
398-
) -> std::result::Result<crate::pipeline::isb::ResolveResult, crate::pipeline::isb::WriteError>
414+
) -> std::result::Result<crate::pipeline::isb::WriteResult, crate::pipeline::isb::WriteError>
399415
{
400416
// Await the PAF to get the PublishAck
401417
let ack = pending
@@ -410,28 +426,22 @@ impl crate::pipeline::isb::ISBWriter for JetStreamWriter {
410426

411427
// Check if this was a duplicate message
412428
if ack.duplicate {
413-
Ok(crate::pipeline::isb::ResolveResult::duplicate(offset))
429+
Ok(crate::pipeline::isb::WriteResult::duplicate(offset))
414430
} else {
415-
Ok(crate::pipeline::isb::ResolveResult::new(offset))
431+
Ok(crate::pipeline::isb::WriteResult::new(offset))
416432
}
417433
}
418434

419-
async fn blocking_write(
435+
async fn write(
420436
&self,
421437
message: Message,
422-
cln_token: CancellationToken,
423-
) -> std::result::Result<crate::message::Offset, crate::pipeline::isb::WriteError> {
424-
// Use the existing blocking_write which handles retries internally
425-
let ack = self
426-
.blocking_write(message, cln_token)
427-
.await
428-
.map_err(|e| crate::pipeline::isb::WriteError::WriteFailed(e.to_string()))?;
429-
430-
// Convert sequence number to Offset using the stream's partition
431-
Ok(crate::message::Offset::Int(crate::message::IntOffset::new(
432-
ack.sequence as i64,
433-
self.stream.partition,
434-
)))
438+
) -> std::result::Result<crate::pipeline::isb::WriteResult, crate::pipeline::isb::WriteError>
439+
{
440+
// Delegate to the inherent write method
441+
self.write(message).await.map_err(|e| match e {
442+
WriteError::BufferFull => crate::pipeline::isb::WriteError::BufferFull,
443+
WriteError::WriteFailed(msg) => crate::pipeline::isb::WriteError::WriteFailed(msg),
444+
})
435445
}
436446

437447
fn name(&self) -> &'static str {
@@ -688,13 +698,13 @@ mod tests {
688698

689699
#[cfg(feature = "nats-tests")]
690700
#[tokio::test]
691-
async fn test_blocking_write_with_compression() {
701+
async fn test_write_with_compression() {
692702
let js_url = "localhost:4222";
693703
let client = async_nats::connect(js_url).await.unwrap();
694704
let context = jetstream::new(client);
695705
let cln_token = CancellationToken::new();
696706

697-
let stream = Stream::new("test-blocking-compress", "temp", 0);
707+
let stream = Stream::new("test-write-compress", "temp", 0);
698708
let _ = context.delete_stream(stream.name).await;
699709
let _stream = context
700710
.get_or_create_stream(stream::Config {
@@ -752,11 +762,12 @@ mod tests {
752762
..Default::default()
753763
};
754764

755-
let result = writer.blocking_write(message, cln_token).await;
756-
assert!(
757-
result.is_ok(),
758-
"blocking_write with compression should succeed"
759-
);
765+
let result = writer.write(message).await;
766+
assert!(result.is_ok(), "write with compression should succeed");
767+
768+
// Verify the WriteResult has the expected structure
769+
let write_result = result.unwrap();
770+
assert!(!write_result.is_duplicate, "should not be a duplicate");
760771

761772
context.delete_stream(stream.name).await.unwrap();
762773
}

0 commit comments

Comments
 (0)