@@ -6,10 +6,9 @@ use async_nats::jetstream::Context;
66use async_nats:: jetstream:: consumer:: PullConsumer ;
77use async_nats:: jetstream:: context:: { PublishAckFuture , PublishErrorKind } ;
88use async_nats:: jetstream:: message:: PublishMessage ;
9- use async_nats:: jetstream:: publish:: PublishAck ;
109use async_nats:: jetstream:: stream:: RetentionPolicy :: Limits ;
1110use bytes:: { Bytes , BytesMut } ;
12- use tokio:: time:: { Instant , sleep } ;
11+ use tokio:: time:: Instant ;
1312use tokio_util:: sync:: CancellationToken ;
1413use tracing:: { debug, error} ;
1514
@@ -44,7 +43,6 @@ struct BufferInfo {
4443 num_ack_pending : usize ,
4544}
4645
47- const DEFAULT_RETRY_INTERVAL_MILLIS : u64 = 10 ;
4846const DEFAULT_REFRESH_INTERVAL_SECS : u64 = 1 ;
4947
5048/// Lightweight JetStream Writer for a single stream.
@@ -212,64 +210,82 @@ impl JetStreamWriter {
212210 }
213211 }
214212
215- /// Writes the message to the JetStream ISB and returns the PublishAck. It will do
216- /// infinite retries until the message gets published successfully. If it returns
217- /// an error it means it is fatal non-retryable error.
218- pub ( crate ) async fn blocking_write (
213+ /// Writes a message to the JetStream ISB and waits for confirmation.
214+ ///
215+ /// This is a single-attempt write that returns immediately after the write
216+ /// completes or fails. The orchestrator is responsible for retry logic.
217+ ///
218+ /// Returns `Err(WriteError::BufferFull)` if the buffer is full.
219+ /// Returns `Err(WriteError::WriteFailed)` if the write operation fails.
220+ pub ( crate ) async fn write (
219221 & self ,
220222 message : Message ,
221- cln_token : CancellationToken ,
222- ) -> Result < PublishAck > {
223+ ) -> std:: result:: Result < crate :: pipeline:: isb:: WriteResult , WriteError > {
223224 let start_time = Instant :: now ( ) ;
224225
226+ // Check if buffer is full
227+ if self . is_full ( ) {
228+ pipeline_metrics ( )
229+ . jetstream_isb
230+ . isfull_total
231+ . get_or_create ( & self . buffer_labels )
232+ . inc ( ) ;
233+ return Err ( WriteError :: BufferFull ) ;
234+ }
235+
225236 // Compress the message value if compression is enabled
226237 let mut message = message;
227- message. value = match self . compression_type {
228- Some ( compression_type) => bytes:: Bytes :: from (
229- compression:: compress ( compression_type, & message. value ) . map_err ( |e| {
230- Error :: ISB ( ISBError :: Encode ( format ! ( "Compression failed: {}" , e) ) )
231- } ) ?,
232- ) ,
233- None => message. value ,
234- } ;
238+ if let Some ( compression_type) = self . compression_type {
239+ message. value = bytes:: Bytes :: from (
240+ compression:: compress ( compression_type, & message. value )
241+ . map_err ( |e| WriteError :: WriteFailed ( format ! ( "Compression failed: {}" , e) ) ) ?,
242+ ) ;
243+ }
235244
236245 let payload: Bytes = message
237246 . try_into ( )
238247 . expect ( "message serialization should not fail" ) ;
239248
240- loop {
241- match self . js_ctx . publish ( self . stream . name , payload. clone ( ) ) . await {
242- Ok ( paf) => match paf. await {
243- Ok ( ack) => {
244- debug ! (
245- elapsed_ms = start_time. elapsed( ) . as_millis( ) ,
246- "Blocking write successful in" ,
247- ) ;
248- pipeline_metrics ( )
249- . jetstream_isb
250- . write_time_total
251- . get_or_create ( & self . buffer_labels )
252- . observe ( start_time. elapsed ( ) . as_micros ( ) as f64 ) ;
253- return Ok ( ack) ;
254- }
255- Err ( e) => {
256- error ! ( ?e, "awaiting publish ack failed, retrying" ) ;
257- sleep ( Duration :: from_millis ( 10 ) ) . await ;
258- }
259- } ,
260- Err ( e) => {
249+ // Publish and await acknowledgment
250+ match self . js_ctx . publish ( self . stream . name , payload) . await {
251+ Ok ( paf) => match paf. await {
252+ Ok ( ack) => {
253+ debug ! (
254+ elapsed_ms = start_time. elapsed( ) . as_millis( ) ,
255+ "Write successful" ,
256+ ) ;
261257 pipeline_metrics ( )
262258 . jetstream_isb
263- . write_error_total
259+ . write_time_total
264260 . get_or_create ( & self . buffer_labels )
265- . inc ( ) ;
266- error ! ( ?e, "publishing failed, retrying" ) ;
267- sleep ( Duration :: from_millis ( DEFAULT_RETRY_INTERVAL_MILLIS ) ) . await ;
261+ . observe ( start_time. elapsed ( ) . as_micros ( ) as f64 ) ;
262+
263+ // Convert sequence number to Offset using the stream's partition
264+ let offset = crate :: message:: Offset :: Int ( crate :: message:: IntOffset :: new (
265+ ack. sequence as i64 ,
266+ self . stream . partition ,
267+ ) ) ;
268+
269+ // Check if this was a duplicate message
270+ if ack. duplicate {
271+ Ok ( crate :: pipeline:: isb:: WriteResult :: duplicate ( offset) )
272+ } else {
273+ Ok ( crate :: pipeline:: isb:: WriteResult :: new ( offset) )
274+ }
268275 }
269- }
270-
271- if cln_token. is_cancelled ( ) {
272- return Err ( Error :: Cancelled ( ) ) ;
276+ Err ( e) => {
277+ error ! ( ?e, "awaiting publish ack failed" ) ;
278+ Err ( WriteError :: WriteFailed ( e. to_string ( ) ) )
279+ }
280+ } ,
281+ Err ( e) => {
282+ pipeline_metrics ( )
283+ . jetstream_isb
284+ . write_error_total
285+ . get_or_create ( & self . buffer_labels )
286+ . inc ( ) ;
287+ error ! ( ?e, "publishing failed" ) ;
288+ Err ( WriteError :: WriteFailed ( e. to_string ( ) ) )
273289 }
274290 }
275291 }
@@ -395,7 +411,7 @@ impl crate::pipeline::isb::ISBWriter for JetStreamWriter {
395411 async fn resolve (
396412 & self ,
397413 pending : Self :: PendingWrite ,
398- ) -> std:: result:: Result < crate :: pipeline:: isb:: ResolveResult , crate :: pipeline:: isb:: WriteError >
414+ ) -> std:: result:: Result < crate :: pipeline:: isb:: WriteResult , crate :: pipeline:: isb:: WriteError >
399415 {
400416 // Await the PAF to get the PublishAck
401417 let ack = pending
@@ -410,28 +426,22 @@ impl crate::pipeline::isb::ISBWriter for JetStreamWriter {
410426
411427 // Check if this was a duplicate message
412428 if ack. duplicate {
413- Ok ( crate :: pipeline:: isb:: ResolveResult :: duplicate ( offset) )
429+ Ok ( crate :: pipeline:: isb:: WriteResult :: duplicate ( offset) )
414430 } else {
415- Ok ( crate :: pipeline:: isb:: ResolveResult :: new ( offset) )
431+ Ok ( crate :: pipeline:: isb:: WriteResult :: new ( offset) )
416432 }
417433 }
418434
419- async fn blocking_write (
435+ async fn write (
420436 & self ,
421437 message : Message ,
422- cln_token : CancellationToken ,
423- ) -> std:: result:: Result < crate :: message:: Offset , crate :: pipeline:: isb:: WriteError > {
424- // Use the existing blocking_write which handles retries internally
425- let ack = self
426- . blocking_write ( message, cln_token)
427- . await
428- . map_err ( |e| crate :: pipeline:: isb:: WriteError :: WriteFailed ( e. to_string ( ) ) ) ?;
429-
430- // Convert sequence number to Offset using the stream's partition
431- Ok ( crate :: message:: Offset :: Int ( crate :: message:: IntOffset :: new (
432- ack. sequence as i64 ,
433- self . stream . partition ,
434- ) ) )
438+ ) -> std:: result:: Result < crate :: pipeline:: isb:: WriteResult , crate :: pipeline:: isb:: WriteError >
439+ {
440+ // Delegate to the inherent write method
441+ self . write ( message) . await . map_err ( |e| match e {
442+ WriteError :: BufferFull => crate :: pipeline:: isb:: WriteError :: BufferFull ,
443+ WriteError :: WriteFailed ( msg) => crate :: pipeline:: isb:: WriteError :: WriteFailed ( msg) ,
444+ } )
435445 }
436446
437447 fn name ( & self ) -> & ' static str {
@@ -688,13 +698,13 @@ mod tests {
688698
689699 #[ cfg( feature = "nats-tests" ) ]
690700 #[ tokio:: test]
691- async fn test_blocking_write_with_compression ( ) {
701+ async fn test_write_with_compression ( ) {
692702 let js_url = "localhost:4222" ;
693703 let client = async_nats:: connect ( js_url) . await . unwrap ( ) ;
694704 let context = jetstream:: new ( client) ;
695705 let cln_token = CancellationToken :: new ( ) ;
696706
697- let stream = Stream :: new ( "test-blocking -compress" , "temp" , 0 ) ;
707+ let stream = Stream :: new ( "test-write -compress" , "temp" , 0 ) ;
698708 let _ = context. delete_stream ( stream. name ) . await ;
699709 let _stream = context
700710 . get_or_create_stream ( stream:: Config {
@@ -752,11 +762,12 @@ mod tests {
752762 ..Default :: default ( )
753763 } ;
754764
755- let result = writer. blocking_write ( message, cln_token) . await ;
756- assert ! (
757- result. is_ok( ) ,
758- "blocking_write with compression should succeed"
759- ) ;
765+ let result = writer. write ( message) . await ;
766+ assert ! ( result. is_ok( ) , "write with compression should succeed" ) ;
767+
768+ // Verify the WriteResult has the expected structure
769+ let write_result = result. unwrap ( ) ;
770+ assert ! ( !write_result. is_duplicate, "should not be a duplicate" ) ;
760771
761772 context. delete_stream ( stream. name ) . await . unwrap ( ) ;
762773 }
0 commit comments