@@ -12,7 +12,8 @@ import (
1212func TestFlushTotalEventStat (t * testing.T ) {
1313 // Arrange
1414 eventCh := make (chan int32 , 10 )
15- mockKP := & mockKafkaProducer {}
15+ producedCh := make (chan * pb.TotalEventCountMessage , 10 ) // <- initialize channel
16+ mockKP := & mockKafkaProducer {producedCh : producedCh }
1617 flushInterval := 1 * time .Millisecond
1718 topicName := "test-topic"
1819
@@ -27,26 +28,24 @@ func TestFlushTotalEventStat(t *testing.T) {
2728 time .Sleep (2 * flushInterval )
2829
2930 // Assert
30- if len (mockKP .produced ) == 0 {
31+ select {
32+ case msg := <- producedCh :
33+ if msg .EventCount != 5 {
34+ t .Errorf ("expected EventCount=5, got %d" , msg .EventCount )
35+ }
36+ if msg .EventTimestamp .AsTime ().IsZero () {
37+ t .Error ("expected non-zero EventTimestamp" )
38+ }
39+ default :
3140 t .Error ("expected ProduceEventStat to be called, got 0" )
3241 }
33-
34- result := mockKP .produced [0 ].EventCount
35- expected := int32 (5 )
36- if result != expected {
37- t .Errorf ("expected EventCount=%d, got %d" , expected , result )
38- }
39-
40- // Also check timestamp is set
41- if mockKP .produced [0 ].EventTimestamp .AsTime ().IsZero () {
42- t .Error ("expected non-zero EventTimestamp" )
43- }
4442}
4543
4644// Optional: add a timeout test to ensure aggregation resets
4745func TestFlushResetsAfterPublish (t * testing.T ) {
4846 eventCh := make (chan int32 , 10 )
49- mockKP := & mockKafkaProducer {}
47+ producedCh := make (chan * pb.TotalEventCountMessage , 10 )
48+ mockKP := & mockKafkaProducer {producedCh : producedCh }
5049 flushInterval := 1 * time .Millisecond
5150 ts := stats .CreateTotalEventStat (mockKP , flushInterval , "topic" , eventCh )
5251
@@ -60,28 +59,37 @@ func TestFlushResetsAfterPublish(t *testing.T) {
6059 eventCh <- 4
6160 time .Sleep (2 * flushInterval )
6261
63- if len (mockKP .produced ) < 2 {
64- t .Fatalf ("expected at least 2 ProduceEventStat calls, got %d" , len (mockKP .produced ))
62+ // Assert first batch
63+ select {
64+ case msg := <- producedCh :
65+ if msg .EventCount != 1 {
66+ t .Errorf ("expected first flush=1, got %d" , msg .EventCount )
67+ }
68+ default :
69+ t .Fatal ("expected first ProduceEventStat call" )
6570 }
6671
67- if mockKP .produced [0 ].EventCount != 1 {
68- t .Errorf ("expected first flush=1, got %d" , mockKP .produced [0 ].EventCount )
69- }
70- if mockKP .produced [1 ].EventCount != 4 {
71- t .Errorf ("expected second flush=4, got %d" , mockKP .produced [1 ].EventCount )
72+ // Assert second batch
73+ select {
74+ case msg := <- producedCh :
75+ if msg .EventCount != 4 {
76+ t .Errorf ("expected second flush=4, got %d" , msg .EventCount )
77+ }
78+ default :
79+ t .Fatal ("expected second ProduceEventStat call" )
7280 }
7381}
7482
7583// mockKafkaProducer implements publisher.KafkaProducer
7684type mockKafkaProducer struct {
77- produced [] * pb.TotalEventCountMessage
85+ producedCh chan * pb.TotalEventCountMessage
7886}
7987
8088func (m * mockKafkaProducer ) ProduceBulk (_ []* pb.Event , _ string , _ chan kafka.Event ) error {
8189 return nil
8290}
8391
8492func (m * mockKafkaProducer ) ProduceEventStat (_ string , event * pb.TotalEventCountMessage ) error {
85- m .produced = append ( m . produced , event )
93+ m .producedCh <- event
8694 return nil
8795}
0 commit comments