Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ See [ITX Proxy Implementation Architecture](docs/itx-proxy-implementation.md) fo
- **JWT Authentication**: Secure authentication via Heimdall JWT tokens
- **OAuth2 M2M**: Machine-to-machine authentication with ITX using Auth0
- **ID Mapping**: Automatic v1/v2 ID translation via NATS
- **Event Processing**: Real-time sync of v1 survey data to v2 indexer and FGA (see [Event Processing](docs/event-processing.md))
- **OpenFGA Authorization**: Fine-grained access control
- **OpenAPI Spec**: Auto-generated from Goa design
- **Kubernetes Ready**: Includes Helm charts with health checks and probes
Expand Down Expand Up @@ -179,6 +180,15 @@ The service is configured via environment variables:
- `NATS_URL` - NATS server URL for ID mapping
- `ID_MAPPING_DISABLED` - Disable ID mapping for local dev (default: false)

### Event Processing

- `EVENT_PROCESSING_ENABLED` - Enable/disable event processing (default: true)
- `EVENT_CONSUMER_NAME` - JetStream consumer name (default: survey-service-kv-consumer)
- `EVENT_STREAM_NAME` - JetStream stream name (default: KV_v1-objects)
- `EVENT_FILTER_SUBJECT` - NATS subject filter (default: $KV.v1-objects.>)

See [Event Processing Documentation](docs/event-processing.md) for details.

## Docker

### Build Image
Expand Down Expand Up @@ -238,11 +248,13 @@ kubectl logs -n lfx -l app=lfx-v2-survey-service
│ └── survey/v1/design/ # API design (DSL)
├── cmd/ # Application entry points
│ └── survey-api/ # Main service binary
│ └── eventing/ # Event processing handlers
├── gen/ # Generated code (from Goa)
├── internal/ # Private application code
│ ├── domain/ # Domain interfaces and types
│ ├── infrastructure/ # Infrastructure implementations
│ │ ├── auth/ # JWT authentication
│ │ ├── eventing/ # Event processing infrastructure
│ │ ├── idmapper/ # ID mapping (NATS)
│ │ └── proxy/ # ITX proxy client
│ ├── logging/ # Structured logging
Expand All @@ -253,6 +265,7 @@ kubectl logs -n lfx -l app=lfx-v2-survey-service
│ └── models/itx/ # ITX API models
├── docs/ # Documentation
│ ├── api-contracts/ # API contract documentation
│ ├── event-processing.md # Event processing guide
│ └── itx-proxy-implementation.md # Architecture guide
├── charts/ # Helm charts
│ └── lfx-v2-survey-service/
Expand Down
156 changes: 156 additions & 0 deletions cmd/survey-api/eventing/event_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package eventing

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/linuxfoundation/lfx-v2-survey-service/internal/domain"
"github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/eventing"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

const (
V1MappingsBucket = "v1-mappings"
)

// EventProcessor handles NATS KV bucket event processing
type EventProcessor struct {
natsConn *nats.Conn
jsInstance jetstream.JetStream
consumer jetstream.Consumer
consumeCtx jetstream.ConsumeContext
publisher domain.EventPublisher
idMapper domain.IDMapper
mappingsKV jetstream.KeyValue
logger *slog.Logger
config eventing.Config
}

// NewEventProcessor creates a new event processor
func NewEventProcessor(
cfg eventing.Config,
idMapper domain.IDMapper,
logger *slog.Logger,
) (*EventProcessor, error) {
// Connect to NATS
conn, err := nats.Connect(cfg.NATSURL,
nats.DrainTimeout(30*time.Second),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
if sub != nil {
logger.With("error", err, "subject", sub.Subject).Error("NATS async error encountered")
} else {
logger.With("error", err).Error("NATS async error encountered")
}
}),
nats.ClosedHandler(func(nc *nats.Conn) {
logger.Warn("NATS connection closed")
}),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}

// Create JetStream context
jsContext, err := jetstream.New(conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
}

// Initialize publisher
publisher := eventing.NewNATSPublisher(conn, logger)

// Access the V1 mappings KV bucket
mappingsKV, err := jsContext.KeyValue(context.Background(), V1MappingsBucket)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to access %s KV bucket: %w", V1MappingsBucket, err)
}

return &EventProcessor{
natsConn: conn,
jsInstance: jsContext,
publisher: publisher,
idMapper: idMapper,
mappingsKV: mappingsKV,
logger: logger,
config: cfg,
}, nil
}

// Start starts the event processor
func (ep *EventProcessor) Start(ctx context.Context) error {
ep.logger.Info("Starting event processor", "consumer_name", ep.config.ConsumerName)

// Create or update consumer
consumer, err := ep.jsInstance.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{
Name: ep.config.ConsumerName,
Durable: ep.config.ConsumerName,
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: ep.config.FilterSubject,
MaxDeliver: ep.config.MaxDeliver,
AckWait: ep.config.AckWait,
MaxAckPending: ep.config.MaxAckPending,
Description: "Durable/shared KV bucket watcher for survey service",
})
if err != nil {
return fmt.Errorf("failed to create or update consumer: %w", err)
}
ep.consumer = consumer

// Start consuming messages
consumeCtx, err := consumer.Consume(func(msg jetstream.Msg) {
kvMessageHandler(ctx, msg, ep.publisher, ep.idMapper, ep.mappingsKV, ep.logger)
}, jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
ep.logger.With("error", err).Error("KV consumer error encountered")
}))
if err != nil {
return fmt.Errorf("failed to start consuming messages: %w", err)
}
ep.consumeCtx = consumeCtx

ep.logger.Info("Event processor started successfully")

// Block until context is cancelled
<-ctx.Done()

ep.logger.Info("Event processor context cancelled")
return nil
}

// Stop stops the event processor gracefully
func (ep *EventProcessor) Stop() error {
ep.logger.Info("Stopping event processor...")

// Stop the consumer
if ep.consumeCtx != nil {
ep.consumeCtx.Stop()
ep.logger.Info("Consumer stopped")
}

// Drain and close the NATS connection
if ep.natsConn != nil {
if err := ep.natsConn.Drain(); err != nil {
ep.logger.With("error", err).Error("Error draining NATS connection")
}
ep.natsConn.Close()
ep.logger.Info("NATS connection closed")
}

// Close the publisher
if ep.publisher != nil {
if err := ep.publisher.Close(); err != nil {
ep.logger.With("error", err).Error("Error closing publisher")
}
}

ep.logger.Info("Event processor stopped successfully")
return nil
}
Loading