@@ -292,10 +292,7 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
 
     // After receiving messages, the internal metrics should be updated.
     metadatatest.AssertEqualKafkaReceiverPartitionStart(t, tel, []metricdata.DataPoint[int64]{{
-        // 2 because:
-        // - the initial open
-        // - the invalid message causes the consumer to restart, closing the partition
-        Value: 2,
+        Value: 1,
         Attributes: attribute.NewSet(attribute.String("name", set.ID.Name())),
     }}, metricdatatest.IgnoreTimestamp())
 
@@ -312,19 +309,17 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
     err = r.Shutdown(context.Background())
     require.NoError(t, err)
     metadatatest.AssertEqualKafkaReceiverPartitionClose(t, tel, []metricdata.DataPoint[int64]{{
-        // 2 because:
-        // - the invalid message causes the consumer to restart, closing the partition
-        // - it re-acquires the partition, but then shutting down closes the partition again
-        Value: 2,
+        Value: 1,
         Attributes: attribute.NewSet(
             attribute.String("name", set.ID.Name()),
         ),
     }}, metricdatatest.IgnoreTimestamp())
 
     observedErrorLogs := observedLogs.FilterLevelExact(zapcore.ErrorLevel)
     logEntries := observedErrorLogs.All()
-    assert.Len(t, logEntries, 1)
+    assert.Len(t, logEntries, 2)
     assert.Equal(t, "failed to unmarshal message", logEntries[0].Message)
+    assert.Equal(t, "failed to consume message, skipping due to message_marking config", logEntries[1].Message)
 
     metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{{
         Value: 4, // offset of the final message
@@ -345,6 +340,104 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
     }}, metricdatatest.IgnoreTimestamp())
 }
 
+func TestReceiver_MessageMarking(t *testing.T) {
+    t.Parallel()
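+    // Each case maps onto the receiver's message_marking config below:
+    // markAfter sets MessageMarking.After and markErrors sets
+    // MessageMarking.OnError. errorShouldRestart records whether an
+    // unmarshal error is expected to make the consumer restart, since the
+    // failed message is never marked in that configuration.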
+    for name, testcase := range map[string]struct {
+        markAfter  bool
+        markErrors bool
+
+        errorShouldRestart bool
+    }{
+        "mark_before": {
+            markAfter: false,
+        },
+        "mark_after_success": {
+            markAfter:          true,
+            errorShouldRestart: true,
+        },
+        "mark_after_all": {
+            markAfter:  true,
+            markErrors: true,
+        },
+    } {
+        t.Run(name, func(t *testing.T) {
+            t.Parallel()
+            kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans"))
+
+            // Send some invalid data to the otlp_spans topic so unmarshaling
+            // fails, then send some valid data so we can observe whether the
+            // invalid message blocks the consumer for this marking config.
+            traces := testdata.GenerateTraces(1)
+            data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces)
+            require.NoError(t, err)
+            results := kafkaClient.ProduceSync(context.Background(),
+                &kgo.Record{Topic: "otlp_spans", Value: []byte("junk")},
+                &kgo.Record{Topic: "otlp_spans", Value: data},
+            )
+            require.NoError(t, results.FirstErr())
+
+            var calls atomic.Int64
+            consumer := newTracesConsumer(func(_ context.Context, received ptrace.Traces) error {
+                calls.Add(1)
+                return ptracetest.CompareTraces(traces, received)
+            })
+
+            // Configure message marking according to the test case.
+            receiverConfig.MessageMarking.After = testcase.markAfter
+            receiverConfig.MessageMarking.OnError = testcase.markErrors
+            set, tel, observedLogs := mustNewSettings(t)
+            f := NewFactory()
+            r, err := f.CreateTraces(context.Background(), set, receiverConfig, consumer)
+            require.NoError(t, err)
+            require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
+            t.Cleanup(func() {
+                assert.NoError(t, r.Shutdown(context.Background()))
+            })
+
+            if testcase.errorShouldRestart {
+                // Verify that the consumer restarts at least once.
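+                // Each restart closes and re-acquires the partition, so the
+                // partition_start metric grows past its initial value of 1.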
+                assert.Eventually(t, func() bool {
+                    m, err := tel.GetMetric("otelcol_kafka_receiver_partition_start")
+                    require.NoError(t, err)
+
+                    dataPoints := m.Data.(metricdata.Sum[int64]).DataPoints
+                    assert.Len(t, dataPoints, 1)
+                    return dataPoints[0].Value > 5
+                }, time.Second, 100*time.Millisecond, "unmarshal error should restart consumer")
+
+                // The invalid message should block the consumer.
+                assert.Zero(t, calls.Load())
+
+                observedErrorLogs := observedLogs.FilterLevelExact(zapcore.ErrorLevel)
+                logEntries := observedErrorLogs.All()
+                require.NotEmpty(t, logEntries)
+                for _, entry := range logEntries {
+                    assert.Equal(t, "failed to unmarshal message", entry.Message)
+                }
+            } else {
+                assert.Eventually(t, func() bool {
+                    return calls.Load() == 1
+                }, time.Second, 100*time.Millisecond, "unmarshal error should not block consumption")
+
+                // Verify that the consumer did not restart.
+                metadatatest.AssertEqualKafkaReceiverPartitionStart(t, tel, []metricdata.DataPoint[int64]{{
+                    Value: 1,
+                    Attributes: attribute.NewSet(attribute.String("name", set.ID.Name())),
+                }}, metricdatatest.IgnoreTimestamp())
+
+                observedErrorLogs := observedLogs.FilterLevelExact(zapcore.ErrorLevel)
+                logEntries := observedErrorLogs.All()
+                require.Len(t, logEntries, 2)
+                assert.Equal(t, "failed to unmarshal message", logEntries[0].Message)
+                assert.Equal(t,
+                    "failed to consume message, skipping due to message_marking config",
+                    logEntries[1].Message,
+                )
+            }
+        })
+    }
+}
+
 func TestNewLogsReceiver(t *testing.T) {
     t.Parallel()
     kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_logs"))