@@ -207,39 +207,37 @@ impl Sink for RabbitmqSink {
207207 return Ok ( ( ) ) ;
208208 }
209209
210- // Prepare all messages first.
211- let mut prepared: Vec < ( String , String , Vec < u8 > ) > = Vec :: with_capacity ( events. len ( ) ) ;
212-
213- for event in & events {
214- // Resolve exchange from event metadata or config.
215- let exchange = self . resolve_exchange ( event) . ok_or_else ( || {
216- etl:: etl_error!(
217- etl:: error:: ErrorKind :: ConfigError ,
218- "No exchange configured" ,
219- "Exchange must be provided in sink config or event metadata"
220- )
221- } ) ?;
222-
223- // Resolve routing key from event metadata or config.
224- let routing_key = self . resolve_routing_key ( event) . ok_or_else ( || {
225- etl:: etl_error!(
226- etl:: error:: ErrorKind :: ConfigError ,
227- "No routing_key configured" ,
228- "Routing key must be provided in sink config or event metadata"
229- )
230- } ) ?;
231-
232- // Serialize payload to JSON.
233- let payload = serde_json:: to_vec ( & event. payload ) . map_err ( |e| {
234- etl:: etl_error!(
235- etl:: error:: ErrorKind :: InvalidData ,
236- "Failed to serialize payload to JSON" ,
237- e. to_string( )
238- )
239- } ) ?;
210+ // Resolve routing and serialize payloads upfront (fail fast, minimize lock time).
211+ let prepared: Vec < _ > = events
212+ . into_iter ( )
213+ . map ( |event| {
214+ let exchange = self . resolve_exchange ( & event) . ok_or_else ( || {
215+ etl:: etl_error!(
216+ etl:: error:: ErrorKind :: ConfigError ,
217+ "No exchange configured" ,
218+ "Exchange must be provided in sink config or event metadata"
219+ )
220+ } ) ?;
240221
241- prepared. push ( ( exchange. to_string ( ) , routing_key. to_string ( ) , payload) ) ;
242- }
222+ let routing_key = self . resolve_routing_key ( & event) . ok_or_else ( || {
223+ etl:: etl_error!(
224+ etl:: error:: ErrorKind :: ConfigError ,
225+ "No routing_key configured" ,
226+ "Routing key must be provided in sink config or event metadata"
227+ )
228+ } ) ?;
229+
230+ let payload = serde_json:: to_vec ( & event. payload ) . map_err ( |e| {
231+ etl:: etl_error!(
232+ etl:: error:: ErrorKind :: InvalidData ,
233+ "Failed to serialize payload to JSON" ,
234+ e. to_string( )
235+ )
236+ } ) ?;
237+
238+ Ok ( ( exchange. to_string ( ) , routing_key. to_string ( ) , payload) )
239+ } )
240+ . collect :: < EtlResult < Vec < _ > > > ( ) ?;
243241
244242 // Fire all publishes and collect confirms (holding lock minimally).
245243 let confirms = {
0 commit comments