@@ -13,10 +13,9 @@ import (
1313 "github.com/segmentio/kafka-go"
1414)
1515
16- // RunEventProcessor starts the Kafka consumer for release events.
17- // Kafka messages contain the SBOM CID (and optional metadata)
18- func RunEventProcessor (ctx context.Context , db database.DBConnection ) {
19- // Parse Kafka brokers from environment variable
16+ // RunEventProcessor attempts to connect to Kafka 3 times.
17+ // If successful, it starts the consumer loop. If not, it returns an error.
18+ func RunEventProcessor (ctx context.Context , db database.DBConnection ) error {
2019 brokersEnv := os .Getenv ("KAFKA_BROKERS" )
2120 var brokers []string
2221 if brokersEnv != "" {
@@ -25,50 +24,58 @@ func RunEventProcessor(ctx context.Context, db database.DBConnection) {
2524 brokers = []string {"localhost:9092" }
2625 }
2726
28- // Create Kafka reader
27+ topic := "release-events"
28+ var conn * kafka.Conn
29+ var err error
30+
31+ // Retry logic: 3 tries
32+ for i := 1 ; i <= 3 ; i ++ {
33+ log .Printf ("Kafka connection attempt %d/3..." , i )
34+ conn , err = kafka .DialContext (ctx , "tcp" , brokers [0 ])
35+ if err == nil {
36+ conn .Close ()
37+ break
38+ }
39+ if i < 3 {
40+ time .Sleep (2 * time .Second )
41+ }
42+ }
43+
44+ if err != nil {
45+ return err // Give up after 3 tries
46+ }
47+
48+ // If connection is successful, start the reader
2949 reader := kafka .NewReader (kafka.ReaderConfig {
3050 Brokers : brokers ,
3151 GroupID : "pdvd-backend-worker" ,
32- Topic : "release-events" ,
33- MaxBytes : 10e6 , // 10MB per message
52+ Topic : topic ,
53+ MaxBytes : 10e6 ,
3454 })
35- defer func () {
36- if err := reader .Close (); err != nil {
37- log .Printf ("Error closing Kafka reader: %v" , err )
38- }
39- }()
40-
41- // Initialize ReleaseService
42- service := & services.ReleaseServiceWrapper {DB : db }
4355
44- // Initialize SBOM fetcher
45- fetcher := & services.CIDFetcher {} // implements release.SBOMFetcher
56+ go func () {
57+ defer reader .Close ()
58+ service := & services.ReleaseServiceWrapper {DB : db }
59+ fetcher := & services.CIDFetcher {}
4660
47- log .Println ("Kafka Event Processor started. Listening for release events..." )
61+ log .Println ("Kafka Event Processor started. Listening for release events..." )
4862
49- for {
50- select {
51- case <- ctx .Done ():
52- log . Println ( "Kafka Event Processor shutting down..." )
53- return
54- default :
55- // Read message
56- msg , err := reader . ReadMessage ( ctx )
57- if err != nil {
58- if ctx . Err () != nil {
59- return
63+ for {
64+ select {
65+ case <- ctx .Done ():
66+ return
67+ default :
68+ msg , err := reader . ReadMessage ( ctx )
69+ if err != nil {
70+ if ctx . Err () != nil {
71+ return
72+ }
73+ continue
6074 }
61- log .Printf ("Kafka read error: %v. Retrying in 1s..." , err )
62- time .Sleep (time .Second )
63- continue
64- }
65-
66- // Pass the raw message to the release handler along with fetcher and service
67- if err := release .HandleReleaseSBOMCreatedWithService (ctx , msg .Value , fetcher , service ); err != nil {
68- log .Printf ("Handler error for message key=%s: %v" , string (msg .Key ), err )
69- } else {
70- log .Printf ("Successfully processed message key=%s offset=%d" , string (msg .Key ), msg .Offset )
75+ _ = release .HandleReleaseSBOMCreatedWithService (ctx , msg .Value , fetcher , service )
7176 }
7277 }
73- }
78+ }()
79+
80+ return nil
7481}
0 commit comments