@@ -17,6 +17,7 @@ use tracing::{debug, error, info, warn};
1717
1818use crate :: msg:: pending_message:: { MessageContext , PendingMessage } ;
1919use crate :: relay_api:: metrics:: RelayApiMetrics ;
20+ use crate :: relay_api:: ExtractedMessage ;
2021
2122/// Bounded cache for tracking recently submitted tx hashes to prevent replay attacks
2223pub struct TxHashCache {
@@ -414,18 +415,19 @@ async fn create_relay(
414415 . as_ref ( )
415416 . ok_or_else ( || ServerError :: InternalError ( "Send channels not configured" . to_string ( ) ) ) ?;
416417
417- // 3. Process each message
418- let mut processed_messages = Vec :: new ( ) ;
418+ // 3. First pass: Validate ALL messages before processing any
419+ // This prevents partial success where some messages are sent to channels
420+ // and others fail validation, which would prevent retries (tx_hash already cached)
421+ struct ValidatedMessage {
422+ extracted : ExtractedMessage ,
423+ msg_ctx : Arc < MessageContext > ,
424+ app_context : Option < String > ,
425+ send_channel : UnboundedSender < QueueOperation > ,
426+ }
419427
420- for extracted in extracted_messages {
421- info ! (
422- message_id = ?extracted. message_id,
423- origin = extracted. origin_domain,
424- destination = extracted. destination_domain,
425- nonce = extracted. message. nonce,
426- "Processing message"
427- ) ;
428+ let mut validated_messages = Vec :: new ( ) ;
428429
430+ for extracted in & extracted_messages {
429431 // Get message context for (origin, destination)
430432 let msg_ctx = msg_ctxs
431433 . get ( & ( extracted. origin_domain , extracted. destination_domain ) )
@@ -436,8 +438,59 @@ async fn create_relay(
436438 ) )
437439 } ) ?;
438440
439- // Classify app_context for metrics
440- // Use short timeouts to avoid blocking the response
441+ // Apply message filtering (whitelist, blacklist, address blacklist)
442+ if let Some ( whitelist) = & state. message_whitelist {
443+ if !whitelist. msg_matches ( & extracted. message , true ) {
444+ if let Some ( ref metrics) = state. metrics {
445+ metrics. inc_failure ( "message_not_whitelisted" ) ;
446+ }
447+ return Err ( ServerError :: InvalidRequest (
448+ "Message not whitelisted" . to_string ( ) ,
449+ ) ) ;
450+ }
451+ }
452+
453+ if let Some ( blacklist) = & state. message_blacklist {
454+ if blacklist. msg_matches ( & extracted. message , false ) {
455+ if let Some ( ref metrics) = state. metrics {
456+ metrics. inc_failure ( "message_blacklisted" ) ;
457+ }
458+ return Err ( ServerError :: InvalidRequest (
459+ "Message blacklisted" . to_string ( ) ,
460+ ) ) ;
461+ }
462+ }
463+
464+ if let Some ( blacklist) = & state. address_blacklist {
465+ if let Some ( blacklisted_address) =
466+ blacklist. find_blacklisted_address ( & extracted. message )
467+ {
468+ debug ! (
469+ blacklisted_address = hex:: encode( blacklisted_address) ,
470+ message_id = ?extracted. message_id,
471+ "Rejecting message involving blacklisted address"
472+ ) ;
473+ if let Some ( ref metrics) = state. metrics {
474+ metrics. inc_failure ( "message_blacklisted_address" ) ;
475+ }
476+ return Err ( ServerError :: InvalidRequest (
477+ "Message involves a blacklisted address" . to_string ( ) ,
478+ ) ) ;
479+ }
480+ }
481+
482+ // Verify send channel exists for destination
483+ let send_channel = send_channels
484+ . get ( & extracted. destination_domain )
485+ . ok_or_else ( || {
486+ ServerError :: InvalidRequest ( format ! (
487+ "No send channel for destination domain {}" ,
488+ extracted. destination_domain
489+ ) )
490+ } ) ?
491+ . clone ( ) ;
492+
493+ // Classify app_context for metrics (with short timeouts)
441494 let recipient_ism = tokio:: time:: timeout (
442495 Duration :: from_millis ( 500 ) ,
443496 msg_ctx
@@ -501,76 +554,44 @@ async fn create_relay(
501554 debug ! (
502555 message_id = ?extracted. message_id,
503556 app_context = ?app_context,
504- "Classified message app context "
557+ "Validated and classified message "
505558 ) ;
506559
507- // Apply message filtering (whitelist, blacklist, address blacklist)
508- // Skip if not whitelisted
509- if let Some ( whitelist) = & state. message_whitelist {
510- if !whitelist. msg_matches ( & extracted. message , true ) {
511- if let Some ( ref metrics) = state. metrics {
512- metrics. inc_failure ( "message_not_whitelisted" ) ;
513- }
514- return Err ( ServerError :: InvalidRequest (
515- "Message not whitelisted" . to_string ( ) ,
516- ) ) ;
517- }
518- }
519-
520- // Skip if message is blacklisted
521- if let Some ( blacklist) = & state. message_blacklist {
522- if blacklist. msg_matches ( & extracted. message , false ) {
523- if let Some ( ref metrics) = state. metrics {
524- metrics. inc_failure ( "message_blacklisted" ) ;
525- }
526- return Err ( ServerError :: InvalidRequest (
527- "Message blacklisted" . to_string ( ) ,
528- ) ) ;
529- }
530- }
560+ validated_messages. push ( ValidatedMessage {
561+ extracted : extracted. clone ( ) ,
562+ msg_ctx : msg_ctx. clone ( ) ,
563+ app_context,
564+ send_channel,
565+ } ) ;
566+ }
531567
532- // Skip if message involves a blacklisted address
533- if let Some ( blacklist) = & state. address_blacklist {
534- if let Some ( blacklisted_address) =
535- blacklist. find_blacklisted_address ( & extracted. message )
536- {
537- // Log internally but don't expose the address in the API response
538- debug ! (
539- blacklisted_address = hex:: encode( blacklisted_address) ,
540- message_id = ?extracted. message_id,
541- "Rejecting message involving blacklisted address"
542- ) ;
543- if let Some ( ref metrics) = state. metrics {
544- metrics. inc_failure ( "message_blacklisted_address" ) ;
545- }
546- return Err ( ServerError :: InvalidRequest (
547- "Message involves a blacklisted address" . to_string ( ) ,
548- ) ) ;
549- }
550- }
568+ // 4. Second pass: Process all validated messages
569+ // All messages passed validation, safe to send to channels and persist to DB
570+ let mut processed_messages = Vec :: new ( ) ;
551571
552- // Get send channel for destination
553- let send_channel = send_channels
554- . get ( & extracted . destination_domain )
555- . ok_or_else ( || {
556- ServerError :: InvalidRequest ( format ! (
557- "No send channel for destination domain {}" ,
558- extracted. destination_domain
559- ) )
560- } ) ? ;
572+ for validated in validated_messages {
573+ let extracted = & validated . extracted ;
574+ info ! (
575+ message_id = ?extracted . message_id ,
576+ origin = extracted . origin_domain ,
577+ destination = extracted . destination_domain ,
578+ nonce = extracted. message . nonce ,
579+ "Processing validated message"
580+ ) ;
561581
562582 // Create PendingMessage with classified app_context
563583 let pending_msg = PendingMessage :: new (
564584 extracted. message . clone ( ) ,
565- msg_ctx. clone ( ) ,
585+ validated . msg_ctx . clone ( ) ,
566586 PendingOperationStatus :: FirstPrepareAttempt ,
567- app_context. clone ( ) ,
587+ validated . app_context . clone ( ) ,
568588 0 , // Max retries - relay API messages fail fast, no retries
569589 ) ;
570590
571591 // CRITICAL: Send to channel FIRST, before persisting to DB
572592 // This ensures we only persist messages that were successfully queued for processing
573- send_channel
593+ validated
594+ . send_channel
574595 . send ( Box :: new ( pending_msg) as QueueOperation )
575596 . map_err ( |e| {
576597 error ! (
@@ -584,7 +605,7 @@ async fn create_relay(
584605 info ! (
585606 message_id = ?extracted. message_id,
586607 destination = extracted. destination_domain,
587- app_context = ?app_context,
608+ app_context = ?validated . app_context,
588609 "Successfully sent message to processor channel"
589610 ) ;
590611
0 commit comments