-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_processor.go
More file actions
156 lines (135 loc) · 4.32 KB
/
event_processor.go
File metadata and controls
156 lines (135 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT
package eventing
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/linuxfoundation/lfx-v2-survey-service/internal/domain"
"github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/eventing"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
V1MappingsBucket = "v1-mappings"
)
// EventProcessor handles NATS KV bucket event processing
type EventProcessor struct {
natsConn *nats.Conn
jsInstance jetstream.JetStream
consumer jetstream.Consumer
consumeCtx jetstream.ConsumeContext
publisher domain.EventPublisher
idMapper domain.IDMapper
mappingsKV jetstream.KeyValue
logger *slog.Logger
config eventing.Config
}
// NewEventProcessor creates a new event processor
func NewEventProcessor(
cfg eventing.Config,
idMapper domain.IDMapper,
logger *slog.Logger,
) (*EventProcessor, error) {
// Connect to NATS
conn, err := nats.Connect(cfg.NATSURL,
nats.DrainTimeout(30*time.Second),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
if sub != nil {
logger.With("error", err, "subject", sub.Subject).Error("NATS async error encountered")
} else {
logger.With("error", err).Error("NATS async error encountered")
}
}),
nats.ClosedHandler(func(nc *nats.Conn) {
logger.Warn("NATS connection closed")
}),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
// Create JetStream context
jsContext, err := jetstream.New(conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
}
// Initialize publisher
publisher := eventing.NewNATSPublisher(conn, logger)
// Access the V1 mappings KV bucket
mappingsKV, err := jsContext.KeyValue(context.Background(), V1MappingsBucket)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to access %s KV bucket: %w", V1MappingsBucket, err)
}
return &EventProcessor{
natsConn: conn,
jsInstance: jsContext,
publisher: publisher,
idMapper: idMapper,
mappingsKV: mappingsKV,
logger: logger,
config: cfg,
}, nil
}
// Start starts the event processor
func (ep *EventProcessor) Start(ctx context.Context) error {
ep.logger.Info("Starting event processor", "consumer_name", ep.config.ConsumerName)
// Create or update consumer
consumer, err := ep.jsInstance.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{
Name: ep.config.ConsumerName,
Durable: ep.config.ConsumerName,
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: ep.config.FilterSubject,
MaxDeliver: ep.config.MaxDeliver,
AckWait: ep.config.AckWait,
MaxAckPending: ep.config.MaxAckPending,
Description: "Durable/shared KV bucket watcher for survey service",
})
if err != nil {
return fmt.Errorf("failed to create or update consumer: %w", err)
}
ep.consumer = consumer
// Start consuming messages
consumeCtx, err := consumer.Consume(func(msg jetstream.Msg) {
kvMessageHandler(ctx, msg, ep.publisher, ep.idMapper, ep.mappingsKV, ep.logger)
}, jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
ep.logger.With("error", err).Error("KV consumer error encountered")
}))
if err != nil {
return fmt.Errorf("failed to start consuming messages: %w", err)
}
ep.consumeCtx = consumeCtx
ep.logger.Info("Event processor started successfully")
// Block until context is cancelled
<-ctx.Done()
ep.logger.Info("Event processor context cancelled")
return nil
}
// Stop stops the event processor gracefully
func (ep *EventProcessor) Stop() error {
ep.logger.Info("Stopping event processor...")
// Stop the consumer
if ep.consumeCtx != nil {
ep.consumeCtx.Stop()
ep.logger.Info("Consumer stopped")
}
// Drain and close the NATS connection
if ep.natsConn != nil {
if err := ep.natsConn.Drain(); err != nil {
ep.logger.With("error", err).Error("Error draining NATS connection")
}
ep.natsConn.Close()
ep.logger.Info("NATS connection closed")
}
// Close the publisher
if ep.publisher != nil {
if err := ep.publisher.Close(); err != nil {
ep.logger.With("error", err).Error("Error closing publisher")
}
}
ep.logger.Info("Event processor stopped successfully")
return nil
}