Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ GROUPSIO_WEBHOOK_SECRET=
GROUPSIO_TIMEOUT=30s
GROUPSIO_MAX_RETRIES=3
GROUPSIO_RETRY_DELAY=1s

# Eventing Configuration
EVENTING_ENABLED=true
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,20 @@ lfx-v2-mailing-list-service/
│ ├── design/ # Goa API design files
│ │ ├── mailing_list.go # Service and endpoint definitions
│ │ └── type.go # Type definitions and data structures
│ ├── eventing/ # v1→v2 data stream event processing
│ │ ├── event_processor.go # JetStream consumer lifecycle
│ │ └── handler.go # Key-prefix router (delegates to internal/service)
│ ├── service/ # GOA service implementations
│ ├── data_stream.go # Data stream startup wiring and env config
│ ├── main.go # Application entry point
│ └── http.go # HTTP server setup
├── charts/ # Helm chart for Kubernetes deployment
│ └── lfx-v2-mailing-list-service/
│ ├── templates/ # Kubernetes resource templates
│ ├── values.yaml # Production configuration
│ └── values.local.yaml # Local development configuration
├── docs/ # Additional documentation
│ └── event-processing.md # v1→v2 data stream event processing
├── gen/ # Generated code (DO NOT EDIT)
│ ├── http/ # HTTP transport layer
│ │ ├── openapi.yaml # OpenAPI 2.0 specification
Expand All @@ -129,12 +135,17 @@ lfx-v2-mailing-list-service/
│ ├── domain/ # Business domain layer
│ │ ├── model/ # Domain models and conversions
│ │ └── port/ # Repository and service interfaces
│ │ └── mapping_store.go # MappingReader / MappingWriter / MappingReaderWriter
│ ├── service/ # Service layer implementation
│ │ └── grpsio_service_reader.go # GroupsIO service reader
│ │ ├── grpsio_*.go # GroupsIO CRUD orchestrators
│ │ ├── datastream_service_handler.go # v1-sync service transform + publish
│ │ ├── datastream_subgroup_handler.go # v1-sync mailing list transform + publish
│ │ └── datastream_member_handler.go # v1-sync member transform + publish
│ ├── infrastructure/ # Infrastructure layer
│ │ ├── auth/ # JWT authentication
│ │ ├── groupsio/ # GroupsIO API client implementation
│ │ ├── nats/ # NATS messaging and storage
│ │ │ ├── mapping_store.go # MappingReaderWriter backed by JetStream KV
│ │ │ ├── messaging_publish.go # Message publishing
│ │ │ ├── messaging_request.go # Request/reply messaging
│ │ │ └── storage.go # KV store repositories
Expand All @@ -158,6 +169,12 @@ lfx-v2-mailing-list-service/
└── go.mod # Go module definition
```

## 📚 Additional Documentation

| Document | Description |
|---|---|
| [docs/event-processing.md](docs/event-processing.md) | v1→v2 data stream: how DynamoDB change events are consumed, transformed, and published to the indexer and FGA-sync services |

## 🛠️ Development

### Prerequisites
Expand Down
3 changes: 3 additions & 0 deletions charts/lfx-v2-mailing-list-service/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ app:
name: lfx-v2-mailing-list-service
key: GROUPSIO_WEBHOOK_SECRET

EVENTING_ENABLED:
value: "true"

# External Secrets Operator
externalSecretsOperator:
# Enable/disable External Secrets Operator integration
Expand Down
108 changes: 108 additions & 0 deletions cmd/mailing-list-api/data_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package main

import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"sync"
"time"

"github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/eventing"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/service"
infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants"
)

// handleDataStream starts the durable JetStream consumer that processes DynamoDB KV
// change events for GroupsIO entities (service, subgroup, member).
//
// Enabled only when EVENTING_ENABLED=true. If disabled, the function
// is a no-op and returns nil.
func handleDataStream(ctx context.Context, wg *sync.WaitGroup) error {
if !dataStreamEnabled() {
slog.InfoContext(ctx, "data stream processor disabled (EVENTING_ENABLED not set to true)")
return nil
}

natsClient := service.GetNATSClient(ctx)

handler := eventing.NewEventHandler(service.MessagePublisher(ctx), service.MappingReaderWriter(ctx))
streamConsumer := infraNATS.NewDataStreamConsumer(handler)

cfg := dataStreamConfig()
processor, err := eventing.NewEventProcessor(ctx, cfg, natsClient)
if err != nil {
return fmt.Errorf("failed to create data stream processor: %w", err)
}

slog.InfoContext(ctx, "data stream processor created",
"consumer_name", cfg.ConsumerName,
"stream_name", cfg.StreamName,
)

wg.Add(1)
go func() {
defer wg.Done()
if err := processor.Start(ctx, streamConsumer); err != nil {
slog.ErrorContext(ctx, "data stream processor exited with error", "error", err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
stopCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownSeconds*time.Second)
defer cancel()
if err := processor.Stop(stopCtx); err != nil {
slog.ErrorContext(stopCtx, "error stopping data stream processor", "error", err)
}
}()

return nil
}

// dataStreamEnabled reports whether the data stream processor has been opted into.
func dataStreamEnabled() bool {
return os.Getenv("EVENTING_ENABLED") == "true"
}

// dataStreamConfig builds eventing.Config from environment variables with
// sensible defaults.
func dataStreamConfig() eventing.Config {
consumerName := os.Getenv("EVENTING_CONSUMER_NAME")
if consumerName == "" {
consumerName = "mailing-list-service-kv-consumer"
}

maxDeliver := envInt("EVENTING_MAX_DELIVER", 3)
maxAckPending := envInt("EVENTING_MAX_ACK_PENDING", 1000)
ackWaitSecs := envInt("EVENTING_ACK_WAIT_SECS", 30)

return eventing.Config{
ConsumerName: consumerName,
StreamName: "KV_" + constants.KVBucketV1Objects,
MaxDeliver: maxDeliver,
AckWait: time.Duration(ackWaitSecs) * time.Second,
MaxAckPending: maxAckPending,
}
}

// envInt reads an integer environment variable, returning defaultVal if the
// variable is absent or cannot be parsed.
func envInt(key string, defaultVal int) int {
s := os.Getenv(key)
if s == "" {
return defaultVal
}
n, err := strconv.Atoi(s)
if err != nil {
return defaultVal
}
return n
}
147 changes: 147 additions & 0 deletions cmd/mailing-list-api/eventing/event_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package eventing

import (
"context"
"fmt"
"log/slog"
"strings"
"time"

"github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port"
infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats"
"github.com/nats-io/nats.go/jetstream"
)

// Config holds the configuration for an EventProcessor.
type Config struct {
// ConsumerName is the durable consumer name (survives restarts).
ConsumerName string
// StreamName is the JetStream stream to consume from (e.g. "KV_v1-objects").
StreamName string
// MaxDeliver is the maximum number of delivery attempts before giving up.
MaxDeliver int
// AckWait is how long the server waits for an ACK before redelivering.
AckWait time.Duration
// MaxAckPending is the maximum number of unacknowledged messages in flight.
MaxAckPending int
}

// EventProcessor is the interface for JetStream KV bucket event consumers.
// Start blocks until ctx is cancelled; Stop performs a graceful shutdown.
type EventProcessor interface {
Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error
Stop(ctx context.Context) error
}

// natsEventProcessor is the NATS JetStream implementation of EventProcessor.
type natsEventProcessor struct {
natsClient *infraNATS.NATSClient
consumer jetstream.Consumer
consumeCtx jetstream.ConsumeContext
config Config
}

// NewEventProcessor creates an EventProcessor backed by the given NATSClient.
func NewEventProcessor(_ context.Context, cfg Config, natsClient *infraNATS.NATSClient) (EventProcessor, error) {
return &natsEventProcessor{
natsClient: natsClient,
config: cfg,
}, nil
}

// Start creates (or resumes) the durable JetStream consumer and processes messages
// until ctx is cancelled.
func (ep *natsEventProcessor) Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error {
slog.InfoContext(ctx, "starting data stream processor", "consumer_name", ep.config.ConsumerName)

consumer, err := ep.natsClient.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{
Name: ep.config.ConsumerName,
Durable: ep.config.ConsumerName,
// DeliverLastPerSubjectPolicy resumes from the last seen record per KV key after a
// restart, avoiding a full replay while ensuring no in-flight event is dropped.
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{
"$KV.v1-objects.itx-groupsio-v2-service.>",
"$KV.v1-objects.itx-groupsio-v2-subgroup.>",
"$KV.v1-objects.itx-groupsio-v2-member.>",
},
MaxDeliver: ep.config.MaxDeliver,
AckWait: ep.config.AckWait,
MaxAckPending: ep.config.MaxAckPending,
Description: "Durable KV watcher for mailing-list-service GroupsIO entities",
})
if err != nil {
return fmt.Errorf("failed to create or update consumer: %w", err)
}
ep.consumer = consumer

consumeCtx, err := consumer.Consume(
func(jMsg jetstream.Msg) {
meta, err := jMsg.Metadata()
if err != nil {
slog.ErrorContext(ctx, "failed to read stream message metadata, ACKing to avoid poison pill",
"subject", jMsg.Subject(), "error", err)
_ = jMsg.Ack()
return
}
streamConsumer.Process(ctx, model.StreamMessage{
Key: kvKey(jMsg.Subject()),
Data: jMsg.Data(),
IsRemoval: isKVRemoval(jMsg),
DeliveryCount: meta.NumDelivered,
Ack: jMsg.Ack,
Nak: jMsg.NakWithDelay,
})
},
jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
slog.With("error", err).Error("data stream KV consumer error")
}),
)
if err != nil {
return fmt.Errorf("failed to start consuming messages: %w", err)
}
ep.consumeCtx = consumeCtx

slog.InfoContext(ctx, "data stream processor started successfully")
<-ctx.Done()
slog.InfoContext(ctx, "data stream processor context cancelled")
return nil
}

// Stop halts the JetStream consumer. The NATS connection lifecycle is managed
// by the caller (NATSClient).
func (ep *natsEventProcessor) Stop(ctx context.Context) error {
slog.InfoContext(ctx, "stopping data stream processor")

if ep.consumeCtx != nil {
ep.consumeCtx.Stop()
slog.InfoContext(ctx, "data stream consumer stopped")
}

slog.InfoContext(ctx, "data stream processor stopped")
return nil
}

// kvKey strips the "$KV.<bucket>." prefix from a JetStream KV subject,
// returning the bare key. Subject format: $KV.<bucket>.<key>
func kvKey(subject string) string {
idx := strings.Index(subject, ".")
if idx == -1 {
return subject
}
idx2 := strings.Index(subject[idx+1:], ".")
if idx2 == -1 {
return subject
}
return subject[idx+idx2+2:]
}

func isKVRemoval(msg jetstream.Msg) bool {
op := msg.Headers().Get("Kv-Operation")
return op == "DEL" || op == "PURGE"
}
Loading
Loading