diff --git a/.env.example b/.env.example index c93ad88..d49dc84 100644 --- a/.env.example +++ b/.env.example @@ -36,3 +36,6 @@ GROUPSIO_WEBHOOK_SECRET= GROUPSIO_TIMEOUT=30s GROUPSIO_MAX_RETRIES=3 GROUPSIO_RETRY_DELAY=1s + +# Eventing Configuration +EVENTING_ENABLED=true diff --git a/README.md b/README.md index 9cf4cd2..407b162 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,11 @@ lfx-v2-mailing-list-service/ │ ├── design/ # Goa API design files │ │ ├── mailing_list.go # Service and endpoint definitions │ │ └── type.go # Type definitions and data structures +│ ├── eventing/ # v1→v2 data stream event processing +│ │ ├── event_processor.go # JetStream consumer lifecycle +│ │ └── handler.go # Key-prefix router (delegates to internal/service) │ ├── service/ # GOA service implementations +│ ├── data_stream.go # Data stream startup wiring and env config │ ├── main.go # Application entry point │ └── http.go # HTTP server setup ├── charts/ # Helm chart for Kubernetes deployment @@ -120,6 +124,8 @@ lfx-v2-mailing-list-service/ │ ├── templates/ # Kubernetes resource templates │ ├── values.yaml # Production configuration │ └── values.local.yaml # Local development configuration +├── docs/ # Additional documentation +│ └── event-processing.md # v1→v2 data stream event processing ├── gen/ # Generated code (DO NOT EDIT) │ ├── http/ # HTTP transport layer │ │ ├── openapi.yaml # OpenAPI 2.0 specification @@ -129,12 +135,17 @@ lfx-v2-mailing-list-service/ │ ├── domain/ # Business domain layer │ │ ├── model/ # Domain models and conversions │ │ └── port/ # Repository and service interfaces +│ │ └── mapping_store.go # MappingReader / MappingWriter / MappingReaderWriter │ ├── service/ # Service layer implementation -│ │ └── grpsio_service_reader.go # GroupsIO service reader +│ │ ├── grpsio_*.go # GroupsIO CRUD orchestrators +│ │ ├── datastream_service_handler.go # v1-sync service transform + publish +│ │ ├── datastream_subgroup_handler.go # v1-sync mailing list transform + publish 
+│ │ └── datastream_member_handler.go # v1-sync member transform + publish │ ├── infrastructure/ # Infrastructure layer │ │ ├── auth/ # JWT authentication │ │ ├── groupsio/ # GroupsIO API client implementation │ │ ├── nats/ # NATS messaging and storage +│ │ │ ├── mapping_store.go # MappingReaderWriter backed by JetStream KV │ │ │ ├── messaging_publish.go # Message publishing │ │ │ ├── messaging_request.go # Request/reply messaging │ │ │ └── storage.go # KV store repositories @@ -158,6 +169,12 @@ lfx-v2-mailing-list-service/ └── go.mod # Go module definition ``` +## 📚 Additional Documentation + +| Document | Description | +|---|---| +| [docs/event-processing.md](docs/event-processing.md) | v1→v2 data stream: how DynamoDB change events are consumed, transformed, and published to the indexer and FGA-sync services | + ## 🛠️ Development ### Prerequisites diff --git a/charts/lfx-v2-mailing-list-service/values.yaml b/charts/lfx-v2-mailing-list-service/values.yaml index 5c5d8de..a55fd3e 100644 --- a/charts/lfx-v2-mailing-list-service/values.yaml +++ b/charts/lfx-v2-mailing-list-service/values.yaml @@ -328,6 +328,9 @@ app: name: lfx-v2-mailing-list-service key: GROUPSIO_WEBHOOK_SECRET + EVENTING_ENABLED: + value: "true" + # External Secrets Operator externalSecretsOperator: # Enable/disable External Secrets Operator integration diff --git a/cmd/mailing-list-api/data_stream.go b/cmd/mailing-list-api/data_stream.go new file mode 100644 index 0000000..4bf875b --- /dev/null +++ b/cmd/mailing-list-api/data_stream.go @@ -0,0 +1,108 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "strconv" + "sync" + "time" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/eventing" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/service" + infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" +) + +// handleDataStream starts the durable JetStream consumer that processes DynamoDB KV +// change events for GroupsIO entities (service, subgroup, member). +// +// Enabled only when EVENTING_ENABLED=true. If disabled, the function +// is a no-op and returns nil. +func handleDataStream(ctx context.Context, wg *sync.WaitGroup) error { + if !dataStreamEnabled() { + slog.InfoContext(ctx, "data stream processor disabled (EVENTING_ENABLED not set to true)") + return nil + } + + natsClient := service.GetNATSClient(ctx) + + handler := eventing.NewEventHandler(service.MessagePublisher(ctx), service.MappingReaderWriter(ctx)) + streamConsumer := infraNATS.NewDataStreamConsumer(handler) + + cfg := dataStreamConfig() + processor, err := eventing.NewEventProcessor(ctx, cfg, natsClient) + if err != nil { + return fmt.Errorf("failed to create data stream processor: %w", err) + } + + slog.InfoContext(ctx, "data stream processor created", + "consumer_name", cfg.ConsumerName, + "stream_name", cfg.StreamName, + ) + + wg.Add(1) + go func() { + defer wg.Done() + if err := processor.Start(ctx, streamConsumer); err != nil { + slog.ErrorContext(ctx, "data stream processor exited with error", "error", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + stopCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownSeconds*time.Second) + defer cancel() + if err := processor.Stop(stopCtx); err != nil { + slog.ErrorContext(stopCtx, "error stopping data stream processor", 
"error", err) + } + }() + + return nil +} + +// dataStreamEnabled reports whether the data stream processor has been opted into. +func dataStreamEnabled() bool { + return os.Getenv("EVENTING_ENABLED") == "true" +} + +// dataStreamConfig builds eventing.Config from environment variables with +// sensible defaults. +func dataStreamConfig() eventing.Config { + consumerName := os.Getenv("EVENTING_CONSUMER_NAME") + if consumerName == "" { + consumerName = "mailing-list-service-kv-consumer" + } + + maxDeliver := envInt("EVENTING_MAX_DELIVER", 3) + maxAckPending := envInt("EVENTING_MAX_ACK_PENDING", 1000) + ackWaitSecs := envInt("EVENTING_ACK_WAIT_SECS", 30) + + return eventing.Config{ + ConsumerName: consumerName, + StreamName: "KV_" + constants.KVBucketV1Objects, + MaxDeliver: maxDeliver, + AckWait: time.Duration(ackWaitSecs) * time.Second, + MaxAckPending: maxAckPending, + } +} + +// envInt reads an integer environment variable, returning defaultVal if the +// variable is absent or cannot be parsed. +func envInt(key string, defaultVal int) int { + s := os.Getenv(key) + if s == "" { + return defaultVal + } + n, err := strconv.Atoi(s) + if err != nil { + return defaultVal + } + return n +} diff --git a/cmd/mailing-list-api/eventing/event_processor.go b/cmd/mailing-list-api/eventing/event_processor.go new file mode 100644 index 0000000..55ae80b --- /dev/null +++ b/cmd/mailing-list-api/eventing/event_processor.go @@ -0,0 +1,147 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + infraNATS "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats" + "github.com/nats-io/nats.go/jetstream" +) + +// Config holds the configuration for an EventProcessor. 
+type Config struct { + // ConsumerName is the durable consumer name (survives restarts). + ConsumerName string + // StreamName is the JetStream stream to consume from (e.g. "KV_v1-objects"). + StreamName string + // MaxDeliver is the maximum number of delivery attempts before giving up. + MaxDeliver int + // AckWait is how long the server waits for an ACK before redelivering. + AckWait time.Duration + // MaxAckPending is the maximum number of unacknowledged messages in flight. + MaxAckPending int +} + +// EventProcessor is the interface for JetStream KV bucket event consumers. +// Start blocks until ctx is cancelled; Stop performs a graceful shutdown. +type EventProcessor interface { + Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error + Stop(ctx context.Context) error +} + +// natsEventProcessor is the NATS JetStream implementation of EventProcessor. +type natsEventProcessor struct { + natsClient *infraNATS.NATSClient + consumer jetstream.Consumer + consumeCtx jetstream.ConsumeContext + config Config +} + +// NewEventProcessor creates an EventProcessor backed by the given NATSClient. +func NewEventProcessor(_ context.Context, cfg Config, natsClient *infraNATS.NATSClient) (EventProcessor, error) { + return &natsEventProcessor{ + natsClient: natsClient, + config: cfg, + }, nil +} + +// Start creates (or resumes) the durable JetStream consumer and processes messages +// until ctx is cancelled. +func (ep *natsEventProcessor) Start(ctx context.Context, streamConsumer port.DataStreamProcessor) error { + slog.InfoContext(ctx, "starting data stream processor", "consumer_name", ep.config.ConsumerName) + + consumer, err := ep.natsClient.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{ + Name: ep.config.ConsumerName, + Durable: ep.config.ConsumerName, + // DeliverLastPerSubjectPolicy resumes from the last seen record per KV key after a + // restart, avoiding a full replay while ensuring no in-flight event is dropped. 
+ DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubjects: []string{ + "$KV.v1-objects.itx-groupsio-v2-service.>", + "$KV.v1-objects.itx-groupsio-v2-subgroup.>", + "$KV.v1-objects.itx-groupsio-v2-member.>", + }, + MaxDeliver: ep.config.MaxDeliver, + AckWait: ep.config.AckWait, + MaxAckPending: ep.config.MaxAckPending, + Description: "Durable KV watcher for mailing-list-service GroupsIO entities", + }) + if err != nil { + return fmt.Errorf("failed to create or update consumer: %w", err) + } + ep.consumer = consumer + + consumeCtx, err := consumer.Consume( + func(jMsg jetstream.Msg) { + meta, err := jMsg.Metadata() + if err != nil { + slog.ErrorContext(ctx, "failed to read stream message metadata, ACKing to avoid poison pill", + "subject", jMsg.Subject(), "error", err) + _ = jMsg.Ack() + return + } + streamConsumer.Process(ctx, model.StreamMessage{ + Key: kvKey(jMsg.Subject()), + Data: jMsg.Data(), + IsRemoval: isKVRemoval(jMsg), + DeliveryCount: meta.NumDelivered, + Ack: jMsg.Ack, + Nak: jMsg.NakWithDelay, + }) + }, + jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) { + slog.With("error", err).Error("data stream KV consumer error") + }), + ) + if err != nil { + return fmt.Errorf("failed to start consuming messages: %w", err) + } + ep.consumeCtx = consumeCtx + + slog.InfoContext(ctx, "data stream processor started successfully") + <-ctx.Done() + slog.InfoContext(ctx, "data stream processor context cancelled") + return nil +} + +// Stop halts the JetStream consumer. The NATS connection lifecycle is managed +// by the caller (NATSClient). +func (ep *natsEventProcessor) Stop(ctx context.Context) error { + slog.InfoContext(ctx, "stopping data stream processor") + + if ep.consumeCtx != nil { + ep.consumeCtx.Stop() + slog.InfoContext(ctx, "data stream consumer stopped") + } + + slog.InfoContext(ctx, "data stream processor stopped") + return nil +} + +// kvKey strips the "$KV.." 
prefix from a JetStream KV subject, +// returning the bare key. Subject format: $KV.. +func kvKey(subject string) string { + idx := strings.Index(subject, ".") + if idx == -1 { + return subject + } + idx2 := strings.Index(subject[idx+1:], ".") + if idx2 == -1 { + return subject + } + return subject[idx+idx2+2:] +} + +func isKVRemoval(msg jetstream.Msg) bool { + op := msg.Headers().Get("Kv-Operation") + return op == "DEL" || op == "PURGE" +} diff --git a/cmd/mailing-list-api/eventing/handler.go b/cmd/mailing-list-api/eventing/handler.go new file mode 100644 index 0000000..242629b --- /dev/null +++ b/cmd/mailing-list-api/eventing/handler.go @@ -0,0 +1,91 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "log/slog" + "strings" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/service" +) + +const ( + // KV key prefixes matching lfx-v1-sync-helper's naming convention. + kvPrefixService = "itx-groupsio-v2-service." + kvPrefixSubgroup = "itx-groupsio-v2-subgroup." + kvPrefixMember = "itx-groupsio-v2-member." + + // sdcDeletedAt is the field injected by lfx-v1-sync-helper on DynamoDB REMOVE events. + sdcDeletedAt = "_sdc_deleted_at" +) + +// eventHandler implements port.DataEventHandler and routes KV events to the +// appropriate per-entity handler based on the key prefix. +type eventHandler struct { + publisher port.MessagePublisher + mappings port.MappingReaderWriter +} + +// NewEventHandler constructs a DataEventHandler for GroupsIO entities. +// publisher is used to emit indexer and access control messages. +// mappings is the v1-mappings abstraction used for idempotency tracking. 
+func NewEventHandler(publisher port.MessagePublisher, mappings port.MappingReaderWriter) port.DataEventHandler { + return &eventHandler{ + publisher: publisher, + mappings: mappings, + } +} + +// HandleChange dispatches a PUT event to the correct entity handler. +// Payloads containing _sdc_deleted_at are treated as soft-deletes. +func (h *eventHandler) HandleChange(ctx context.Context, key string, data map[string]any) bool { + _, isSoftDelete := data[sdcDeletedAt] + + switch { + case strings.HasPrefix(key, kvPrefixService): + uid := key[len(kvPrefixService):] + if isSoftDelete { + return service.HandleDataStreamServiceDelete(ctx, uid, h.publisher, h.mappings) + } + return service.HandleDataStreamServiceUpdate(ctx, uid, data, h.publisher, h.mappings) + + case strings.HasPrefix(key, kvPrefixSubgroup): + uid := key[len(kvPrefixSubgroup):] + if isSoftDelete { + return service.HandleDataStreamSubgroupDelete(ctx, uid, h.publisher, h.mappings) + } + return service.HandleDataStreamSubgroupUpdate(ctx, uid, data, h.publisher, h.mappings) + + case strings.HasPrefix(key, kvPrefixMember): + uid := key[len(kvPrefixMember):] + if isSoftDelete { + return service.HandleDataStreamMemberDelete(ctx, uid, h.publisher, h.mappings) + } + return service.HandleDataStreamMemberUpdate(ctx, uid, data, h.publisher, h.mappings) + + default: + slog.WarnContext(ctx, "unrecognized KV key prefix in HandleChange, ACKing", "key", key) + return false + } +} + +// HandleRemoval dispatches a hard DELETE or PURGE event to the correct entity handler. 
+func (h *eventHandler) HandleRemoval(ctx context.Context, key string) bool { + switch { + case strings.HasPrefix(key, kvPrefixService): + return service.HandleDataStreamServiceDelete(ctx, key[len(kvPrefixService):], h.publisher, h.mappings) + + case strings.HasPrefix(key, kvPrefixSubgroup): + return service.HandleDataStreamSubgroupDelete(ctx, key[len(kvPrefixSubgroup):], h.publisher, h.mappings) + + case strings.HasPrefix(key, kvPrefixMember): + return service.HandleDataStreamMemberDelete(ctx, key[len(kvPrefixMember):], h.publisher, h.mappings) + + default: + slog.WarnContext(ctx, "unrecognized KV key prefix in HandleRemoval, ACKing", "key", key) + return false + } +} diff --git a/cmd/mailing-list-api/main.go b/cmd/mailing-list-api/main.go index 1f3f986..c9a47cb 100644 --- a/cmd/mailing-list-api/main.go +++ b/cmd/mailing-list-api/main.go @@ -153,6 +153,12 @@ func main() { os.Exit(1) } + // Start data stream processor for v1 DynamoDB KV events (optional — enabled via env var) + if err := handleDataStream(ctx, &wg); err != nil { + slog.ErrorContext(ctx, "FATAL: failed to start data stream processor", "error", err) + os.Exit(1) + } + // Wait for signal. 
slog.InfoContext(ctx, "received shutdown signal, stopping servers", "signal", <-errc, diff --git a/cmd/mailing-list-api/service/providers.go b/cmd/mailing-list-api/service/providers.go index f3f79dc..0dd1407 100644 --- a/cmd/mailing-list-api/service/providers.go +++ b/cmd/mailing-list-api/service/providers.go @@ -18,6 +18,7 @@ import ( infrastructure "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/mock" "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/nats" "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/service" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" ) var ( @@ -498,3 +499,14 @@ func GetNATSClient(ctx context.Context) *nats.NATSClient { } return storageImpl.Client() } + +// MappingReaderWriter initializes the v1-mappings KV abstraction used by the +// data stream event handler for idempotency tracking. +func MappingReaderWriter(ctx context.Context) port.MappingReaderWriter { + client := GetNATSClient(ctx) + kv, err := client.KeyValue(ctx, constants.KVBucketNameV1Mappings) + if err != nil { + log.Fatalf("failed to access %s KV bucket: %v", constants.KVBucketNameV1Mappings, err) + } + return nats.NewMappingReaderWriter(kv) +} diff --git a/docs/event-processing.md b/docs/event-processing.md new file mode 100644 index 0000000..15ff019 --- /dev/null +++ b/docs/event-processing.md @@ -0,0 +1,392 @@ +# Event Processing + +## Overview + +The mailing list service implements a NATS JetStream KV-bucket event processor that syncs GroupsIO entities (services, mailing lists, and members) from v1 DynamoDB into v2 — enabling real-time data synchronization without manual intervention. + +The pipeline is driven by the `lfx-v1-sync-helper` component, which writes DynamoDB change events into the `v1-objects` KV bucket. 
The mailing list service consumes those events, transforms them into v2 domain models, and publishes the results to the indexer and access-control (FGA-sync) services. + +--- + +## Architecture + +### Components + +```mermaid +graph TD + DDB[DynamoDB v1] + SH[lfx-v1-sync-helper] + KV1[NATS KV: v1-objects] + EP[EventProcessor
<br/>mailing-list-service] + KV2[NATS KV: v1-mappings
<br/>idempotency store] + IDX[Indexer Service
<br/>OpenSearch] + FGA[FGA-Sync Service<br/>
OpenFGA] + + DDB -->|CDC stream| SH + SH -->|PUT/DEL to KV| KV1 + KV1 -->|JetStream consumer| EP + EP <-->|read/write mappings| KV2 + EP -->|index messages| IDX + EP -->|access messages| FGA +``` + +### Key Prefix → Entity Mapping + +| KV Key Prefix | Entity Type | +|---|---| +| `itx-groupsio-v2-service.` | GroupsIO Service | +| `itx-groupsio-v2-subgroup.` | Mailing List (subgroup) | +| `itx-groupsio-v2-member.` | Member | + +--- + +## Event Flow + +### Create / Update Flow + +```mermaid +sequenceDiagram + participant KV as v1-objects KV + participant EP as EventProcessor + participant MP as v1-mappings KV + participant IDX as Indexer + participant FGA as FGA-Sync + + KV->>EP: PUT event (key + payload) + EP->>EP: Strip $KV prefix → bare key + EP->>EP: Decode JSON/msgpack payload + EP->>MP: resolveAction(key)
<br/>missing/tombstone → Created<br/>
present → Updated + EP->>EP: transformTo(data) + EP->>IDX: Publish IndexerMessage (Created|Updated) + EP->>FGA: Publish AccessMessage (update_access) + EP->>MP: putMapping(key, uid) +``` + +### Delete Flow + +Deletes arrive in two forms: + +- **Soft delete** — a regular PUT payload that contains the `_sdc_deleted_at` field (injected by `lfx-v1-sync-helper` on DynamoDB REMOVE events). +- **Hard delete / PURGE** — a KV message with the `Kv-Operation: DEL` or `Kv-Operation: PURGE` header. + +```mermaid +sequenceDiagram + participant KV as v1-objects KV + participant EP as EventProcessor + participant MP as v1-mappings KV + participant IDX as Indexer + participant FGA as FGA-Sync + + KV->>EP: DEL/PURGE event (or PUT with _sdc_deleted_at) + EP->>MP: isTombstoned(key)? + alt already tombstoned + EP->>EP: ACK (duplicate, skip) + else not tombstoned + EP->>IDX: Publish IndexerMessage (Deleted) + EP->>FGA: Publish AccessMessage (delete_all_access) + EP->>MP: putTombstone(key) + end +``` + +### Parent Dependency (Ordering Guarantee) + +To avoid orphaned documents in OpenSearch, child entities wait for their parent to be processed first: + +```mermaid +graph LR + S[Service
<br/>itx-groupsio-v2-service] --> SG[Subgroup / Mailing List
<br/>itx-groupsio-v2-subgroup] + SG --> M[Member<br/>
itx-groupsio-v2-member] +``` + +- A **subgroup** event is NAK'd with backoff if the parent service mapping is absent from `v1-mappings`. +- A **member** event is NAK'd with backoff if the parent subgroup mapping is absent from `v1-mappings`. + +### Reverse Index (group_id → UID) + +Members store a `group_id` (Groups.io numeric ID) rather than the v2 `mailing_list_uid`. When the subgroup handler successfully processes a mailing list, it writes a reverse index entry: + +``` +v1-mappings key: groupsio-subgroup-gid. +value: +``` + +The member handler reads this entry to resolve the parent `MailingListUID` before building the indexer message. + +--- + +## Data Transformation + +### Service (`GrpsIOService`) + +| v1 DynamoDB field | v2 model field | +|---|---| +| `group_service_type` | `Type` | +| `domain` | `Domain` | +| `group_id` | `GroupID` | +| `prefix` | `Prefix` | +| `project_id` | `ProjectUID` | +| `proj_id` | `ProjectSlug` | +| `writers` | `GrpsIOServiceSettings.Writers` | +| `auditors` | `GrpsIOServiceSettings.Auditors` | +| `created_at` | `CreatedAt` | +| `last_modified_at` | `UpdatedAt` | +| `last_system_modified_at` | `SystemUpdatedAt` | +| _(hardcoded)_ | `Source = "v1-sync"` | + +### Mailing List / Subgroup (`GrpsIOMailingList`) + +| v1 DynamoDB field | v2 model field | +|---|---| +| `group_id` | `GroupID` | +| `group_name` | `GroupName` | +| `visibility == "Public"` | `Public` | +| `type` | `Type` | +| `description` | `Description` | +| `title` | `Title` | +| `subject_tag` | `SubjectTag` | +| `url` | `URL` | +| `flags` | `Flags` | +| `subscriber_count` | `SubscriberCount` | +| `parent_id` | `ServiceUID` | +| `project_id` | `ProjectUID` | +| `committee` | `Committees[0].UID` | +| `committee_filters` | `Committees[0].AllowedVotingStatuses` | +| `created_at` | `CreatedAt` | +| `last_modified_at` | `UpdatedAt` | +| `last_system_modified_at` | `SystemUpdatedAt` | +| _(hardcoded)_ | `Source = "v1-sync"` | + +### Member (`GrpsIOMember`) + +| v1 DynamoDB field | 
v2 model field | +|---|---| +| `member_id` | `MemberID` | +| `group_id` | `GroupID` | +| `user_id` | `UserID` | +| `full_name` (split on first space) | `FirstName`, `LastName` | +| `email` | `Email` | +| `organization` | `Organization` | +| `job_title` | `JobTitle` | +| `groups_email` | `GroupsEmail` | +| `groups_full_name` | `GroupsFullName` | +| `committee_email` | `CommitteeEmail` | +| `committee_full_name` | `CommitteeFullName` | +| `committee_id` | `CommitteeID` | +| `role` | `Role` | +| `voting_status` | `VotingStatus` | +| `member_type` | `MemberType` | +| `delivery_mode` | `DeliveryMode` | +| `delivery_mode_list` | `DeliveryModeList` | +| `mod_status` | `ModStatus` | +| `status` | `Status` | +| `created_at` | `CreatedAt` | +| `last_modified_at` | `UpdatedAt` | +| `last_system_modified_at` | `SystemUpdatedAt` | +| _(resolved from reverse index)_ | `MailingListUID` | +| _(hardcoded)_ | `Source = "v1-sync"` | + +> Note: Members do **not** publish a separate FGA access message — access is inherited from the parent mailing list's access record. 
+ +--- + +## NATS Subjects Published + +| Entity | Subject | Notes | +|---|---|---| +| Service | `lfx.index.groupsio_service` | All actions (Created / Updated / Deleted) | +| Service | `lfx.index.groupsio_service_settings` | Created / Updated only (when writers/auditors present) | +| Service | `lfx.update_access.groupsio_service` | Created / Updated only | +| Service | `lfx.delete_all_access.groupsio_service` | Deleted only | +| Mailing List | `lfx.index.groupsio_mailing_list` | All actions (Created / Updated / Deleted) | +| Mailing List | `lfx.index.groupsio_mailing_list_settings` | Created / Updated only (when writers/auditors present) | +| Mailing List | `lfx.update_access.groupsio_mailing_list` | Created / Updated only | +| Mailing List | `lfx.delete_all_access.groupsio_mailing_list` | Deleted only | +| Member | `lfx.index.groupsio_member` | All actions (Created / Updated / Deleted) | + +--- + +## Deduplication + +The `v1-mappings` KV bucket tracks processing state for each entity: + +| State | Key Pattern | Value | +|---|---|---| +| Synced (service) | `groupsio-service.` | `` | +| Synced (subgroup) | `groupsio-subgroup.` | `` | +| Synced (member) | `groupsio-member.` | `` | +| Reverse index | `groupsio-subgroup-gid.` | `` | +| Deleted (tombstone) | any of the above | `!del` | + +On consumer redelivery, tombstone markers prevent duplicate downstream operations. Missing keys and tombstoned entries are both treated as "never seen" for create-vs-update resolution. 
+ +--- + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|---|---|---| +| `EVENTING_ENABLED` | _(unset)_ | Set to `true` to enable the data stream processor | +| `EVENTING_CONSUMER_NAME` | `mailing-list-service-kv-consumer` | Durable JetStream consumer name | +| `EVENTING_MAX_DELIVER` | `3` | Maximum delivery attempts before giving up | +| `EVENTING_ACK_WAIT_SECS` | `30` | Seconds the server waits for ACK before redelivering | +| `EVENTING_MAX_ACK_PENDING` | `1000` | Maximum in-flight unacknowledged messages | +| `NATS_URL` | `nats://lfx-platform-nats.lfx.svc.cluster.local:4222` | NATS server connection URL | + +### Consumer Configuration + +| Setting | Value | +|---|---| +| Delivery Policy | `DeliverLastPerSubjectPolicy` (resumes from last seen record per key after restart) | +| Ack Policy | `AckExplicitPolicy` (explicit ACK required) | +| Filter Subjects | `$KV.v1-objects.itx-groupsio-v2-service.>`, `$KV.v1-objects.itx-groupsio-v2-subgroup.>`, `$KV.v1-objects.itx-groupsio-v2-member.>` | +| Stream | `KV_v1-objects` | + +--- + +## Error Handling + +### Transient Errors (NAK — retry) + +The handler returns `true` (NAK) for: +- Parent mapping absent (subgroup waiting for service; member waiting for subgroup) +- Transient publish failures to indexer or FGA-sync (as determined by `pkgerrors.IsTransient`) + +The consumer redelivers after the `AckWait` backoff, up to `MaxDeliver` times. + +### Permanent Errors (ACK — skip) + +The handler returns `false` (ACK) for: +- Unrecognised KV key prefix +- Missing required fields (e.g., member with no `group_id`) +- Message metadata read failure (poison-pill guard) +- Payload decode failure + +These events are logged at `ERROR` level and discarded to prevent the consumer from stalling indefinitely. 
+ +--- + +## Lifecycle + +```mermaid +stateDiagram-v2 + [*] --> Disabled: EVENTING_ENABLED != "true" + [*] --> Starting: EVENTING_ENABLED = "true" + Starting --> Running: Consumer created / resumed + Running --> Stopping: ctx cancelled (SIGTERM / shutdown) + Stopping --> [*]: consumer.Stop() called +``` + +1. **Startup**: `handleDataStream` is called from `main.go` after the NATS client is ready. If `EVENTING_ENABLED` is not `true`, the function is a no-op. +2. **Running**: The processor consumes messages in the background goroutine until context cancellation. +3. **Shutdown**: A second goroutine waits for `ctx.Done()` and calls `processor.Stop()` with a graceful-shutdown timeout. + +--- + +## Operations + +### Enable Event Processing + +```bash +export EVENTING_ENABLED=true +make run +``` + +### Disable Event Processing (e.g., local dev) + +```bash +# Simply omit the variable or set it to anything other than "true" +unset EVENTING_ENABLED +make run +``` + +### Monitoring + +Watch for these log messages: + +| Log message | Meaning | +|---|---| +| `data stream processor started successfully` | Consumer running | +| `data stream processor context cancelled` | Normal shutdown | +| `parent service not yet processed, NAKing subgroup for retry` | Ordering backpressure | +| `parent subgroup not yet processed, NAKing member for retry` | Ordering backpressure | +| `service/subgroup/member already deleted, ACKing duplicate` | Idempotent delete | +| `data stream KV consumer error` | NATS consumer-level error | + +### Troubleshooting + +| Symptom | Action | +|---|---| +| No events processed | Verify `EVENTING_ENABLED=true`, check NATS connectivity, confirm `v1-objects` bucket exists | +| Repeated NAK / ordering failures | Ensure `lfx-v1-sync-helper` is populating the KV bucket in dependency order | +| Duplicate events replayed | Inspect `v1-mappings` bucket for missing tombstones | +| Consumer not progressing | Check downstream indexer / FGA-sync availability; review 
`EVENTING_MAX_DELIVER` | + +--- + +## Development + +### Code Structure + +``` +cmd/mailing-list-api/ +├── data_stream.go # Startup wiring, env config +└── eventing/ + ├── event_processor.go # JetStream consumer lifecycle + └── handler.go # Key-prefix router (HandleChange / HandleRemoval) + +internal/ +├── domain/port/ +│ └── mapping_store.go # MappingReader / MappingWriter / MappingReaderWriter +├── infrastructure/nats/ +│ └── mapping_store.go # JetStream KV implementation (hides tombstone details) +└── service/ + ├── datastream_service_handler.go # Service transform + publish + ├── datastream_subgroup_handler.go # Mailing list transform + publish + reverse index + └── datastream_member_handler.go # Member transform + publish +``` + +The `MappingReaderWriter` port abstracts all `v1-mappings` KV operations (create-vs-update resolution, parent-dependency checks, tombstone writes) behind domain-meaningful methods. The JetStream KV details — including the `!del` tombstone marker — are encapsulated entirely in `internal/infrastructure/nats/mapping_store.go`. + +### Adding a New Entity Type + +1. Add KV key prefix constant in [eventing/handler.go](../cmd/mailing-list-api/eventing/handler.go) +2. Register the new prefix in the `switch` inside `HandleChange` and `HandleRemoval`, delegating to the new handler functions +3. Create `internal/service/datastream_xxx_handler.go` with `HandleDataStreamXxxUpdate` / `HandleDataStreamXxxDelete` +4. Add `transformV1ToXxx` conversion function in the same file +5. Add mapping prefix constants to [pkg/constants/storage.go](../pkg/constants/storage.go) +6. Add published subject constants to [pkg/constants/subjects.go](../pkg/constants/subjects.go) +7. 
Write unit tests + +### Testing Locally + +```bash +# Disable event processing for pure API development +unset EVENTING_ENABLED +make run + +# Enable with a local NATS server +export EVENTING_ENABLED=true +export NATS_URL=nats://localhost:4222 +make run + +# Inject a test event manually +nats kv put v1-objects itx-groupsio-v2-service.test-uid '{"group_service_type":"primary","project_id":"proj-1","domain":"groups.io"}' + +# Verify mapping was written +nats kv get v1-mappings groupsio-service.test-uid + +# Run unit tests +go test ./cmd/mailing-list-api/eventing/... ./internal/service/... -v +``` + +--- + +## Related Services + +| Service | Role | +|---|---| +| **lfx-v1-sync-helper** | Bridges DynamoDB CDC into the `v1-objects` NATS KV bucket | +| **Indexer** | Consumes indexer messages and upserts/deletes records in OpenSearch | +| **FGA-Sync** | Consumes access messages and manages OpenFGA relationship tuples | diff --git a/go.mod b/go.mod index f105cf2..98c0bfc 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/nats-io/nats.go v1.31.0 github.com/remychantenay/slog-otel v1.3.4 github.com/stretchr/testify v1.11.1 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 go.opentelemetry.io/otel v1.40.0 @@ -33,6 +34,7 @@ require ( ) require ( + github.com/akamensky/base58 v0.0.0-20210829145138-ce8bf8802e8f // indirect github.com/aws/smithy-go v1.22.5 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -51,6 +53,7 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect go.opentelemetry.io/otel/metric v1.40.0 // 
indirect diff --git a/go.sum b/go.sum index c3a365f..33221ce 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/akamensky/base58 v0.0.0-20210829145138-ce8bf8802e8f h1:z8MkSJCUyTmW5YQlxsMLBlwA7GmjxC7L4ooicxqnhz8= +github.com/akamensky/base58 v0.0.0-20210829145138-ce8bf8802e8f/go.mod h1:UdUwYgAXBiL+kLfcqxoQJYkHA/vl937/PbFhZM34aZs= github.com/auth0/go-jwt-middleware/v2 v2.3.0 h1:4QREj6cS3d8dS05bEm443jhnqQF97FX9sMBeWqnNRzE= github.com/auth0/go-jwt-middleware/v2 v2.3.0/go.mod h1:dL4ObBs1/dj4/W4cYxd8rqAdDGXYyd5rqbpMIxcbVrU= github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= @@ -62,6 +64,10 @@ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8= diff --git a/internal/domain/model/grpsio_mailing_list.go b/internal/domain/model/grpsio_mailing_list.go index 0c7a441..097c731 100644 --- a/internal/domain/model/grpsio_mailing_list.go +++ b/internal/domain/model/grpsio_mailing_list.go @@ -39,8 +39,12 @@ type GrpsIOMailingList struct { ProjectName string `json:"project_name"` // Inherited from 
parent service ProjectSlug string `json:"project_slug"` // Inherited from parent service - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + URL string `json:"url,omitempty"` // The groups.io URL for the subgroup + Flags []string `json:"flags,omitempty"` // Warning messages about unusual settings + + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + SystemUpdatedAt time.Time `json:"system_updated_at,omitempty"` // Last modified by system (scripts/webhooks) } // Removed visibility constants - now using Public bool field diff --git a/internal/domain/model/grpsio_member.go b/internal/domain/model/grpsio_member.go index 3b5ee53..23e2636 100644 --- a/internal/domain/model/grpsio_member.go +++ b/internal/domain/model/grpsio_member.go @@ -28,17 +28,30 @@ type GrpsIOMember struct { Source string `json:"source"` // "api", "webhook", or "mock" - tracks origin for business logic // Member Information - Username string `json:"username"` // Username + UserID string `json:"user_id,omitempty"` // User-service ID of the member + Username string `json:"username"` // Username FirstName string `json:"first_name"` LastName string `json:"last_name"` Email string `json:"email"` // Required, RFC 5322 Organization string `json:"organization"` // Optional JobTitle string `json:"job_title"` // Optional + // Normalised search fields (lowercase, for filtering) + GroupsEmail string `json:"groups_email,omitempty"` // Lowercase email from Groups.io + GroupsFullName string `json:"groups_full_name,omitempty"` // Lowercase full name from Groups.io + CommitteeEmail string `json:"committee_email,omitempty"` // Lowercase email from Committee Service + CommitteeFullName string `json:"committee_full_name,omitempty"` // Lowercase full name from Committee Service + + // Committee association + CommitteeID string `json:"committee_id,omitempty"` // Committee ID if member belongs to a committee + Role string `json:"role,omitempty"` // Role 
of the member + VotingStatus string `json:"voting_status,omitempty"` // Voting status of the member + // Member Configuration - MemberType string `json:"member_type"` // "committee" or "direct" - DeliveryMode string `json:"delivery_mode"` // Email delivery preference - ModStatus string `json:"mod_status"` // "none", "moderator", "owner" + MemberType string `json:"member_type"` // "committee" or "direct" + DeliveryMode string `json:"delivery_mode"` // Email delivery preference + DeliveryModeList string `json:"delivery_mode_list,omitempty"` // Delivery mode list from Groups.io + ModStatus string `json:"mod_status"` // "none", "moderator", "owner" // Status Status string `json:"status"` // Groups.io status: normal, pending, etc. @@ -47,8 +60,9 @@ type GrpsIOMember struct { LastReviewedBy *string `json:"last_reviewed_by"` // Nullable user ID // Timestamps - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + SystemUpdatedAt time.Time `json:"system_updated_at,omitempty"` // Last modified by system (scripts/webhooks) } // BuildIndexKey generates a SHA-256 hash for use as a NATS KV key. 
diff --git a/internal/domain/model/grpsio_service.go b/internal/domain/model/grpsio_service.go index 385a91c..4e51d85 100644 --- a/internal/domain/model/grpsio_service.go +++ b/internal/domain/model/grpsio_service.go @@ -93,6 +93,7 @@ type GrpsIOService struct { Public bool `json:"public"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` + SystemUpdatedAt time.Time `json:"system_updated_at,omitempty"` // Last modified by system (scripts/webhooks) } // BuildIndexKey generates a SHA-256 hash for use as a NATS KV key diff --git a/internal/domain/model/stream_message.go b/internal/domain/model/stream_message.go new file mode 100644 index 0000000..d584c93 --- /dev/null +++ b/internal/domain/model/stream_message.go @@ -0,0 +1,23 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package model + +import "time" + +// StreamMessage represents a single message from a JetStream KV change stream, +// decoupled from any NATS-specific types. +type StreamMessage struct { + // Key is the bare KV key (subject prefix stripped). + Key string + // Data is the raw JSON payload. Nil for removal operations. + Data []byte + // IsRemoval is true for DEL/PURGE operations. + IsRemoval bool + // DeliveryCount is the number of times this message has been delivered. + DeliveryCount uint64 + // Ack acknowledges successful processing. + Ack func() error + // Nak requeues the message with the given backoff delay. + Nak func(delay time.Duration) error +} diff --git a/internal/domain/port/data_stream_event_handler.go b/internal/domain/port/data_stream_event_handler.go new file mode 100644 index 0000000..84e122c --- /dev/null +++ b/internal/domain/port/data_stream_event_handler.go @@ -0,0 +1,21 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package port + +import "context" + +// DataEventHandler handles keyed data change events from an external data source +// (e.g. a NATS JetStream KV bucket sourced from DynamoDB via lfx-v1-sync-helper). +// +// HandleChange is called when a record is created, updated, or soft-deleted (the +// implementation is responsible for detecting soft-deletes from the payload fields). +// HandleRemoval is called when a record is hard-deleted or purged. +// +// Both methods return true to signal a transient error (retry with exponential +// backoff) or false to acknowledge the event (success, or a permanent error that +// should not be retried). +type DataEventHandler interface { + HandleChange(ctx context.Context, key string, data map[string]any) bool + HandleRemoval(ctx context.Context, key string) bool +} diff --git a/internal/domain/port/data_stream_processor.go b/internal/domain/port/data_stream_processor.go new file mode 100644 index 0000000..f60f796 --- /dev/null +++ b/internal/domain/port/data_stream_processor.go @@ -0,0 +1,15 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package port + +import ( + "context" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" +) + +// DataStreamProcessor processes individual messages dispatched by an EventProcessor. +type DataStreamProcessor interface { + Process(ctx context.Context, msg model.StreamMessage) +} diff --git a/internal/domain/port/mapping_store.go b/internal/domain/port/mapping_store.go new file mode 100644 index 0000000..edd94f5 --- /dev/null +++ b/internal/domain/port/mapping_store.go @@ -0,0 +1,50 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package port + +import ( + "context" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" +) + +// MappingReader abstracts read operations on the v1-mappings KV bucket. 
+// Implementations hide storage-level details such as tombstone markers and +// key-not-found semantics behind domain-meaningful operations. +type MappingReader interface { + // ResolveAction returns ActionCreated when the key is absent or tombstoned + // (entity never seen, or previously deleted and being re-created), and + // ActionUpdated when a live mapping already exists. + ResolveAction(ctx context.Context, key string) model.MessageAction + + // IsMappingPresent returns true when the key exists and is not tombstoned. + // Used for parent-dependency checks (service before subgroup, subgroup before member). + IsMappingPresent(ctx context.Context, key string) bool + + // IsTombstoned returns true when the key holds the deletion marker, + // so duplicate delete events can be skipped. + IsTombstoned(ctx context.Context, key string) bool + + // GetMappingValue returns the stored value and true when the key exists and + // is not tombstoned. Used when the caller needs the actual value (e.g. the + // reverse group_id → subgroup UID index in the member handler). + GetMappingValue(ctx context.Context, key string) (string, bool) +} + +// MappingWriter abstracts write operations on the v1-mappings KV bucket. +type MappingWriter interface { + // PutMapping records that an entity has been successfully processed so that + // subsequent events for the same key are treated as updates rather than creates. + PutMapping(ctx context.Context, key, value string) error + + // PutTombstone writes the deletion marker to prevent duplicate delete + // processing on consumer redelivery. + PutTombstone(ctx context.Context, key string) error +} + +// MappingReaderWriter combines read and write access to the v1-mappings KV bucket. 
+type MappingReaderWriter interface { + MappingReader + MappingWriter +} diff --git a/internal/infrastructure/mock/mapping_store.go b/internal/infrastructure/mock/mapping_store.go new file mode 100644 index 0000000..285dc60 --- /dev/null +++ b/internal/infrastructure/mock/mapping_store.go @@ -0,0 +1,62 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package mock + +import ( + "context" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" +) + +// FakeMappingStore is an in-memory MappingReaderWriter for unit tests. +type FakeMappingStore struct { + values map[string]string + tombstones map[string]bool +} + +var _ port.MappingReaderWriter = (*FakeMappingStore)(nil) + +// NewFakeMappingStore returns an empty FakeMappingStore. +func NewFakeMappingStore() *FakeMappingStore { + return &FakeMappingStore{values: make(map[string]string), tombstones: make(map[string]bool)} +} + +// Set pre-populates a key/value pair (helper for test setup). 
+func (f *FakeMappingStore) Set(key, value string) { f.values[key] = value } + +func (f *FakeMappingStore) ResolveAction(_ context.Context, key string) model.MessageAction { + if _, ok := f.values[key]; ok { + return model.ActionUpdated + } + return model.ActionCreated +} + +func (f *FakeMappingStore) IsMappingPresent(_ context.Context, key string) bool { + _, ok := f.values[key] + return ok && !f.tombstones[key] +} + +func (f *FakeMappingStore) IsTombstoned(_ context.Context, key string) bool { + return f.tombstones[key] +} + +func (f *FakeMappingStore) GetMappingValue(_ context.Context, key string) (string, bool) { + if f.tombstones[key] { + return "", false + } + v, ok := f.values[key] + return v, ok +} + +func (f *FakeMappingStore) PutMapping(_ context.Context, key, value string) error { + f.values[key] = value + return nil +} + +func (f *FakeMappingStore) PutTombstone(_ context.Context, key string) error { + f.tombstones[key] = true + delete(f.values, key) + return nil +} diff --git a/internal/infrastructure/mock/message_publisher.go b/internal/infrastructure/mock/message_publisher.go index c225d4b..4db82b1 100644 --- a/internal/infrastructure/mock/message_publisher.go +++ b/internal/infrastructure/mock/message_publisher.go @@ -10,6 +10,30 @@ import ( "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" ) +// SpyMessagePublisher records every call to Indexer and Access for assertion in tests. +type SpyMessagePublisher struct { + IndexerCalls []PublishedMsg + AccessCalls []PublishedMsg +} + +// PublishedMsg holds the subject and message from a single publisher call. 
+type PublishedMsg struct { + Subject string + Message any +} + +var _ port.MessagePublisher = (*SpyMessagePublisher)(nil) + +func (s *SpyMessagePublisher) Indexer(_ context.Context, subject string, message any) error { + s.IndexerCalls = append(s.IndexerCalls, PublishedMsg{subject, message}) + return nil +} +func (s *SpyMessagePublisher) Access(_ context.Context, subject string, message any) error { + s.AccessCalls = append(s.AccessCalls, PublishedMsg{subject, message}) + return nil +} +func (s *SpyMessagePublisher) Internal(_ context.Context, _ string, _ any) error { return nil } + // mockMessagePublisher is a mock implementation of the MessagePublisher interface type mockMessagePublisher struct{} diff --git a/internal/infrastructure/nats/client.go b/internal/infrastructure/nats/client.go index 01d997c..16a24d3 100644 --- a/internal/infrastructure/nats/client.go +++ b/internal/infrastructure/nats/client.go @@ -19,11 +19,23 @@ import ( // NATSClient wraps the NATS connection and provides access control operations type NATSClient struct { conn *nats.Conn + js jetstream.JetStream config Config kvStore map[string]jetstream.KeyValue timeout time.Duration } +// KeyValue opens the named KV bucket. The bucket is not cached — use KeyValueStore +// for buckets that are accessed repeatedly via the storage adapters. +func (c *NATSClient) KeyValue(ctx context.Context, bucketName string) (jetstream.KeyValue, error) { + return c.js.KeyValue(ctx, bucketName) +} + +// CreateOrUpdateConsumer creates or updates a durable JetStream consumer. 
+func (c *NATSClient) CreateOrUpdateConsumer(ctx context.Context, streamName string, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { + return c.js.CreateOrUpdateConsumer(ctx, streamName, cfg) +} + // NATSClientInterface defines the interface for NATS operations // This allows for easy mocking and testing type NATSClientInterface interface { @@ -68,17 +80,9 @@ func (c *NATSClient) QueueSubscribe(subject, queue string, handler nats.MsgHandl return c.conn.QueueSubscribe(subject, queue, handler) } -// KeyValueStore creates a JetStream client and gets the key-value store for projects. +// KeyValueStore opens the named KV bucket and caches it on the client. func (c *NATSClient) KeyValueStore(ctx context.Context, bucketName string) error { - js, err := jetstream.New(c.conn) - if err != nil { - slog.ErrorContext(ctx, "error creating NATS JetStream client", - "error", err, - "nats_url", c.conn.ConnectedUrl(), - ) - return err - } - kvStore, err := js.KeyValue(ctx, bucketName) + kvStore, err := c.js.KeyValue(ctx, bucketName) if err != nil { slog.ErrorContext(ctx, "error getting NATS JetStream key-value store", "error", err, @@ -144,8 +148,15 @@ func NewClient(ctx context.Context, config Config) (*NATSClient, error) { return nil, errors.NewServiceUnavailable("failed to connect to NATS", err) } + js, err := jetstream.New(conn) + if err != nil { + conn.Close() + return nil, errors.NewServiceUnavailable("failed to create JetStream context", err) + } + client := &NATSClient{ conn: conn, + js: js, config: config, timeout: config.Timeout, } diff --git a/internal/infrastructure/nats/data_stream_consumer.go b/internal/infrastructure/nats/data_stream_consumer.go new file mode 100644 index 0000000..dbf5bd1 --- /dev/null +++ b/internal/infrastructure/nats/data_stream_consumer.go @@ -0,0 +1,84 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package nats + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + msgpack "github.com/vmihailenco/msgpack/v5" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" +) + +// dataStreamConsumer is the NATS JetStream implementation of port.DataStreamProcessor. +type dataStreamConsumer struct { + handler port.DataEventHandler +} + +// Process routes a single stream message to the appropriate DataEventHandler method +// (HandleChange or HandleRemoval), then ACKs or NAKs with exponential backoff based +// on the handler's return value. +// +// Unrecoverable parse errors (invalid JSON) are always ACKed to prevent a poison-pill loop. +func (c *dataStreamConsumer) Process(ctx context.Context, msg model.StreamMessage) { + if msg.IsRemoval { + if nak := c.handler.HandleRemoval(ctx, msg.Key); nak { + c.nak(ctx, msg) + return + } + c.ack(ctx, msg) + return + } + + var data map[string]any + if err := json.Unmarshal(msg.Data, &data); err != nil { + if msgErr := msgpack.Unmarshal(msg.Data, &data); msgErr != nil { + slog.ErrorContext(ctx, "failed to unmarshal stream message payload as JSON or msgpack, ACKing to avoid poison pill", + "key", msg.Key, "json_error", err, "msgpack_error", msgErr) + c.ack(ctx, msg) + return + } + slog.DebugContext(ctx, "decoded stream message payload as msgpack", "key", msg.Key) + } + + if nak := c.handler.HandleChange(ctx, msg.Key, data); nak { + c.nak(ctx, msg) + return + } + c.ack(ctx, msg) +} + +func (c *dataStreamConsumer) ack(ctx context.Context, msg model.StreamMessage) { + if err := msg.Ack(); err != nil { + slog.ErrorContext(ctx, "failed to ACK stream message", "key", msg.Key, "error", err) + } +} + +func (c *dataStreamConsumer) nak(ctx context.Context, msg model.StreamMessage) { + delay := nakDelay(msg.DeliveryCount) + if err := msg.Nak(delay); err != nil { + slog.ErrorContext(ctx, 
"failed to NAK stream message", + "key", msg.Key, "delay", delay, "error", err) + } +} + +func nakDelay(numDelivered uint64) time.Duration { + switch numDelivered { + case 1: + return 2 * time.Second + case 2: + return 10 * time.Second + default: + return 20 * time.Second + } +} + +// NewDataStreamConsumer creates a port.DataStreamProcessor that dispatches messages to handler. +func NewDataStreamConsumer(handler port.DataEventHandler) port.DataStreamProcessor { + return &dataStreamConsumer{handler: handler} +} diff --git a/internal/infrastructure/nats/mapping_store.go b/internal/infrastructure/nats/mapping_store.go new file mode 100644 index 0000000..dc0968a --- /dev/null +++ b/internal/infrastructure/nats/mapping_store.go @@ -0,0 +1,76 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package nats + +import ( + "context" + "log/slog" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + "github.com/nats-io/nats.go/jetstream" +) + +type natsMappingReaderWriter struct { + kv jetstream.KeyValue +} + +// NewMappingReaderWriter wraps a JetStream KeyValue bucket as a port.MappingReaderWriter. +// All tombstone marker and key-not-found semantics are encapsulated here so the +// service layer remains free of storage-level concerns. 
+func NewMappingReaderWriter(kv jetstream.KeyValue) port.MappingReaderWriter { + return &natsMappingReaderWriter{kv: kv} +} + +func (m *natsMappingReaderWriter) ResolveAction(ctx context.Context, key string) model.MessageAction { + entry, err := m.kv.Get(ctx, key) + if err != nil || entry == nil { + return model.ActionCreated + } + if string(entry.Value()) == constants.KVTombstoneMarker { + return model.ActionCreated + } + return model.ActionUpdated +} + +func (m *natsMappingReaderWriter) IsMappingPresent(ctx context.Context, key string) bool { + entry, err := m.kv.Get(ctx, key) + if err != nil || entry == nil { + slog.WarnContext(ctx, "mapping key not found in v1-mappings", "mapping_key", key) + return false + } + return string(entry.Value()) != constants.KVTombstoneMarker +} + +func (m *natsMappingReaderWriter) IsTombstoned(ctx context.Context, key string) bool { + entry, err := m.kv.Get(ctx, key) + if err != nil || entry == nil { + slog.WarnContext(ctx, "mapping key not found in v1-mappings - treating as not tombstoned", "mapping_key", key) + return false + } + return string(entry.Value()) == constants.KVTombstoneMarker +} + +func (m *natsMappingReaderWriter) GetMappingValue(ctx context.Context, key string) (string, bool) { + entry, err := m.kv.Get(ctx, key) + if err != nil || entry == nil { + return "", false + } + val := string(entry.Value()) + if val == constants.KVTombstoneMarker { + return "", false + } + return val, true +} + +func (m *natsMappingReaderWriter) PutMapping(ctx context.Context, key, value string) error { + _, err := m.kv.Put(ctx, key, []byte(value)) + return err +} + +func (m *natsMappingReaderWriter) PutTombstone(ctx context.Context, key string) error { + _, err := m.kv.Put(ctx, key, []byte(constants.KVTombstoneMarker)) + return err +} diff --git a/internal/service/datastream_member_handler.go b/internal/service/datastream_member_handler.go new file mode 100644 index 0000000..5867a7b --- /dev/null +++ 
b/internal/service/datastream_member_handler.go @@ -0,0 +1,208 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + pkgerrors "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/errors" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/mapconv" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/principal" +) + +// HandleDataStreamMemberUpdate transforms the v1 payload into a GrpsIOMember and publishes an +// indexer message and, when the member has a username, an FGA put_member access message. +// Returns true to NAK when the parent subgroup mapping is absent (ordering guarantee) +// or on transient errors. +func HandleDataStreamMemberUpdate(ctx context.Context, uid string, data map[string]any, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + // Members carry group_id (Groups.io numeric ID) rather than a direct mailing_list_uid. + // Resolve the parent subgroup UID via the reverse index written by the subgroup handler. 
+ groupID := mapconv.Int64Ptr(data, "group_id") + if groupID == nil { + slog.ErrorContext(ctx, "member has no group_id, cannot determine parent mailing list — ACKing", "uid", uid) + return false + } + + gidKey := fmt.Sprintf("%s.%d", constants.KVMappingPrefixSubgroupByGroupID, *groupID) + mailingListUID, ok := mappings.GetMappingValue(ctx, gidKey) + if !ok { + slog.WarnContext(ctx, "parent subgroup not yet processed, NAKing member for retry", + "uid", uid, "group_id", *groupID) + return true // NAK — retry with backoff + } + + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixMember, uid) + + if mappings.IsTombstoned(ctx, mKey) { + slog.InfoContext(ctx, "member mapping is tombstoned, skipping update", "uid", uid) + return false + } + + action := mappings.ResolveAction(ctx, mKey) + + member := transformV1ToGrpsIOMember(uid, mailingListUID, data) + + msg := &model.IndexerMessage{Action: action, Tags: member.Tags()} + built, err := msg.Build(ctx, member) + if err != nil { + slog.ErrorContext(ctx, "failed to build member indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOMemberSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish member indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + if member.Username != "" { + accessMsg := &groupsioMailingListMemberStub{ + UID: uid, + Username: principal.FromUsername(member.Username), + MailingListUID: mailingListUID, + } + if err := publisher.Access(ctx, constants.PutMemberGroupsIOMailingListSubject, accessMsg); err != nil { + slog.WarnContext(ctx, "failed to publish member FGA put message", "uid", uid, "error", err) + } + } + + mappingValue := buildMemberMappingValue(uid, member.Username, mailingListUID) + if err := mappings.PutMapping(ctx, mKey, mappingValue); err != nil { + slog.ErrorContext(ctx, "failed to put mapping key", "mapping_key", mKey, "error", err) + } + return false +} + +// 
HandleDataStreamMemberDelete publishes a delete indexer message, an FGA remove_member message +// (when the stored mapping contains a username), and tombstones the mapping. +func HandleDataStreamMemberDelete(ctx context.Context, uid string, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixMember, uid) + + if mappings.IsTombstoned(ctx, mKey) { + slog.InfoContext(ctx, "member already deleted, ACKing duplicate", "uid", uid) + return false + } + + // If there is no mapping entry, this record was never indexed — nothing to delete. + storedValue, ok := mappings.GetMappingValue(ctx, mKey) + if !ok { + slog.InfoContext(ctx, "member was never indexed, skipping OpenSearch delete", "uid", uid) + if err := mappings.PutTombstone(ctx, mKey); err != nil { + slog.ErrorContext(ctx, "failed to put tombstone", "mapping_key", mKey, "error", err) + } + return false + } + + msg := &model.IndexerMessage{Action: model.ActionDeleted} + built, err := msg.Build(ctx, uid) + if err != nil { + slog.ErrorContext(ctx, "failed to build member delete indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOMemberSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish member delete indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + _, username, mailingListUID := parseMemberMappingValue(storedValue) + if username != "" { + accessMsg := &groupsioMailingListMemberStub{ + UID: uid, + Username: principal.FromUsername(username), + MailingListUID: mailingListUID, + } + if err := publisher.Access(ctx, constants.RemoveMemberGroupsIOMailingListSubject, accessMsg); err != nil { + slog.WarnContext(ctx, "failed to publish member FGA remove message", "uid", uid, "error", err) + } + } + + if err := mappings.PutTombstone(ctx, mKey); err != nil { + slog.ErrorContext(ctx, "failed to put tombstone", "mapping_key", 
mKey, "error", err) + } + return false +} + +// transformV1ToGrpsIOMember maps v1 DynamoDB fields to the GrpsIOMember domain model. +// mailingListUID is resolved from the reverse group_id index before calling this function. +func transformV1ToGrpsIOMember(uid, mailingListUID string, data map[string]any) *model.GrpsIOMember { + firstName, lastName := splitFullName(mapconv.StringVal(data, "full_name")) + + member := &model.GrpsIOMember{ + UID: uid, + MailingListUID: mailingListUID, + MemberID: mapconv.Int64Ptr(data, "member_id"), + GroupID: mapconv.Int64Ptr(data, "group_id"), + UserID: mapconv.StringVal(data, "user_id"), + Username: mapconv.StringVal(data, "username"), + FirstName: firstName, + LastName: lastName, + Email: mapconv.StringVal(data, "email"), + Organization: mapconv.StringVal(data, "organization"), + JobTitle: mapconv.StringVal(data, "job_title"), + GroupsEmail: mapconv.StringVal(data, "groups_email"), + GroupsFullName: mapconv.StringVal(data, "groups_full_name"), + CommitteeEmail: mapconv.StringVal(data, "committee_email"), + CommitteeFullName: mapconv.StringVal(data, "committee_full_name"), + CommitteeID: mapconv.StringVal(data, "committee_id"), + Role: mapconv.StringVal(data, "role"), + VotingStatus: mapconv.StringVal(data, "voting_status"), + MemberType: mapconv.StringVal(data, "member_type"), + DeliveryMode: mapconv.StringVal(data, "delivery_mode"), + DeliveryModeList: mapconv.StringVal(data, "delivery_mode_list"), + ModStatus: mapconv.StringVal(data, "mod_status"), + Status: mapconv.StringVal(data, "status"), + Source: "v1-sync", + } + + if ts := mapconv.StringVal(data, "created_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + member.CreatedAt = t + } + } + if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + member.UpdatedAt = t + } + } + if ts := mapconv.StringVal(data, "last_system_modified_at"); ts != "" { + if t, err := 
time.Parse(time.RFC3339, ts); err == nil { + member.SystemUpdatedAt = t + } + } + + return member +} + +// buildMemberMappingValue encodes uid, username, and mailingListUID into a single string +// so they can be recovered on delete without an extra lookup. +func buildMemberMappingValue(uid, username, mailingListUID string) string { + return fmt.Sprintf("%s|%s|%s", uid, username, mailingListUID) +} + +// parseMemberMappingValue decodes a value written by buildMemberMappingValue. +// Falls back gracefully for old-format entries that only stored uid. +func parseMemberMappingValue(value string) (uid, username, mailingListUID string) { + parts := strings.SplitN(value, "|", 3) + if len(parts) == 3 { + return parts[0], parts[1], parts[2] + } + return value, "", "" +} + +// splitFullName splits "First Last" into (first, last). +// For single-token names (no space), the whole string is returned as first name. +func splitFullName(fullName string) (string, string) { + idx := strings.Index(fullName, " ") + if idx == -1 { + return fullName, "" + } + return fullName[:idx], fullName[idx+1:] +} diff --git a/internal/service/datastream_member_handler_test.go b/internal/service/datastream_member_handler_test.go new file mode 100644 index 0000000..043176f --- /dev/null +++ b/internal/service/datastream_member_handler_test.go @@ -0,0 +1,127 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "testing" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/mock" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + "github.com/stretchr/testify/assert" +) + +// --- HandleDataStreamMemberUpdate --- + +func TestHandleDataStreamMemberUpdate_MissingGroupID_ACK(t *testing.T) { + nak := HandleDataStreamMemberUpdate(context.Background(), "mem-1", + map[string]any{}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.False(t, nak, "missing group_id should ACK (malformed data, no retry)") +} + +func TestHandleDataStreamMemberUpdate_ParentSubgroupAbsent_NAK(t *testing.T) { + // group_id present but no subgroup mapping written yet + nak := HandleDataStreamMemberUpdate(context.Background(), "mem-1", + map[string]any{"group_id": float64(42)}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.True(t, nak, "absent subgroup mapping should NAK for retry") +} + +func TestHandleDataStreamMemberUpdate_Tombstoned_ACK(t *testing.T) { + m := mock.NewFakeMappingStore() + ctx := context.Background() + m.Set(fmt.Sprintf("%s.42", constants.KVMappingPrefixSubgroupByGroupID), "sg-1") + _ = m.PutTombstone(ctx, fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember)) + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamMemberUpdate(ctx, "mem-1", + map[string]any{"group_id": float64(42)}, + pub, m) + + assert.False(t, nak) + assert.Empty(t, pub.IndexerCalls, "tombstoned member should not publish") +} + +func TestHandleDataStreamMemberUpdate_HappyPath_ACKAndPublishesAndWritesMapping(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.42", constants.KVMappingPrefixSubgroupByGroupID), "sg-1") + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamMemberUpdate(context.Background(), "mem-1", + 
map[string]any{ + "group_id": float64(42), + "member_id": float64(99), + "email": "alice@example.com", + "full_name": "Alice Smith", + }, + pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOMemberSubject, pub.IndexerCalls[0].Subject) + assert.Empty(t, pub.AccessCalls, "member access is inherited — no access message expected") + + _, present := m.GetMappingValue(context.Background(), + fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember)) + assert.True(t, present, "forward mapping should be written after successful processing") +} + +func TestHandleDataStreamMemberUpdate_CreateVsUpdate_Action(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.42", constants.KVMappingPrefixSubgroupByGroupID), "sg-1") + + data := func() map[string]any { return map[string]any{"group_id": float64(42)} } + ctx := context.Background() + mKey := fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember) + + assert.Equal(t, model.ActionCreated, m.ResolveAction(ctx, mKey)) + HandleDataStreamMemberUpdate(ctx, "mem-1", data(), &mock.SpyMessagePublisher{}, m) + assert.Equal(t, model.ActionUpdated, m.ResolveAction(ctx, mKey)) +} + +// --- HandleDataStreamMemberDelete --- + +func TestHandleDataStreamMemberDelete_DuplicateDelete_ACK(t *testing.T) { + m := mock.NewFakeMappingStore() + ctx := context.Background() + _ = m.PutTombstone(ctx, fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember)) + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamMemberDelete(ctx, "mem-1", pub, m) + + assert.False(t, nak) + assert.Empty(t, pub.IndexerCalls, "duplicate delete should not publish") +} + +func TestHandleDataStreamMemberDelete_NeverIndexed_TombstonesWithoutPublishing(t *testing.T) { + m := mock.NewFakeMappingStore() + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamMemberDelete(context.Background(), "mem-1", pub, m) + + assert.False(t, nak) + assert.Empty(t, pub.IndexerCalls, "never-indexed 
member should not publish indexer message") + assert.True(t, m.IsTombstoned(context.Background(), + fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember)), + "should still tombstone to prevent future re-processing") +} + +func TestHandleDataStreamMemberDelete_HappyPath_ACKAndTombstones(t *testing.T) { + m := mock.NewFakeMappingStore() + ctx := context.Background() + mKey := fmt.Sprintf("%s.mem-1", constants.KVMappingPrefixMember) + _ = m.PutMapping(ctx, mKey, "mem-1") + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamMemberDelete(ctx, "mem-1", pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOMemberSubject, pub.IndexerCalls[0].Subject) + assert.Empty(t, pub.AccessCalls, "member delete should not publish access message") + + assert.True(t, m.IsTombstoned(ctx, mKey)) +} diff --git a/internal/service/datastream_service_handler.go b/internal/service/datastream_service_handler.go new file mode 100644 index 0000000..51d02af --- /dev/null +++ b/internal/service/datastream_service_handler.go @@ -0,0 +1,173 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + pkgerrors "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/errors" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/mapconv" +) + +// HandleDataStreamServiceUpdate transforms the v1 payload into a GrpsIOService and publishes +// indexer + access control messages. Returns true to NAK on transient errors. 
+func HandleDataStreamServiceUpdate(ctx context.Context, uid string, data map[string]any, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + // Resolve v1 project SFID → v2 project UID via the shared project.sfid.{sfid} mapping + // written by lfx-v1-sync-helper. NAK if the project hasn't been processed yet. + projectSFID := mapconv.StringVal(data, "project_id") + if projectSFID == "" { + slog.ErrorContext(ctx, "missing project_id in service event, discarding", "uid", uid) + return false // ACK — malformed data, retrying won't help + } + projectUID, ok := mappings.GetMappingValue(ctx, fmt.Sprintf("%s.%s", constants.KVMappingPrefixProjectBySFID, projectSFID)) + if !ok { + slog.WarnContext(ctx, "project mapping not yet available, NAKing service for retry", + "uid", uid, "project_sfid", projectSFID) + return true // NAK — retry with backoff + } + data["project_id"] = projectUID + + svc := transformV1ToGrpsIOService(uid, data) + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixService, uid) + action := mappings.ResolveAction(ctx, mKey) + + msg := &model.IndexerMessage{Action: action, Tags: svc.Tags()} + built, err := msg.Build(ctx, svc) + if err != nil { + slog.ErrorContext(ctx, "failed to build service indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOServiceSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish service indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + // Publish settings indexer message when writers or auditors are present. 
+ settings := buildServiceSettings(uid, data) + if settings != nil { + settingsMsg := &model.IndexerMessage{Action: action, Tags: settings.Tags()} + builtSettings, errSettings := settingsMsg.Build(ctx, settings) + if errSettings != nil { + slog.ErrorContext(ctx, "failed to build service settings indexer message", "uid", uid, "error", errSettings) + } + if errSettings == nil { + if errPublish := publisher.Indexer(ctx, constants.IndexGroupsIOServiceSettingsSubject, builtSettings); errPublish != nil { + slog.ErrorContext(ctx, "failed to publish service settings indexer message", "uid", uid, "error", errPublish) + } + } + } + + references := map[string][]string{ + constants.RelationProject: {svc.ProjectUID}, + } + if settings != nil { + if writers := userInfoUsernames(settings.Writers); len(writers) > 0 { + references[constants.RelationWriter] = writers + } + if auditors := userInfoUsernames(settings.Auditors); len(auditors) > 0 { + references[constants.RelationAuditor] = auditors + } + } + accessMsg := &model.AccessMessage{ + UID: uid, + ObjectType: constants.ObjectTypeGroupsIOService, + Public: svc.Public, + References: references, + } + if err := publisher.Access(ctx, constants.UpdateAccessGroupsIOServiceSubject, accessMsg); err != nil { + slog.WarnContext(ctx, "failed to publish service access message", "uid", uid, "error", err) + } + + if err := mappings.PutMapping(ctx, mKey, uid); err != nil { + slog.ErrorContext(ctx, "failed to put mapping key", "mapping_key", mKey, "error", err) + } + return false +} + +// HandleDataStreamServiceDelete publishes a delete indexer message and tombstones the mapping. +// Returns true to NAK on transient errors. 
+func HandleDataStreamServiceDelete(ctx context.Context, uid string, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixService, uid) + + if mappings.IsTombstoned(ctx, mKey) { + slog.InfoContext(ctx, "service already deleted, ACKing duplicate", "uid", uid) + return false + } + + msg := &model.IndexerMessage{Action: model.ActionDeleted} + built, err := msg.Build(ctx, uid) + if err != nil { + slog.ErrorContext(ctx, "failed to build service delete indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOServiceSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish service delete indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + if err := publisher.Access(ctx, constants.DeleteAllAccessGroupsIOServiceSubject, uid); err != nil { + slog.WarnContext(ctx, "failed to publish service delete access message", "uid", uid, "error", err) + } + + if err := mappings.PutTombstone(ctx, mKey); err != nil { + slog.ErrorContext(ctx, "failed to put tombstone", "mapping_key", mKey, "error", err) + } + return false +} + +// buildServiceSettings constructs a GrpsIOServiceSettings from v1 writers/auditors. +// Returns nil when both slices are empty (no settings message needed). +func buildServiceSettings(uid string, data map[string]any) *model.GrpsIOServiceSettings { + writers := toUserInfoSlice(mapconv.StringSliceVal(data, "writers")) + auditors := toUserInfoSlice(mapconv.StringSliceVal(data, "auditors")) + if len(writers) == 0 && len(auditors) == 0 { + return nil + } + return &model.GrpsIOServiceSettings{ + UID: uid, + Writers: writers, + Auditors: auditors, + } +} + +// transformV1ToGrpsIOService maps v1 DynamoDB fields to the GrpsIOService domain model. +// Source is always "v1-sync" to distinguish these from API-created records. 
+func transformV1ToGrpsIOService(uid string, data map[string]any) *model.GrpsIOService { + svc := &model.GrpsIOService{ + UID: uid, + Type: mapconv.StringVal(data, "group_service_type"), + Domain: mapconv.StringVal(data, "domain"), + GroupID: mapconv.Int64Ptr(data, "group_id"), + Prefix: mapconv.StringVal(data, "prefix"), + ProjectUID: mapconv.StringVal(data, "project_id"), + ProjectSlug: mapconv.StringVal(data, "proj_id"), + Source: "v1-sync", + } + + if ts := mapconv.StringVal(data, "created_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + svc.CreatedAt = t + } + } + if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + svc.UpdatedAt = t + } + } + if ts := mapconv.StringVal(data, "last_system_modified_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + svc.SystemUpdatedAt = t + } + } + + return svc +} diff --git a/internal/service/datastream_service_handler_test.go b/internal/service/datastream_service_handler_test.go new file mode 100644 index 0000000..22f6d88 --- /dev/null +++ b/internal/service/datastream_service_handler_test.go @@ -0,0 +1,93 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "testing" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/mock" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + "github.com/stretchr/testify/assert" +) + +func TestHandleDataStreamServiceUpdate_MissingProjectID_ACK(t *testing.T) { + nak := HandleDataStreamServiceUpdate(context.Background(), "svc-1", + map[string]any{}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.False(t, nak, "missing project_id should ACK (not retry)") +} + +func TestHandleDataStreamServiceUpdate_ProjectMappingAbsent_NAK(t *testing.T) { + nak := HandleDataStreamServiceUpdate(context.Background(), "svc-1", + map[string]any{"project_id": "sfid-proj"}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.True(t, nak, "unknown project mapping should NAK for retry") +} + +func TestHandleDataStreamServiceUpdate_HappyPath_ACKAndPublishes(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamServiceUpdate(context.Background(), "svc-1", + map[string]any{ + "project_id": "sfid-proj", + "group_service_type": "mailing-list", + "domain": "example.com", + }, + pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOServiceSubject, pub.IndexerCalls[0].Subject) + assert.Len(t, pub.AccessCalls, 1) + assert.Equal(t, constants.UpdateAccessGroupsIOServiceSubject, pub.AccessCalls[0].Subject) + + _, present := m.GetMappingValue(context.Background(), + fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService)) + assert.True(t, present, "mapping should be written after successful processing") +} + +func 
TestHandleDataStreamServiceUpdate_CreateVsUpdate_Action(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + + data := func() map[string]any { return map[string]any{"project_id": "sfid-proj"} } + ctx := context.Background() + mKey := fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService) + + assert.Equal(t, model.ActionCreated, m.ResolveAction(ctx, mKey)) + HandleDataStreamServiceUpdate(ctx, "svc-1", data(), &mock.SpyMessagePublisher{}, m) + assert.Equal(t, model.ActionUpdated, m.ResolveAction(ctx, mKey)) +} + +func TestHandleDataStreamServiceDelete_DuplicateDelete_ACK(t *testing.T) { + m := mock.NewFakeMappingStore() + ctx := context.Background() + _ = m.PutTombstone(ctx, fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService)) + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamServiceDelete(ctx, "svc-1", pub, m) + + assert.False(t, nak) + assert.Empty(t, pub.IndexerCalls, "duplicate delete should not publish") +} + +func TestHandleDataStreamServiceDelete_HappyPath_ACKAndTombstones(t *testing.T) { + m := mock.NewFakeMappingStore() + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamServiceDelete(context.Background(), "svc-1", pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOServiceSubject, pub.IndexerCalls[0].Subject) + assert.Len(t, pub.AccessCalls, 1) + assert.Equal(t, constants.DeleteAllAccessGroupsIOServiceSubject, pub.AccessCalls[0].Subject) + + assert.True(t, m.IsTombstoned(context.Background(), + fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService))) +} diff --git a/internal/service/datastream_subgroup_handler.go b/internal/service/datastream_subgroup_handler.go new file mode 100644 index 0000000..7efd426 --- /dev/null +++ b/internal/service/datastream_subgroup_handler.go @@ -0,0 +1,271 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/model" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/domain/port" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + pkgerrors "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/errors" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/mapconv" +) + +// HandleDataStreamSubgroupUpdate transforms the v1 payload into a GrpsIOMailingList and publishes +// indexer + access control messages. Returns true to NAK when the parent service mapping +// is absent (ordering guarantee) or on transient errors. +func HandleDataStreamSubgroupUpdate(ctx context.Context, uid string, data map[string]any, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + // Resolve v1 project SFID → v2 project UID via the shared project.sfid.{sfid} mapping + // written by lfx-v1-sync-helper. NAK if the project hasn't been processed yet. + projectSFID := mapconv.StringVal(data, "project_id") + if projectSFID == "" { + slog.ErrorContext(ctx, "missing project_id in subgroup event, discarding", "uid", uid) + return false // ACK — malformed data, retrying won't help + } + projectUID, ok := mappings.GetMappingValue(ctx, fmt.Sprintf("%s.%s", constants.KVMappingPrefixProjectBySFID, projectSFID)) + if !ok { + slog.WarnContext(ctx, "project mapping not yet available, NAKing subgroup for retry", + "uid", uid, "project_sfid", projectSFID) + return true // NAK — retry with backoff + } + data["project_id"] = projectUID + + // Resolve optional v1 committee SFID → v2 committee UID. NAK if the committee + // has been specified but hasn't been synced yet (ordering guarantee). 
+ if committeeSFID := mapconv.StringVal(data, "committee"); committeeSFID != "" { + committeeUID, ok := mappings.GetMappingValue(ctx, fmt.Sprintf("%s.%s", constants.KVMappingPrefixCommitteeBySFID, committeeSFID)) + if !ok { + slog.WarnContext(ctx, "committee mapping not yet available, NAKing subgroup for retry", + "uid", uid, "committee_sfid", committeeSFID) + return true // NAK — retry with backoff + } + data["committee"] = committeeUID + } + + list := transformV1ToGrpsIOMailingList(uid, data) + + if list.ServiceUID == "" { + slog.ErrorContext(ctx, "missing parent_id in subgroup event, discarding", "uid", uid) + return false // ACK — malformed data, retrying won't help + } + + // Parent dependency check: the indexer must have the parent service record before + // the child mailing list to avoid orphaned documents in OpenSearch. + serviceKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixService, list.ServiceUID) + if !mappings.IsMappingPresent(ctx, serviceKey) { + slog.WarnContext(ctx, "parent service not yet processed, NAKing subgroup for retry", + "uid", uid, "service_uid", list.ServiceUID) + return true // NAK — retry with backoff + } + + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixSubgroup, uid) + + if mappings.IsTombstoned(ctx, mKey) { + slog.InfoContext(ctx, "subgroup mapping is tombstoned, skipping update", "uid", uid) + return false + } + + action := mappings.ResolveAction(ctx, mKey) + + msg := &model.IndexerMessage{Action: action, Tags: list.Tags()} + built, err := msg.Build(ctx, list) + if err != nil { + slog.ErrorContext(ctx, "failed to build subgroup indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOMailingListSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish subgroup indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + // Publish settings indexer message when writers or auditors are present. 
+ settings := buildMailingListSettings(uid, data) + if settings != nil { + settingsMsg := &model.IndexerMessage{Action: action, Tags: settings.Tags()} + builtSettings, errSettings := settingsMsg.Build(ctx, settings) + if errSettings != nil { + slog.ErrorContext(ctx, "failed to build subgroup settings indexer message", "uid", uid, "error", errSettings) + } + if errSettings == nil { + if errPublish := publisher.Indexer(ctx, constants.IndexGroupsIOMailingListSettingsSubject, builtSettings); errPublish != nil { + slog.ErrorContext(ctx, "failed to publish subgroup settings indexer message", "uid", uid, "error", errPublish) + } + } + } + + references := map[string][]string{ + // Project access is inherited through the service — only service reference needed. + constants.RelationGroupsIOService: {list.ServiceUID}, + } + for _, committee := range list.Committees { + if committee.UID != "" { + references[constants.RelationCommittee] = append(references[constants.RelationCommittee], committee.UID) + } + } + if settings != nil { + if writers := userInfoUsernames(settings.Writers); len(writers) > 0 { + references[constants.RelationWriter] = writers + } + if auditors := userInfoUsernames(settings.Auditors); len(auditors) > 0 { + references[constants.RelationAuditor] = auditors + } + } + accessMsg := &model.AccessMessage{ + UID: uid, + ObjectType: constants.ObjectTypeGroupsIOMailingList, + Public: list.Public, + References: references, + } + if err := publisher.Access(ctx, constants.UpdateAccessGroupsIOMailingListSubject, accessMsg); err != nil { + slog.WarnContext(ctx, "failed to publish subgroup access message", "uid", uid, "error", err) + } + + if err := mappings.PutMapping(ctx, mKey, uid); err != nil { + slog.ErrorContext(ctx, "failed to put mapping key", "mapping_key", mKey, "error", err) + } + + // Store reverse index: group_id → subgroup UID so member events can resolve MailingListUID. 
+ if list.GroupID != nil { + gidKey := fmt.Sprintf("%s.%d", constants.KVMappingPrefixSubgroupByGroupID, *list.GroupID) + if err := mappings.PutMapping(ctx, gidKey, uid); err != nil { + slog.ErrorContext(ctx, "failed to put mapping key", "mapping_key", gidKey, "error", err) + } + } + + return false +} + +// HandleDataStreamSubgroupDelete publishes a delete indexer message and tombstones the mapping. +func HandleDataStreamSubgroupDelete(ctx context.Context, uid string, publisher port.MessagePublisher, mappings port.MappingReaderWriter) bool { + mKey := fmt.Sprintf("%s.%s", constants.KVMappingPrefixSubgroup, uid) + + if mappings.IsTombstoned(ctx, mKey) { + slog.InfoContext(ctx, "subgroup already deleted, ACKing duplicate", "uid", uid) + return false + } + + // If there is no mapping entry, this record was never indexed — nothing to delete. + if !mappings.IsMappingPresent(ctx, mKey) { + slog.InfoContext(ctx, "subgroup was never indexed, skipping OpenSearch delete", "uid", uid) + if err := mappings.PutTombstone(ctx, mKey); err != nil { + slog.ErrorContext(ctx, "failed to put tombstone", "mapping_key", mKey, "error", err) + } + return false + } + + msg := &model.IndexerMessage{Action: model.ActionDeleted} + built, err := msg.Build(ctx, uid) + if err != nil { + slog.ErrorContext(ctx, "failed to build subgroup delete indexer message", "uid", uid, "error", err) + return false + } + + if err := publisher.Indexer(ctx, constants.IndexGroupsIOMailingListSubject, built); err != nil { + slog.ErrorContext(ctx, "failed to publish subgroup delete indexer message", "uid", uid, "error", err) + return pkgerrors.IsTransient(err) + } + + if err := publisher.Access(ctx, constants.DeleteAllAccessGroupsIOMailingListSubject, uid); err != nil { + slog.WarnContext(ctx, "failed to publish subgroup delete access message", "uid", uid, "error", err) + } + + if err := mappings.PutTombstone(ctx, mKey); err != nil { + slog.ErrorContext(ctx, "failed to put tombstone", "mapping_key", mKey, "error", 
err) + } + return false +} + +// buildMailingListSettings constructs a GrpsIOMailingListSettings from v1 writers/auditors. +// Returns nil when both slices are empty (no settings message needed). +func buildMailingListSettings(uid string, data map[string]any) *model.GrpsIOMailingListSettings { + writers := toUserInfoSlice(mapconv.StringSliceVal(data, "writers")) + auditors := toUserInfoSlice(mapconv.StringSliceVal(data, "auditors")) + if len(writers) == 0 && len(auditors) == 0 { + return nil + } + return &model.GrpsIOMailingListSettings{ + UID: uid, + Writers: writers, + Auditors: auditors, + } +} + +// toUserInfoSlice converts a slice of username strings to UserInfo values. +func toUserInfoSlice(usernames []string) []model.UserInfo { + if len(usernames) == 0 { + return nil + } + out := make([]model.UserInfo, len(usernames)) + for i, u := range usernames { + username := u + out[i] = model.UserInfo{Username: &username} + } + return out +} + +// userInfoUsernames extracts the non-empty Username pointers from a []UserInfo slice. +func userInfoUsernames(users []model.UserInfo) []string { + out := make([]string, 0, len(users)) + for _, u := range users { + if u.Username != nil && *u.Username != "" { + out = append(out, *u.Username) + } + } + return out +} + +// transformV1ToGrpsIOMailingList maps v1 DynamoDB fields to the GrpsIOMailingList domain model. 
+func transformV1ToGrpsIOMailingList(uid string, data map[string]any) *model.GrpsIOMailingList { + list := &model.GrpsIOMailingList{ + UID: uid, + GroupID: mapconv.Int64Ptr(data, "group_id"), + GroupName: mapconv.StringVal(data, "group_name"), + Public: mapconv.StringVal(data, "visibility") == "Public", + Type: mapconv.StringVal(data, "type"), + Description: mapconv.StringVal(data, "description"), + Title: mapconv.StringVal(data, "title"), + SubjectTag: mapconv.StringVal(data, "subject_tag"), + URL: mapconv.StringVal(data, "url"), + Flags: mapconv.StringSliceVal(data, "flags"), + ServiceUID: mapconv.StringVal(data, "parent_id"), + ProjectUID: mapconv.StringVal(data, "project_id"), + Source: "v1-sync", + } + + if n := mapconv.Int64Ptr(data, "subscriber_count"); n != nil { + list.SubscriberCount = int(*n) + } + + if committeeUID := mapconv.StringVal(data, "committee"); committeeUID != "" { + list.Committees = []model.Committee{{ + UID: committeeUID, + AllowedVotingStatuses: mapconv.StringSliceVal(data, "committee_filters"), + }} + } + + if ts := mapconv.StringVal(data, "created_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + list.CreatedAt = t + } + } + + if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + list.UpdatedAt = t + } + } + + if ts := mapconv.StringVal(data, "last_system_modified_at"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + list.SystemUpdatedAt = t + } + } + + return list +} diff --git a/internal/service/datastream_subgroup_handler_test.go b/internal/service/datastream_subgroup_handler_test.go new file mode 100644 index 0000000..72b966d --- /dev/null +++ b/internal/service/datastream_subgroup_handler_test.go @@ -0,0 +1,149 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package service + +import ( + "context" + "fmt" + "testing" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/infrastructure/mock" + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants" + "github.com/stretchr/testify/assert" +) + +func TestHandleDataStreamSubgroupUpdate_MissingProjectID_ACK(t *testing.T) { + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.False(t, nak, "missing project_id should ACK") +} + +func TestHandleDataStreamSubgroupUpdate_ProjectMappingAbsent_NAK(t *testing.T) { + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{"project_id": "sfid-proj"}, + &mock.SpyMessagePublisher{}, mock.NewFakeMappingStore()) + assert.True(t, nak, "unknown project mapping should NAK") +} + +func TestHandleDataStreamSubgroupUpdate_CommitteeMappingAbsent_NAK(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + m.Set(fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService), "svc-1") + + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{ + "project_id": "sfid-proj", + "parent_id": "svc-1", + "committee": "sfid-committee", // mapping absent + }, + &mock.SpyMessagePublisher{}, m) + assert.True(t, nak, "unknown committee mapping should NAK") +} + +func TestHandleDataStreamSubgroupUpdate_ParentServiceAbsent_NAK(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + // service mapping deliberately absent + + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{ + "project_id": "sfid-proj", + "parent_id": "svc-1", + }, + &mock.SpyMessagePublisher{}, m) + assert.True(t, nak, "absent parent service should NAK") +} + +func 
TestHandleDataStreamSubgroupUpdate_HappyPath_ACKAndPublishesAndWritesMappings(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + m.Set(fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService), "svc-1") + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{ + "project_id": "sfid-proj", + "parent_id": "svc-1", + "group_id": float64(42), + "group_name": "dev", + }, + pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOMailingListSubject, pub.IndexerCalls[0].Subject) + assert.Len(t, pub.AccessCalls, 1) + assert.Equal(t, constants.UpdateAccessGroupsIOMailingListSubject, pub.AccessCalls[0].Subject) + + _, present := m.GetMappingValue(context.Background(), + fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup)) + assert.True(t, present, "forward mapping should be written") + + rev, ok := m.GetMappingValue(context.Background(), + fmt.Sprintf("%s.42", constants.KVMappingPrefixSubgroupByGroupID)) + assert.True(t, ok, "reverse group_id index should be written") + assert.Equal(t, "sg-1", rev) +} + +func TestHandleDataStreamSubgroupUpdate_WithCommittee_ResolvesAndPublishes(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + m.Set(fmt.Sprintf("%s.sfid-committee", constants.KVMappingPrefixCommitteeBySFID), "committee-uid") + m.Set(fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService), "svc-1") + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{ + "project_id": "sfid-proj", + "parent_id": "svc-1", + "committee": "sfid-committee", + }, + pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) +} + +func TestHandleDataStreamSubgroupUpdate_NoGroupID_NoReverseIndex(t *testing.T) { + 
m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sfid-proj", constants.KVMappingPrefixProjectBySFID), "proj-uid") + m.Set(fmt.Sprintf("%s.svc-1", constants.KVMappingPrefixService), "svc-1") + + HandleDataStreamSubgroupUpdate(context.Background(), "sg-1", + map[string]any{"project_id": "sfid-proj", "parent_id": "svc-1"}, + &mock.SpyMessagePublisher{}, m) + + _, ok := m.GetMappingValue(context.Background(), + fmt.Sprintf("%s.0", constants.KVMappingPrefixSubgroupByGroupID)) + assert.False(t, ok, "should not write reverse index when group_id is absent") +} + +func TestHandleDataStreamSubgroupDelete_DuplicateDelete_ACK(t *testing.T) { + m := mock.NewFakeMappingStore() + ctx := context.Background() + _ = m.PutTombstone(ctx, fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup)) + + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamSubgroupDelete(ctx, "sg-1", pub, m) + + assert.False(t, nak) + assert.Empty(t, pub.IndexerCalls, "duplicate delete should not publish") +} + +func TestHandleDataStreamSubgroupDelete_HappyPath_ACKAndTombstones(t *testing.T) { + m := mock.NewFakeMappingStore() + m.Set(fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup), "sg-1") + pub := &mock.SpyMessagePublisher{} + nak := HandleDataStreamSubgroupDelete(context.Background(), "sg-1", pub, m) + + assert.False(t, nak) + assert.Len(t, pub.IndexerCalls, 1) + assert.Equal(t, constants.IndexGroupsIOMailingListSubject, pub.IndexerCalls[0].Subject) + assert.Len(t, pub.AccessCalls, 1) + assert.Equal(t, constants.DeleteAllAccessGroupsIOMailingListSubject, pub.AccessCalls[0].Subject) + + assert.True(t, m.IsTombstoned(context.Background(), + fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup))) +} diff --git a/pkg/constants/storage.go b/pkg/constants/storage.go index dc4a46e..8355aad 100644 --- a/pkg/constants/storage.go +++ b/pkg/constants/storage.go @@ -51,6 +51,35 @@ const ( // KVLookupGroupsIOMemberByGroupIDPrefix is the key pattern for GroupsIOGroupID index (lookup by 
Groups.io group ID) KVLookupGroupsIOMemberByGroupIDPrefix = "lookup/groupsio-member-groupid/%d" + // KVBucketNameV1Mappings is the shared KV bucket used by v1 eventing consumers to track + // processed entities (idempotency, created-vs-updated, tombstone markers for deletes). + KVBucketNameV1Mappings = "v1-mappings" + + // KVBucketV1Objects is the NATS KV bucket that lfx-v1-sync-helper writes DynamoDB records into. + KVBucketV1Objects = "v1-objects" + + // KVTombstoneMarker is the value written to v1-mappings after a successful delete, + // preventing duplicate delete processing on consumer redelivery. + KVTombstoneMarker = "!del" + + // KVMappingPrefixService is the v1-mappings key prefix for GroupsIO services. + KVMappingPrefixService = "groupsio-service" + // KVMappingPrefixSubgroup is the v1-mappings key prefix for GroupsIO subgroups (mailing lists). + KVMappingPrefixSubgroup = "groupsio-subgroup" + // KVMappingPrefixMember is the v1-mappings key prefix for GroupsIO members. + KVMappingPrefixMember = "groupsio-member" + // KVMappingPrefixSubgroupByGroupID is the v1-mappings reverse index: Groups.io group_id → subgroup UID. + // Written by the subgroup handler so the member handler can resolve MailingListUID from group_id. + KVMappingPrefixSubgroupByGroupID = "groupsio-subgroup-gid" + + // KVMappingPrefixProjectBySFID is the v1-mappings forward index written by lfx-v1-sync-helper: + // project.sfid.{sfid} → v2 project UID. Used to resolve the v1 project_id (SFID) to a v2 UID. + KVMappingPrefixProjectBySFID = "project.sfid" + + // KVMappingPrefixCommitteeBySFID is the v1-mappings forward index written by lfx-v1-sync-helper: + // committee.sfid.{sfid} → v2 committee UID. Used to resolve the v1 committee SFID to a v2 UID. 
+ KVMappingPrefixCommitteeBySFID = "committee.sfid" + // Key prefixes for bucket detection // GroupsIOMailingListKeyPrefix is the common prefix for all mailing list related keys GroupsIOMailingListKeyPrefix = "lookup/groupsio-mailing-list/" diff --git a/pkg/errors/transient.go b/pkg/errors/transient.go new file mode 100644 index 0000000..7cc93a7 --- /dev/null +++ b/pkg/errors/transient.go @@ -0,0 +1,20 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package errors + +import "strings" + +// IsTransient reports whether err is likely to resolve on retry. +// It matches against common keywords indicating network or availability failures: +// timeout, connection, unavailable, and deadline. +func IsTransient(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "timeout") || + strings.Contains(s, "connection") || + strings.Contains(s, "unavailable") || + strings.Contains(s, "deadline") +} diff --git a/pkg/mapconv/field_extract.go b/pkg/mapconv/field_extract.go new file mode 100644 index 0000000..37df6ce --- /dev/null +++ b/pkg/mapconv/field_extract.go @@ -0,0 +1,91 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +// Package mapconv provides typed field extraction from map[string]any payloads. +// +// JSON decoded with json.Unmarshal produces map[string]any where numeric values +// are float64 and compound values are []any or map[string]any. Each function +// handles these standard representations so callers do not need to branch on +// the raw type. +package mapconv + +import ( + "fmt" + "math" + "strconv" +) + +// StringVal extracts a string value from data[key]. +// Numeric values are formatted with %g (no unnecessary trailing zeros). +// Returns "" if the key is absent or nil. 
+func StringVal(data map[string]any, key string) string {
+	v, ok := data[key]
+	if !ok || v == nil {
+		return ""
+	}
+	switch t := v.(type) {
+	case string:
+		return t
+	case float64:
+		return strconv.FormatFloat(t, 'f', -1, 64)
+	default:
+		return fmt.Sprintf("%v", t)
+	}
+}
+
+// Int64Ptr extracts a nullable int64 from data[key].
+// Accepts float64 (standard JSON number) or string representations.
+// Returns nil if the key is absent, nil, or unparseable.
+func Int64Ptr(data map[string]any, key string) *int64 {
+	v, ok := data[key]
+	if !ok || v == nil {
+		return nil
+	}
+	var n int64
+	switch t := v.(type) {
+	case float64:
+		if t != math.Trunc(t) {
+			return nil
+		}
+		n = int64(t)
+	case string:
+		if t == "" {
+			return nil
+		}
+		parsed, err := strconv.ParseInt(t, 10, 64)
+		if err != nil {
+			return nil
+		}
+		n = parsed
+	default:
+		return nil
+	}
+	return &n
+}
+
+// StringSliceVal extracts a []string from data[key].
+// Accepts a JSON array of strings or a bare string (returned as a one-element slice).
+// Returns nil if the key is absent or the value is an empty string.
+func StringSliceVal(data map[string]any, key string) []string {
+	v, ok := data[key]
+	if !ok || v == nil {
+		return nil
+	}
+	switch t := v.(type) {
+	case []any:
+		out := make([]string, 0, len(t))
+		for _, item := range t {
+			if s, ok := item.(string); ok {
+				out = append(out, s)
+			}
+		}
+		return out
+	case string:
+		if t == "" {
+			return nil
+		}
+		return []string{t}
+	default:
+		return nil
+	}
+}
diff --git a/pkg/mapconv/field_extract_test.go b/pkg/mapconv/field_extract_test.go
new file mode 100644
index 0000000..df993cb
--- /dev/null
+++ b/pkg/mapconv/field_extract_test.go
@@ -0,0 +1,77 @@
+// Copyright The Linux Foundation and each contributor to LFX.
+// SPDX-License-Identifier: MIT
+
+package mapconv_test
+
+import (
+	"testing"
+
+	"github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/mapconv"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestStringVal(t *testing.T) {
+	tests := []struct {
+		name     string
+		data     map[string]any
+		key      string
+		expected string
+	}{
+		{"string value", map[string]any{"k": "hello"}, "k", "hello"},
+		{"float64 whole number", map[string]any{"k": float64(42)}, "k", "42"},
+		{"float64 with decimals", map[string]any{"k": float64(3.14)}, "k", "3.14"},
+		{"nil value", map[string]any{"k": nil}, "k", ""},
+		{"missing key", map[string]any{}, "k", ""},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.expected, mapconv.StringVal(tt.data, tt.key))
+		})
+	}
+}
+
+func TestInt64Ptr(t *testing.T) {
+	ptr := func(n int64) *int64 { return &n }
+
+	tests := []struct {
+		name     string
+		data     map[string]any
+		key      string
+		expected *int64
+	}{
+		{"float64 value", map[string]any{"k": float64(99)}, "k", ptr(99)},
+		{"string value", map[string]any{"k": "12345"}, "k", ptr(12345)},
+		{"empty string", map[string]any{"k": ""}, "k", nil},
+		{"nil value", map[string]any{"k": nil}, "k", nil},
+		{"missing key", map[string]any{}, "k", nil},
+		{"unparseable string", map[string]any{"k": "abc"}, "k", nil},
+		{"partial string 123abc", map[string]any{"k": "123abc"}, "k", nil},
+		{"non-integer float64 rejected", map[string]any{"k": float64(3.9)}, "k", nil},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.expected, mapconv.Int64Ptr(tt.data, tt.key))
+		})
+	}
+}
+
+func TestStringSliceVal(t *testing.T) {
+	tests := []struct {
+		name     string
+		data     map[string]any
+		key      string
+		expected []string
+	}{
+		{"array of strings", map[string]any{"k": []any{"a", "b", "c"}}, "k", []string{"a", "b", "c"}},
+		{"single string", map[string]any{"k": "only"}, "k", []string{"only"}},
+		{"empty string", map[string]any{"k": ""}, "k", nil},
+		{"nil 
value", map[string]any{"k": nil}, "k", nil}, + {"missing key", map[string]any{}, "k", nil}, + {"non-string items in array are skipped", map[string]any{"k": []any{"a", float64(1), "b"}}, "k", []string{"a", "b"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, mapconv.StringSliceVal(tt.data, tt.key)) + }) + } +} diff --git a/pkg/principal/principal.go b/pkg/principal/principal.go new file mode 100644 index 0000000..765dc48 --- /dev/null +++ b/pkg/principal/principal.go @@ -0,0 +1,46 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +// Package principal converts v1 usernames to the Auth0 "sub" format expected by v2 +// services: "auth0|{userID}". +// +// The mapping logic is shared with lfx-v1-sync-helper and must remain in sync with it. +package principal + +import ( + "crypto/sha512" + "regexp" + + "github.com/akamensky/base58" +) + +var ( + // safeNameRE matches usernames safe to use directly as Auth0 user IDs. + safeNameRE = regexp.MustCompile(`^[A-Za-z0-9][A-Za-z0-9._-]{0,58}[A-Za-z0-9]$`) + // hexUserRE matches usernames that could collide with Auth0 native-DB hexadecimal IDs. + hexUserRE = regexp.MustCompile(`^[0-9a-f]{24,60}$`) +) + +// FromUsername converts a raw v1 username to the Auth0 "sub" format: "auth0|{userID}". +// +// Safe usernames (matching safeNameRE but not hexUserRE) are used verbatim as the userID. +// All others are SHA-512 hashed and base58-encoded (~80 chars) to handle legacy usernames +// that are too long, contain non-standard characters, or risk colliding with future Auth0 +// native-DB hexadecimal IDs. +// +// Returns an empty string when username is empty. 
+func FromUsername(username string) string { + if username == "" { + return "" + } + + var userID string + if safeNameRE.MatchString(username) && !hexUserRE.MatchString(username) { + userID = username + } else { + hash := sha512.Sum512([]byte(username)) + userID = base58.Encode(hash[:]) + } + + return "auth0|" + userID +} diff --git a/pkg/principal/principal_test.go b/pkg/principal/principal_test.go new file mode 100644 index 0000000..aeb18d5 --- /dev/null +++ b/pkg/principal/principal_test.go @@ -0,0 +1,79 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package principal_test + +import ( + "testing" + + "github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/principal" + "github.com/stretchr/testify/assert" +) + +func TestFromUsername(t *testing.T) { + tests := []struct { + name string + username string + expected string + }{ + { + name: "empty returns empty", + username: "", + expected: "", + }, + { + // Safe username: matches safeNameRE, does not match hexUserRE — used verbatim. + name: "simple safe username", + username: "john.doe", + expected: "auth0|john.doe", + }, + { + name: "safe username with hyphens and digits", + username: "user-123", + expected: "auth0|user-123", + }, + { + // 60 chars = 1 + 58 + 1, the maximum safeNameRE allows. + name: "exactly 60 safe chars used verbatim", + username: "abcdefghij0123456789abcdefghij0123456789abcdefghij0123456789", + expected: "auth0|abcdefghij0123456789abcdefghij0123456789abcdefghij0123456789", + }, + { + // 61 chars — exceeds safeNameRE max, must be SHA-512 + base58 encoded. + name: "username longer than 60 chars is hashed", + username: "abcdefghij0123456789abcdefghij0123456789abcdefghij01234567890", + expected: "auth0|5fLnnbn4KGc4pxKzgK9JE4GGGpKoWdUqSnsQtutw2XBBTr8qBbv6vv71m1TsGe3mbNvr6a6ncktckEBVD2yhUKD3", + }, + { + // Pure lowercase hex 24+ chars matches hexUserRE — must be hashed to avoid + // colliding with Auth0 native-DB IDs. 
+ name: "24-char hex username is hashed", + username: "0123456789abcdef01234567", + expected: "auth0|3TjHYyavZDgNgHjy8pnsNZAD7Ek7bVyv9NRnF5384aAmUdvqh2NADaPWr1k1QyX2sbs8Yoh2m5wV7BdMwmpkstf2", + }, + { + // Space is not in [A-Za-z0-9._-] so safeNameRE won't match — must be hashed. + name: "username with space is hashed", + username: "John Doe", + expected: "auth0|dsNnfygV3vpJeB65SWP4JwT4Jcud9eUHjuqetwR7XYZdCokZ7vcs1VUqkov1ktH5qppUQsmHgy5Z3j6ZhwekXY2", + }, + { + // @ is not allowed by safeNameRE — must be hashed. + name: "email-style username is hashed", + username: "user@example.com", + expected: "auth0|3CCfg5ewbyYkXLuXR6oq4aXDZCrV4dqpMSY9XdoWUdENu9MPm9RcZUrCVzMe1W1vzXfkaMVN8awpp82tMSF8AswD", + }, + { + // Hashing must be deterministic: calling twice yields the same result. + name: "deterministic output for safe username", + username: "stable-user", + expected: "auth0|stable-user", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, principal.FromUsername(tt.username)) + }) + } +}