@@ -7411,14 +7411,15 @@ mod unit_tests {
74117411 TrustedPeerAddr , URL_SAFE_NO_PAD , accepts_sse, active_runtime_session_count,
74127412 affinity_forward_error_response, auth_binding_fingerprint,
74137413 authenticate_public_request_if_needed, authorize_server_method_via_backend,
7414- batch_rejected_response, build_public_router, can_reuse_session_auth,
7415- can_use_direct_prompts_get, can_use_direct_resources_read, decode_request,
7416- derive_backend_authenticate_url, derive_backend_completion_complete_url,
7417- derive_backend_initialize_url, derive_backend_logging_set_level_url,
7418- derive_backend_notifications_cancelled_url, derive_backend_notifications_initialized_url,
7419- derive_backend_notifications_message_url, derive_backend_prompts_get_authz_url,
7420- derive_backend_prompts_get_url, derive_backend_prompts_list_authz_url,
7421- derive_backend_prompts_list_url, derive_backend_resource_templates_list_authz_url,
7414+ batch_rejected_response, build_forwarded_sse_event, build_public_router,
7415+ can_reuse_session_auth, can_use_direct_prompts_get, can_use_direct_resources_read,
7416+ decode_request, decode_upstream_json_payload_bytes, derive_backend_authenticate_url,
7417+ derive_backend_completion_complete_url, derive_backend_initialize_url,
7418+ derive_backend_logging_set_level_url, derive_backend_notifications_cancelled_url,
7419+ derive_backend_notifications_initialized_url, derive_backend_notifications_message_url,
7420+ derive_backend_prompts_get_authz_url, derive_backend_prompts_get_url,
7421+ derive_backend_prompts_list_authz_url, derive_backend_prompts_list_url,
7422+ derive_backend_resource_templates_list_authz_url,
74227423 derive_backend_resource_templates_list_url, derive_backend_resources_list_authz_url,
74237424 derive_backend_resources_list_url, derive_backend_resources_read_authz_url,
74247425 derive_backend_resources_read_url, derive_backend_resources_subscribe_url,
@@ -7429,14 +7430,14 @@ mod unit_tests {
74297430 derive_backend_transport_url, direct_server_prompts_get, direct_server_prompts_list,
74307431 direct_server_resource_templates_list, direct_server_resources_list,
74317432 direct_server_resources_read, encode_internal_auth_context_header, event_store_key_prefix,
7432- extract_client_capabilities, forward_initialize_to_backend , forward_to_backend ,
7433- forward_transport_request , get_runtime_session , handle_initialize_with_session_core ,
7434- handle_resume_transport_request , has_server_scope , hex_decode , hex_encode ,
7435- inject_server_id_header , inject_session_header , invalid_request_response ,
7436- is_affinity_forwarded_request, maybe_bind_session_auth_context,
7433+ extract_client_capabilities, extract_first_sse_data_payload , finalize_sse_frame ,
7434+ forward_initialize_to_backend , forward_to_backend , forward_transport_request ,
7435+ get_runtime_session , handle_initialize_with_session_core , handle_resume_transport_request ,
7436+ has_server_scope , hex_decode , hex_encode , inject_server_id_header , inject_session_header ,
7437+ invalid_request_response , is_affinity_forwarded_request, maybe_bind_session_auth_context,
74377438 maybe_upsert_runtime_session_from_transport_response, normalize_postgres_database_url,
7438- parse_error_response, pool_owner_key , prompt_arguments_from_schema , public_client_ip ,
7439- query_param, remove_runtime_session, replay_events_endpoint,
7439+ parse_error_response, parse_sse_line , pool_owner_key , prompt_arguments_from_schema ,
7440+ public_client_ip , query_param, remove_runtime_session, replay_events_endpoint,
74407441 requested_initialize_session_id, requested_protocol_version,
74417442 response_from_affinity_forward_response, run, runtime_session_access_outcome,
74427443 runtime_session_id_from_request, runtime_session_key, send_tools_list_to_backend,
@@ -7449,12 +7450,14 @@ mod unit_tests {
74497450 body:: to_bytes,
74507451 extract:: { Path as AxumPath , State } ,
74517452 http:: { HeaderMap , HeaderName , HeaderValue , StatusCode , Uri } ,
7452- response:: Response ,
7453+ response:: { IntoResponse , Response , sse :: Sse } ,
74537454 routing:: { get, post} ,
74547455 } ;
7456+ use futures_util:: stream;
74557457 use reqwest:: Url ;
74567458 use serde_json:: { Value , json} ;
74577459 use std:: {
7460+ convert:: Infallible ,
74587461 net:: { SocketAddr , TcpListener } ,
74597462 path:: PathBuf ,
74607463 sync:: { Arc , Mutex } ,
@@ -8573,6 +8576,111 @@ mod unit_tests {
85738576 assert_eq ! ( batch_rejected_response( ) . status( ) , StatusCode :: BAD_REQUEST ) ;
85748577 }
85758578
8579+ #[ test]
8580+ fn sse_parser_helpers_cover_spec_edge_cases ( ) {
8581+ let mut frame = super :: PendingSseFrame :: default ( ) ;
8582+ parse_sse_line ( & mut frame, ": keepalive" ) ;
8583+ assert ! ( !frame. saw_field) ;
8584+
8585+ parse_sse_line ( & mut frame, "data: hello" ) ;
8586+ parse_sse_line ( & mut frame, "data:world" ) ;
8587+ parse_sse_line ( & mut frame, "id: event-1" ) ;
8588+ parse_sse_line ( & mut frame, "event: message" ) ;
8589+ parse_sse_line ( & mut frame, "retry: 1500" ) ;
8590+ parse_sse_line ( & mut frame, "foo: bar" ) ;
8591+ let finalized = finalize_sse_frame ( & mut frame) . expect ( "frame should finalize" ) ;
8592+ assert_eq ! ( finalized. id. as_deref( ) , Some ( "event-1" ) ) ;
8593+ assert_eq ! ( finalized. event. as_deref( ) , Some ( "message" ) ) ;
8594+ assert_eq ! ( finalized. data, "hello\n world" ) ;
8595+ assert_eq ! ( finalized. retry_ms, Some ( 1500 ) ) ;
8596+
8597+ let mut invalid_retry = super :: PendingSseFrame :: default ( ) ;
8598+ parse_sse_line ( & mut invalid_retry, "retry: nope" ) ;
8599+ parse_sse_line ( & mut invalid_retry, "data: payload" ) ;
8600+ let invalid_retry = finalize_sse_frame ( & mut invalid_retry) . expect ( "invalid retry frame" ) ;
8601+ assert_eq ! ( invalid_retry. retry_ms, None ) ;
8602+ assert_eq ! ( invalid_retry. data, "payload" ) ;
8603+
8604+ let mut no_colon_frame = super :: PendingSseFrame :: default ( ) ;
8605+ parse_sse_line ( & mut no_colon_frame, "data" ) ;
8606+ let no_colon = finalize_sse_frame ( & mut no_colon_frame) . expect ( "empty data field" ) ;
8607+ assert_eq ! ( no_colon. data, "" ) ;
8608+
8609+ let mut comments_only = super :: PendingSseFrame :: default ( ) ;
8610+ parse_sse_line ( & mut comments_only, ": still ignored" ) ;
8611+ assert ! ( finalize_sse_frame( & mut comments_only) . is_none( ) ) ;
8612+
8613+ let mut empty = super :: PendingSseFrame :: default ( ) ;
8614+ assert ! ( finalize_sse_frame( & mut empty) . is_none( ) ) ;
8615+ }
8616+
8617+ #[ tokio:: test]
8618+ async fn build_forwarded_sse_event_and_payload_decoders_cover_edge_cases ( ) {
8619+ let frame = super :: FinalizedSseFrame {
8620+ id : Some ( "event-7" . to_string ( ) ) ,
8621+ event : Some ( "message" . to_string ( ) ) ,
8622+ data : "line one\n line two" . to_string ( ) ,
8623+ retry_ms : Some ( 2500 ) ,
8624+ } ;
8625+ let response = Sse :: new ( stream:: iter ( vec ! [ Ok :: <_, Infallible >(
8626+ build_forwarded_sse_event( & frame) ,
8627+ ) ] ) )
8628+ . into_response ( ) ;
8629+ let encoded = String :: from_utf8 (
8630+ to_bytes ( response. into_body ( ) , usize:: MAX )
8631+ . await
8632+ . expect ( "encoded event body" )
8633+ . to_vec ( ) ,
8634+ )
8635+ . expect ( "utf-8 body" ) ;
8636+ assert ! ( encoded. contains( "id: event-7" ) ) ;
8637+ assert ! ( encoded. contains( "event: message" ) ) ;
8638+ assert ! ( encoded. contains( "retry: 2500" ) ) ;
8639+ assert ! ( encoded. contains( "data: line one" ) ) ;
8640+ assert ! ( encoded. contains( "data: line two" ) ) ;
8641+
8642+ assert_eq ! (
8643+ extract_first_sse_data_payload( "data: first\n \n data: second\n \n " ) ,
8644+ Some ( "first" . to_string( ) )
8645+ ) ;
8646+ assert_eq ! (
8647+ extract_first_sse_data_payload( "data: first\n data: second" ) ,
8648+ Some ( "first\n second" . to_string( ) )
8649+ ) ;
8650+ assert_eq ! ( extract_first_sse_data_payload( "event: message\n \n " ) , None ) ;
8651+ assert_eq ! (
8652+ extract_first_sse_data_payload( "data: padded value\n \n " ) ,
8653+ Some ( "padded value" . to_string( ) )
8654+ ) ;
8655+
8656+ let sse_json = decode_upstream_json_payload_bytes (
8657+ br#"data: {"ok":true}
8658+
8659+ "# ,
8660+ "text/event-stream" ,
8661+ )
8662+ . expect ( "valid SSE JSON" ) ;
8663+ assert_eq ! ( sse_json[ "ok" ] , json!( true ) ) ;
8664+
8665+ let inferred_sse = decode_upstream_json_payload_bytes ( br#"data: {"via":"prefix"}"# , "" )
8666+ . expect ( "body prefix infers SSE" ) ;
8667+ assert_eq ! ( inferred_sse[ "via" ] , json!( "prefix" ) ) ;
8668+
8669+ let plain_json =
8670+ decode_upstream_json_payload_bytes ( br#"{"plain":true}"# , "application/json" )
8671+ . expect ( "plain JSON" ) ;
8672+ assert_eq ! ( plain_json[ "plain" ] , json!( true ) ) ;
8673+
8674+ let invalid_sse =
8675+ decode_upstream_json_payload_bytes ( b"data: not-json\n \n " , "text/event-stream" )
8676+ . expect_err ( "invalid SSE JSON should fail" ) ;
8677+ assert ! ( invalid_sse. contains( "invalid SSE JSON payload" ) ) ;
8678+
8679+ let empty_json = decode_upstream_json_payload_bytes ( b"" , "application/json" )
8680+ . expect_err ( "empty JSON body should fail" ) ;
8681+ assert ! ( empty_json. contains( "invalid JSON payload" ) ) ;
8682+ }
8683+
85768684 #[ test]
85778685 fn client_ip_and_session_auth_reuse_helpers_cover_edge_cases ( ) {
85788686 let mut headers = HeaderMap :: new ( ) ;
0 commit comments