Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ GROUPSIO_WEBHOOK_SECRET=
GROUPSIO_TIMEOUT=30s
GROUPSIO_MAX_RETRIES=3
GROUPSIO_RETRY_DELAY=1s

# Eventing Configuration
EVENTING_ENABLED=true
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,20 @@ lfx-v2-mailing-list-service/
│ ├── design/ # Goa API design files
│ │ ├── mailing_list.go # Service and endpoint definitions
│ │ └── type.go # Type definitions and data structures
│ ├── eventing/ # v1→v2 data stream event processing
│ │ ├── event_processor.go # JetStream consumer lifecycle
│ │ └── handler.go # Key-prefix router (delegates to internal/service)
│ ├── service/ # GOA service implementations
│ ├── data_stream.go # Data stream startup wiring and env config
│ ├── main.go # Application entry point
│ └── http.go # HTTP server setup
├── charts/ # Helm chart for Kubernetes deployment
│ └── lfx-v2-mailing-list-service/
│ ├── templates/ # Kubernetes resource templates
│ ├── values.yaml # Production configuration
│ └── values.local.yaml # Local development configuration
├── docs/ # Additional documentation
│ └── event-processing.md # v1→v2 data stream event processing
├── gen/ # Generated code (DO NOT EDIT)
│ ├── http/ # HTTP transport layer
│ │ ├── openapi.yaml # OpenAPI 2.0 specification
Expand All @@ -129,12 +135,17 @@ lfx-v2-mailing-list-service/
│ ├── domain/ # Business domain layer
│ │ ├── model/ # Domain models and conversions
│ │ └── port/ # Repository and service interfaces
│ │ └── mapping_store.go # MappingReader / MappingWriter / MappingReaderWriter
│ ├── service/ # Service layer implementation
│ │ └── grpsio_service_reader.go # GroupsIO service reader
│ │ ├── grpsio_*.go # GroupsIO CRUD orchestrators
│ │ ├── datastream_service_handler.go # v1-sync service transform + publish
│ │ ├── datastream_subgroup_handler.go # v1-sync mailing list transform + publish
│ │ └── datastream_member_handler.go # v1-sync member transform + publish
│ ├── infrastructure/ # Infrastructure layer
│ │ ├── auth/ # JWT authentication
│ │ ├── groupsio/ # GroupsIO API client implementation
│ │ ├── nats/ # NATS messaging and storage
│ │ │ ├── mapping_store.go # MappingReaderWriter backed by JetStream KV
│ │ │ ├── messaging_publish.go # Message publishing
│ │ │ ├── messaging_request.go # Request/reply messaging
│ │ │ └── storage.go # KV store repositories
Expand All @@ -158,6 +169,12 @@ lfx-v2-mailing-list-service/
└── go.mod # Go module definition
```

## 📚 Additional Documentation

| Document | Description |
|---|---|
| [docs/event-processing.md](docs/event-processing.md) | v1→v2 data stream: how DynamoDB change events are consumed, transformed, and published to the indexer and FGA-sync services |

## 🛠️ Development

### Prerequisites
Expand Down
3 changes: 3 additions & 0 deletions charts/lfx-v2-mailing-list-service/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ app:
name: lfx-v2-mailing-list-service
key: GROUPSIO_WEBHOOK_SECRET

EVENTING_ENABLED:
value: "true"

# External Secrets Operator
externalSecretsOperator:
# Enable/disable External Secrets Operator integration
Expand Down
108 changes: 108 additions & 0 deletions cmd/mailing-list-api/data_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package main

import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"sync"
"time"

"github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/eventing"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/service"
infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants"
)

// handleDataStream starts the durable JetStream consumer that processes DynamoDB KV
// change events for GroupsIO entities (service, subgroup, member).
//
// Enabled only when EVENTING_ENABLED=true. If disabled, the function
// is a no-op and returns nil.
func handleDataStream(ctx context.Context, wg *sync.WaitGroup) error {
if !dataStreamEnabled() {
slog.InfoContext(ctx, "data stream processor disabled (EVENTING_ENABLED not set to true)")
return nil
}

natsClient := service.GetNATSClient(ctx)

handler := eventing.NewEventHandler(service.MessagePublisher(ctx), service.MappingReaderWriter(ctx))
streamConsumer := infraNATS.NewDataStreamConsumer(handler)

cfg := dataStreamConfig()
processor, err := eventing.NewEventProcessor(ctx, cfg, natsClient)
if err != nil {
return fmt.Errorf("failed to create data stream processor: %w", err)
}

slog.InfoContext(ctx, "data stream processor created",
"consumer_name", cfg.ConsumerName,
"stream_name", cfg.StreamName,
)

wg.Add(1)
go func() {
defer wg.Done()
if err := processor.Start(ctx, streamConsumer); err != nil {
slog.ErrorContext(ctx, "data stream processor exited with error", "error", err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
stopCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownSeconds*time.Second)
defer cancel()
if err := processor.Stop(stopCtx); err != nil {
slog.ErrorContext(stopCtx, "error stopping data stream processor", "error", err)
}
}()

return nil
}

// dataStreamEnabled reports whether the data stream processor has been opted into.
func dataStreamEnabled() bool {
return os.Getenv("EVENTING_ENABLED") == "true"
}

// dataStreamConfig builds eventing.Config from environment variables with
// sensible defaults.
func dataStreamConfig() eventing.Config {
consumerName := os.Getenv("EVENTING_CONSUMER_NAME")
if consumerName == "" {
consumerName = "mailing-list-service-kv-consumer"
}

maxDeliver := envInt("EVENTING_MAX_DELIVER", 3)
maxAckPending := envInt("EVENTING_MAX_ACK_PENDING", 1000)
ackWaitSecs := envInt("EVENTING_ACK_WAIT_SECS", 30)

return eventing.Config{
ConsumerName: consumerName,
StreamName: "KV_" + constants.KVBucketV1Objects,
MaxDeliver: maxDeliver,
AckWait: time.Duration(ackWaitSecs) * time.Second,
MaxAckPending: maxAckPending,
}
}

// envInt reads an integer environment variable, returning defaultVal if the
// variable is absent or cannot be parsed.
func envInt(key string, defaultVal int) int {
s := os.Getenv(key)
if s == "" {
return defaultVal
}
n, err := strconv.Atoi(s)
if err != nil {
return defaultVal
}
return n
}
147 changes: 147 additions & 0 deletions cmd/mailing-list-api/eventing/event_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package eventing

import (
"context"
"fmt"
"log/slog"
"strings"
"time"

"github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port"
infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats"
"github.com/nats-io/nats.go/jetstream"
)

// Config holds the configuration for an EventProcessor.
type Config struct {
// ConsumerName is the durable consumer name (survives restarts).
ConsumerName string
// StreamName is the JetStream stream to consume from (e.g. "KV_v1-objects").
StreamName string
// MaxDeliver is the maximum number of delivery attempts before giving up.
MaxDeliver int
// AckWait is how long the server waits for an ACK before redelivering.
AckWait time.Duration
// MaxAckPending is the maximum number of unacknowledged messages in flight.
MaxAckPending int
}

// EventProcessor is the interface for JetStream KV bucket event consumers.
// Start blocks until ctx is cancelled; Stop performs a graceful shutdown.
type EventProcessor interface {
Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error
Stop(ctx context.Context) error
}

// natsEventProcessor is the NATS JetStream implementation of EventProcessor.
type natsEventProcessor struct {
natsClient *infraNATS.NATSClient
consumer jetstream.Consumer
consumeCtx jetstream.ConsumeContext
config Config
}

// NewEventProcessor creates an EventProcessor backed by the given NATSClient.
func NewEventProcessor(_ context.Context, cfg Config, natsClient *infraNATS.NATSClient) (EventProcessor, error) {
return &natsEventProcessor{
natsClient: natsClient,
config: cfg,
}, nil
}

// Start creates (or resumes) the durable JetStream consumer and processes messages
// until ctx is cancelled.
func (ep *natsEventProcessor) Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error {
slog.InfoContext(ctx, "starting data stream processor", "consumer_name", ep.config.ConsumerName)

consumer, err := ep.natsClient.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{
Name: ep.config.ConsumerName,
Durable: ep.config.ConsumerName,
// DeliverLastPerSubjectPolicy resumes from the last seen record per KV key after a
// restart, avoiding a full replay while ensuring no in-flight event is dropped.
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{
"$KV.v1-objects.itx-groupsio-v2-service.>",
"$KV.v1-objects.itx-groupsio-v2-subgroup.>",
"$KV.v1-objects.itx-groupsio-v2-member.>",
},
MaxDeliver: ep.config.MaxDeliver,
AckWait: ep.config.AckWait,
MaxAckPending: ep.config.MaxAckPending,
Description: "Durable KV watcher for mailing-list-service GroupsIO entities",
})
if err != nil {
return fmt.Errorf("failed to create or update consumer: %w", err)
}
ep.consumer = consumer

consumeCtx, err := consumer.Consume(
func(jMsg jetstream.Msg) {
meta, err := jMsg.Metadata()
if err != nil {
slog.ErrorContext(ctx, "failed to read stream message metadata, ACKing to avoid poison pill",
"subject", jMsg.Subject(), "error", err)
_ = jMsg.Ack()
return
}
streamConsumer.Process(ctx, model.StreamMessage{
Key: kvKey(jMsg.Subject()),
Data: jMsg.Data(),
IsRemoval: isKVRemoval(jMsg),
DeliveryCount: meta.NumDelivered,
Ack: jMsg.Ack,
Nak: jMsg.NakWithDelay,
})
},
jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
slog.With("error", err).Error("data stream KV consumer error")
}),
)
if err != nil {
return fmt.Errorf("failed to start consuming messages: %w", err)
}
ep.consumeCtx = consumeCtx

slog.InfoContext(ctx, "data stream processor started successfully")
<-ctx.Done()
slog.InfoContext(ctx, "data stream processor context cancelled")
return nil
}

// Stop halts the JetStream consumer. The NATS connection lifecycle is managed
// by the caller (NATSClient).
func (ep *natsEventProcessor) Stop(ctx context.Context) error {
slog.InfoContext(ctx, "stopping data stream processor")

if ep.consumeCtx != nil {
ep.consumeCtx.Stop()
slog.InfoContext(ctx, "data stream consumer stopped")
}

slog.InfoContext(ctx, "data stream processor stopped")
return nil
}

// kvKey strips the "$KV.<bucket>." prefix from a JetStream KV subject,
// returning the bare key. Subject format: $KV.<bucket>.<key>
func kvKey(subject string) string {
idx := strings.Index(subject, ".")
if idx == -1 {
return subject
}
idx2 := strings.Index(subject[idx+1:], ".")
if idx2 == -1 {
return subject
}
return subject[idx+idx2+2:]
}

func isKVRemoval(msg jetstream.Msg) bool {
op := msg.Headers().Get("Kv-Operation")
return op == "DEL" || op == "PURGE"
}
Loading
Loading