@@ -32,7 +32,9 @@ use vector_lib::{
3232 config:: { LegacyKey , LogNamespace } ,
3333 configurable:: configurable_component,
3434 event:: { BatchNotifier , EventMetadata } ,
35- internal_event:: { ComponentEventsDropped , CountByteSize , InternalEventHandle as _, Registered , UNINTENTIONAL } ,
35+ internal_event:: {
36+ ComponentEventsDropped , CountByteSize , InternalEventHandle as _, Registered , UNINTENTIONAL ,
37+ } ,
3638 lookup:: {
3739 self , OwnedValuePath , event_path, lookup_v2:: OptionalValuePath , metadata_path,
3840 owned_value_path,
@@ -1215,13 +1217,14 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
12151217 decoder : Decoder ,
12161218 ) -> Result < ( Vec < Event > , bool ) , Rejection > {
12171219 self . envelopes_processed += 1 ;
1218- let payload = match serde_json:: to_vec ( & json[ "event" ] ) {
1220+ let event = self . validate_event_field ( & json) ?;
1221+ let payload = match serde_json:: to_vec ( event) {
12191222 Ok ( bytes) => bytes,
12201223 Err ( error) => {
12211224 let error: vector_lib:: Error = Box :: new ( error) ;
1222- emit ! ( vector_lib :: codecs :: internal_events :: DecoderDeserializeError {
1223- error: & error
1224- } ) ;
1225+ emit ! (
1226+ vector_lib :: codecs :: internal_events :: DecoderDeserializeError { error: & error }
1227+ ) ;
12251228 emit ! ( ComponentEventsDropped :: <UNINTENTIONAL > {
12261229 count: 1 ,
12271230 reason: "Failed to serialize event field to bytes." ,
@@ -1342,92 +1345,89 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
13421345 Ok ( ( out, had_decode_errors) )
13431346 }
13441347
1348+ /// Validate the `event` field of a HEC envelope, returning a reference to the
1349+ /// validated value or an error if it is missing, null, or (for string values)
1350+ /// empty. Shared between the decoder path and the legacy/vector construction
1351+ /// paths so they all enforce the same HEC protocol contract.
1352+ fn validate_event_field < ' a > ( & self , json : & ' a JsonValue ) -> Result < & ' a JsonValue , Rejection > {
1353+ let event_idx = self . envelopes_processed . saturating_sub ( 1 ) ;
1354+ match json. get ( "event" ) {
1355+ None | Some ( JsonValue :: Null ) => {
1356+ Err ( ApiError :: MissingEventField { event : event_idx } . into ( ) )
1357+ }
1358+ Some ( JsonValue :: String ( s) ) if s. is_empty ( ) => {
1359+ Err ( ApiError :: EmptyEventField { event : event_idx } . into ( ) )
1360+ }
1361+ Some ( event) => Ok ( event) ,
1362+ }
1363+ }
1364+
13451365 /// Build the log event for the vector namespace.
13461366 /// In this namespace the log event is created entirely from the event field.
13471367 /// No renaming of the `line` field is done.
13481368 fn build_log_vector ( & mut self , json : & mut JsonValue ) -> Result < LogEvent , Rejection > {
1349- match json. get ( "event" ) {
1350- Some ( event) => {
1351- let event: Value = event. into ( ) ;
1352- let mut log = LogEvent :: from ( event) ;
1369+ let event: Value = self . validate_event_field ( json) ?. into ( ) ;
1370+ let mut log = LogEvent :: from ( event) ;
13531371
1354- // EstimatedJsonSizeOf must be calculated before enrichment
1355- self . events_received
1356- . emit ( CountByteSize ( 1 , log. estimated_json_encoded_size_of ( ) ) ) ;
1372+ // EstimatedJsonSizeOf must be calculated before enrichment
1373+ self . events_received
1374+ . emit ( CountByteSize ( 1 , log. estimated_json_encoded_size_of ( ) ) ) ;
13571375
1358- // The timestamp is extracted from the message for the Legacy namespace.
1359- self . log_namespace . insert_vector_metadata (
1360- & mut log,
1361- log_schema ( ) . timestamp_key ( ) ,
1362- lookup:: path!( "ingest_timestamp" ) ,
1363- chrono:: Utc :: now ( ) ,
1364- ) ;
1376+ // The timestamp is extracted from the message for the Legacy namespace.
1377+ self . log_namespace . insert_vector_metadata (
1378+ & mut log,
1379+ log_schema ( ) . timestamp_key ( ) ,
1380+ lookup:: path!( "ingest_timestamp" ) ,
1381+ chrono:: Utc :: now ( ) ,
1382+ ) ;
13651383
1366- Ok ( log)
1367- }
1368- None => Err ( ApiError :: MissingEventField {
1369- event : self . envelopes_processed . saturating_sub ( 1 ) ,
1370- }
1371- . into ( ) ) ,
1372- }
1384+ Ok ( log)
13731385 }
13741386
13751387 /// Build the log event for the legacy namespace.
13761388 /// If the event is a string, or the event contains a field called `line` that is a string
13771389 /// (the docker splunk logger places the message in the event.line field) that string
13781390 /// is placed in the message field.
13791391 fn build_log_legacy ( & mut self , json : & mut JsonValue ) -> Result < LogEvent , Rejection > {
1392+ // validate_event_field checks for missing/null/empty-string
1393+ self . validate_event_field ( json) ?;
13801394 let mut log = LogEvent :: default ( ) ;
1381- match json. get_mut ( "event" ) {
1382- Some ( event ) => match event . take ( ) {
1383- JsonValue :: String ( string ) => {
1384- if string . is_empty ( ) {
1385- return Err ( ApiError :: EmptyEventField {
1386- event : self . envelopes_processed . saturating_sub ( 1 ) ,
1387- }
1388- . into ( ) ) ;
1395+ match json[ "event" ] . take ( ) {
1396+ JsonValue :: String ( string ) => {
1397+ log . maybe_insert ( log_schema ( ) . message_key_target_path ( ) , string ) ;
1398+ }
1399+ JsonValue :: Object ( mut object ) => {
1400+ if object . is_empty ( ) {
1401+ return Err ( ApiError :: EmptyEventField {
1402+ event : self . envelopes_processed . saturating_sub ( 1 ) ,
13891403 }
1390- log . maybe_insert ( log_schema ( ) . message_key_target_path ( ) , string ) ;
1404+ . into ( ) ) ;
13911405 }
1392- JsonValue :: Object ( mut object) => {
1393- if object. is_empty ( ) {
1394- return Err ( ApiError :: EmptyEventField {
1395- event : self . envelopes_processed . saturating_sub ( 1 ) ,
1396- }
1397- . into ( ) ) ;
1398- }
13991406
1400- // Add 'line' value as 'event::schema().message_key'
1401- if let Some ( line) = object. remove ( "line" ) {
1402- match line {
1403- // This don't quite fit the meaning of a event::schema().message_key
1404- JsonValue :: Array ( _) | JsonValue :: Object ( _) => {
1405- log. insert ( event_path ! ( "line" ) , line) ;
1406- }
1407- _ => {
1408- log. maybe_insert ( log_schema ( ) . message_key_target_path ( ) , line) ;
1409- }
1407+ // Add 'line' value as 'event::schema().message_key'
1408+ if let Some ( line) = object. remove ( "line" ) {
1409+ match line {
1410+ // This don't quite fit the meaning of a event::schema().message_key
1411+ JsonValue :: Array ( _) | JsonValue :: Object ( _) => {
1412+ log. insert ( event_path ! ( "line" ) , line) ;
1413+ }
1414+ _ => {
1415+ log. maybe_insert ( log_schema ( ) . message_key_target_path ( ) , line) ;
14101416 }
1411- }
1412-
1413- for ( key, value) in object {
1414- log. insert ( event_path ! ( key. as_str( ) ) , value) ;
14151417 }
14161418 }
1417- _ => {
1418- return Err ( ApiError :: InvalidDataFormat {
1419- event : self . envelopes_processed . saturating_sub ( 1 ) ,
1420- }
1421- . into ( ) ) ;
1419+
1420+ for ( key, value) in object {
1421+ log. insert ( event_path ! ( key. as_str( ) ) , value) ;
14221422 }
1423- } ,
1424- None => {
1425- return Err ( ApiError :: MissingEventField {
1423+ }
1424+ _ => {
1425+ return Err ( ApiError :: InvalidDataFormat {
14261426 event : self . envelopes_processed . saturating_sub ( 1 ) ,
14271427 }
14281428 . into ( ) ) ;
14291429 }
1430- } ;
1430+ }
14311431
14321432 // EstimatedJsonSizeOf must be calculated before enrichment
14331433 self . events_received
0 commit comments