11use crate :: models:: AmqpConfig ;
2- use crate :: traits:: { BoxFuture , CommitFunc , MessageConsumer , MessagePublisher } ;
2+ use crate :: traits:: { BatchCommitFunc , BoxFuture , MessageConsumer , MessagePublisher } ;
33use crate :: CanonicalMessage ;
44use anyhow:: anyhow;
55use async_trait:: async_trait;
6+ use futures:: TryStreamExt ;
67use lapin:: tcp:: { OwnedIdentity , OwnedTLSConfig } ;
78use lapin:: {
89 options:: {
910 BasicAckOptions , BasicConsumeOptions , BasicPublishOptions , BasicQosOptions ,
1011 QueueDeclareOptions ,
1112 } ,
12- types:: FieldTable ,
13+ types:: { FieldTable , ShortString } ,
1314 BasicProperties , Channel , Connection , ConnectionProperties , Consumer ,
1415} ;
1516use std:: any:: Any ;
@@ -20,7 +21,8 @@ pub struct AmqpPublisher {
2021 channel : Channel ,
2122 exchange : String ,
2223 routing_key : String ,
23- await_ack : bool ,
24+ no_persistence : bool ,
25+ delayed_ack : bool ,
2426}
2527
2628impl AmqpPublisher {
@@ -37,7 +39,10 @@ impl AmqpPublisher {
3739 channel
3840 . queue_declare (
3941 routing_key,
40- QueueDeclareOptions :: default ( ) ,
42+ QueueDeclareOptions {
43+ durable : !config. no_persistence ,
44+ ..Default :: default ( )
45+ } ,
4146 FieldTable :: default ( ) ,
4247 )
4348 . await ?;
@@ -46,7 +51,8 @@ impl AmqpPublisher {
4651 channel,
4752 exchange : "" . to_string ( ) , // Default exchange
4853 routing_key : routing_key. to_string ( ) ,
49- await_ack : config. await_ack ,
54+ no_persistence : config. no_persistence ,
55+ delayed_ack : config. delayed_ack ,
5056 } )
5157 }
5258
@@ -55,26 +61,32 @@ impl AmqpPublisher {
5561 channel : self . channel . clone ( ) ,
5662 exchange : self . exchange . clone ( ) ,
5763 routing_key : routing_key. to_string ( ) ,
58- await_ack : self . await_ack ,
64+ no_persistence : self . no_persistence ,
65+ delayed_ack : self . delayed_ack ,
5966 }
6067 }
6168}
6269
6370#[ async_trait]
6471impl MessagePublisher for AmqpPublisher {
6572 async fn send ( & self , message : CanonicalMessage ) -> anyhow:: Result < Option < CanonicalMessage > > {
66- let mut properties = BasicProperties :: default ( ) ;
73+ let mut properties = if self . no_persistence {
74+ BasicProperties :: default ( )
75+ } else {
76+ // Delivery mode 2 makes the message persistent
77+ BasicProperties :: default ( ) . with_delivery_mode ( 2 )
78+ } ;
6779 if let Some ( metadata) = message. metadata {
6880 if !metadata. is_empty ( ) {
6981 let mut table = FieldTable :: default ( ) ;
7082 for ( key, value) in metadata {
7183 table. insert (
72- key . into ( ) ,
84+ ShortString :: from ( key ) ,
7385 lapin:: types:: AMQPValue :: LongString ( value. into ( ) ) ,
7486 ) ;
7587 }
7688 properties = properties. with_headers ( table) ;
77- }
89+ }
7890 }
7991
8092 let confirmation = self
@@ -88,13 +100,24 @@ impl MessagePublisher for AmqpPublisher {
88100 )
89101 . await ?;
90102
91- if self . await_ack {
103+ if ! self . delayed_ack {
92104 // Wait for the broker's publisher confirmation.
93105 confirmation. await ?;
94106 }
95107 Ok ( None )
96108 }
97109
110+ // This isn't a real bulk send, but the normal send is fast enough.
111+ async fn send_batch (
112+ & self ,
113+ messages : Vec < CanonicalMessage > ,
114+ ) -> anyhow:: Result < ( Option < Vec < CanonicalMessage > > , Vec < CanonicalMessage > ) > {
115+ crate :: traits:: send_batch_helper ( self , messages, |publisher, message| {
116+ Box :: pin ( publisher. send ( message) )
117+ } )
118+ . await
119+ }
120+
98121 fn as_any ( & self ) -> & dyn Any {
99122 self
100123 }
@@ -111,7 +134,14 @@ impl AmqpConsumer {
111134
112135 info ! ( queue = %queue, "Declaring AMQP queue" ) ;
113136 channel
114- . queue_declare ( queue, QueueDeclareOptions :: default ( ) , FieldTable :: default ( ) )
137+ . queue_declare (
138+ queue,
139+ QueueDeclareOptions {
140+ durable : !config. no_persistence ,
141+ ..Default :: default ( )
142+ } ,
143+ FieldTable :: default ( ) ,
144+ )
115145 . await ?;
116146
117147 // Set prefetch count. This acts as a buffer and is crucial for concurrent processing.
@@ -150,7 +180,8 @@ async fn create_amqp_connection(config: &AmqpConfig) -> anyhow::Result<Connectio
150180
151181 let mut last_error = None ;
152182 for attempt in 1 ..=5 {
153- info ! ( url = %conn_uri, attempt = attempt, "Attempting to connect to AMQP broker" ) ;
183+ // Avoid logging credentials embedded in URLs.
184+ info ! ( attempt = attempt, "Attempting to connect to AMQP broker" ) ;
154185 let conn_props = ConnectionProperties :: default ( ) ;
155186 let result = if config. tls . required {
156187 let tls_config = build_tls_config ( config) . await ?;
@@ -193,42 +224,85 @@ async fn build_tls_config(config: &AmqpConfig) -> anyhow::Result<OwnedTLSConfig>
193224 } )
194225}
195226
227+ fn delivery_to_canonical_message ( delivery : & lapin:: message:: Delivery ) -> CanonicalMessage {
228+ let mut canonical_message = CanonicalMessage :: new ( delivery. data . clone ( ) ) ;
229+ if let Some ( headers) = delivery. properties . headers ( ) . as_ref ( ) {
230+ if !headers. inner ( ) . is_empty ( ) {
231+ let mut metadata = std:: collections:: HashMap :: new ( ) ;
232+ for ( key, value) in headers. inner ( ) . iter ( ) {
233+ let value_str = match value {
234+ lapin:: types:: AMQPValue :: LongString ( s) => s. to_string ( ) ,
235+ lapin:: types:: AMQPValue :: ShortString ( s) => s. to_string ( ) ,
236+ lapin:: types:: AMQPValue :: Boolean ( b) => b. to_string ( ) ,
237+ lapin:: types:: AMQPValue :: LongInt ( i) => i. to_string ( ) ,
238+ _ => continue ,
239+ } ;
240+ metadata. insert ( key. to_string ( ) , value_str) ;
241+ }
242+ if !metadata. is_empty ( ) {
243+ canonical_message. metadata = Some ( metadata) ;
244+ }
245+ }
246+ }
247+ canonical_message
248+ }
249+
196250#[ async_trait]
197251impl MessageConsumer for AmqpConsumer {
198- async fn receive ( & mut self ) -> anyhow:: Result < ( CanonicalMessage , CommitFunc ) > {
199- let delivery = futures:: StreamExt :: next ( & mut self . consumer )
252+ async fn receive_batch (
253+ & mut self ,
254+ max_messages : usize ,
255+ ) -> anyhow:: Result < ( Vec < CanonicalMessage > , BatchCommitFunc ) > {
256+ if max_messages == 0 {
257+ return Ok ( ( Vec :: new ( ) , Box :: new ( |_| Box :: pin ( async { } ) ) ) ) ;
258+ }
259+
260+ // 1. Wait for the first message. This will block until a message is available.
261+ let mut last_delivery = futures:: StreamExt :: next ( & mut self . consumer )
200262 . await
201263 . ok_or_else ( || anyhow ! ( "AMQP consumer stream ended" ) ) ??;
202264
203- let mut message = CanonicalMessage :: new ( delivery. data . clone ( ) ) ;
204- if let Some ( headers) = delivery. properties . headers ( ) . as_ref ( ) {
205- if !headers. inner ( ) . is_empty ( ) {
206- let mut metadata = std:: collections:: HashMap :: new ( ) ;
207- for ( key, value) in headers. inner ( ) . iter ( ) {
208- if let lapin:: types:: AMQPValue :: LongString ( s) = value {
209- metadata. insert ( key. to_string ( ) , s. to_string ( ) ) ;
210- }
265+ let mut messages = Vec :: with_capacity ( max_messages) ;
266+ messages. push ( delivery_to_canonical_message ( & last_delivery) ) ;
267+
268+ // 2. Greedily consume more messages if they are already buffered, up to max_messages.
269+ while messages. len ( ) < max_messages {
270+ match self . consumer . try_next ( ) . await {
271+ Ok ( Some ( delivery) ) => {
272+ messages. push ( delivery_to_canonical_message ( & delivery) ) ;
273+ last_delivery = delivery;
211274 }
212- if !metadata. is_empty ( ) {
213- message. metadata = Some ( metadata) ;
275+ Ok ( None ) => break , // No more messages in the buffer
276+ Err ( e) => {
277+ // An error occurred, but we have some messages. Process them and let the next call handle the error.
278+ tracing:: warn!( "Error receiving subsequent AMQP message: {}" , e) ;
279+ break ;
214280 }
215281 }
216282 }
217283
218- let commit = Box :: new ( move |_response| {
284+ // 3. Create a commit function that acks all received messages.
285+ let messages_len = messages. len ( ) ;
286+ let commit = Box :: new ( move |_response : Option < Vec < CanonicalMessage > > | {
219287 Box :: pin ( async move {
220- delivery
221- . ack ( BasicAckOptions :: default ( ) )
222- . await
223- . expect ( "Failed to ack AMQP message" ) ;
224- debug ! (
225- delivery_tag = delivery. delivery_tag,
226- "AMQP message acknowledged"
227- ) ;
288+ let ack_options = BasicAckOptions {
289+ // Use multiple: true only if we've consumed more than one message.
290+ multiple : messages_len > 1 ,
291+ ..Default :: default ( )
292+ } ;
293+ if let Err ( e) = last_delivery. ack ( ack_options) . await {
294+ tracing:: error!( last_delivery_tag = last_delivery. delivery_tag, error = %e, "Failed to bulk-ack AMQP messages" ) ;
295+ } else {
296+ debug ! (
297+ last_delivery_tag = last_delivery. delivery_tag,
298+ count = messages_len,
299+ "Bulk-acknowledged AMQP messages"
300+ ) ;
301+ }
228302 } ) as BoxFuture < ' static , ( ) >
229303 } ) ;
230304
231- Ok ( ( message , commit) )
305+ Ok ( ( messages , commit) )
232306 }
233307
234308 fn as_any ( & self ) -> & dyn Any {
0 commit comments