
[LFXV2-1223] Event processor for indexer & access-control handlers #34

Open
mauriciozanettisalomao wants to merge 11 commits into linuxfoundation:main from mauriciozanettisalomao:feat/lfxv2-1223-eventing-handlers-indexer-access

Conversation


@mauriciozanettisalomao mauriciozanettisalomao commented Mar 12, 2026

Summary

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

  • Introduces a durable NATS JetStream KV-bucket consumer (EventProcessor) that watches three GroupsIO entity buckets (itx-groupsio-v2-service, itx-groupsio-v2-subgroup, itx-groupsio-v2-member) and fans events out to typed domain handlers.
  • Adds domain-layer handler interfaces (DataStreamEventHandler, DataStreamProcessor) and concrete service implementations for service, subgroup/mailing-list, and member entities, each publishing indexer and access-control messages to the platform NATS subjects.
  • Adds a MappingStore abstraction (backed by NATS JetStream KV) to resolve entity references (e.g. member → mailing_list → service → project) at event-handling time.
  • Adds pkg/mapconv helpers for safe, typed extraction of nested fields from raw map[string]any payloads received from the KV bucket.
  • Wires the processor into main.go as a long-running goroutine alongside the existing committee and mailing-list sync loops.
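
The pkg/mapconv helpers described above can be sketched along these lines. This is a minimal, stdlib-only illustration; the helper names and signatures are assumptions for this sketch and may differ from the actual pkg/mapconv API:

```go
package main

import "fmt"

// stringField safely extracts a string field from a raw map[string]any
// payload. Hypothetical name; the real pkg/mapconv helpers may differ.
func stringField(m map[string]any, key string) (string, bool) {
	v, ok := m[key]
	if !ok {
		return "", false
	}
	s, ok := v.(string)
	return s, ok
}

// nestedField walks a path of nested maps, e.g. data -> project_uid,
// returning false if any step is missing or not a map.
func nestedField(m map[string]any, path ...string) (any, bool) {
	var cur any = m
	for _, k := range path {
		mm, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = mm[k]; !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	payload := map[string]any{
		"group_name": "ddb-stream-consumer",
		"data":       map[string]any{"project_uid": "b0a360ec"},
	}
	name, _ := stringField(payload, "group_name")
	uid, _ := nestedField(payload, "data", "project_uid")
	fmt.Println(name, uid) // ddb-stream-consumer b0a360ec
}
```

The point of helpers like these is that handlers never type-assert raw KV payloads inline, so a malformed field yields a clean `ok == false` instead of a panic.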

Files changed

Eventing/Data Stream Integration

  • Added handleDataStream function and supporting environment-based configuration in data_stream.go to start a JetStream consumer for DynamoDB KV change events, with graceful shutdown logic.
  • Introduced event_processor.go implementing the EventProcessor interface for JetStream KV event consumption, including durable consumer setup, message processing, and shutdown.
  • Added handler.go to route KV events to appropriate entity handlers (service, subgroup, member) based on key prefixes, supporting both soft and hard deletes.
  • Integrated data stream processor startup in main.go, conditionally enabled via EVENTING_ENABLED environment variable.
  • Provided a MappingReaderWriter provider in service/providers.go for idempotency tracking in event handling. [1] [2]
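
The key-prefix routing in handler.go might look roughly like the sketch below. The prefix strings and function name are illustrative assumptions, not the PR's actual values:

```go
package main

import (
	"fmt"
	"strings"
)

// entityFor routes a KV change-event key to an entity handler by prefix,
// mirroring the prefix-based dispatch described for handler.go.
// The prefixes used here are assumptions for illustration.
func entityFor(key string) string {
	switch {
	case strings.HasPrefix(key, "service."):
		return "service"
	case strings.HasPrefix(key, "subgroup."):
		return "subgroup"
	case strings.HasPrefix(key, "member."):
		return "member"
	default:
		return "" // unknown prefix: ack and skip rather than retry forever
	}
}

func main() {
	fmt.Println(entityFor("member.14985390")) // member
}
```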

Configuration and Documentation

  • Added EVENTING_ENABLED environment variable to .env.example and Helm values for Kubernetes deployment, enabling eventing features. [1] [2]
  • Updated README.md with directory structure changes, new eventing components, and a link to additional documentation describing v1→v2 data stream event processing. [1] [2] [3]

Architectural decisions

The eventing pipeline follows the same hexagonal structure already in place across the service. Each concern lives in the layer that owns it — infrastructure never leaks into domain, and domain never reaches into infrastructure.

| Decision | What was built | Why it matters |
| --- | --- | --- |
| Domain ports define the contract | `DataEventHandler`, `DataStreamProcessor`, `MappingReader`/`Writer` in `internal/domain/port/` | The domain says what it needs in its own terms. Nothing in `internal/service/` imports NATS — the compiler enforces the layer boundary. |
| Handlers depend on interfaces, not structs | `HandleDataStreamServiceUpdate(ctx, uid, data, publisher, mappings)` takes `port.MessagePublisher` and `port.MappingReaderWriter` | The service layer never sees a NATS client. Swapping the broker means changing only the infrastructure package. |
| Infrastructure sealed in its own package | NATS client, JetStream consumer, and KV-backed `MappingStore` all live in `internal/infrastructure/nats/` | Concrete types don't leak out. If the broker changes, no domain or service file needs to change. |
| Domain model owns the event type | `model.StreamMessage` in `internal/domain/model/` carries `Key`, `Data`, and `IsRemoval` | The domain describes events in its own language. The `eventing/` adapter translates `jetstream.Msg` into `StreamMessage` — the domain never sees a JetStream type. |
| Adapter seam at the `cmd/` boundary | `eventing.EventProcessor` interface; `natsEventProcessor` is the only concrete impl, wired in `cmd/.../data_stream.go` | The only file that knows about concrete infrastructure is in `cmd/`. Everything inward depends on interfaces. |
| Interface segregation on MappingStore | `MappingReader`, `MappingWriter`, `MappingReaderWriter` in `internal/domain/port/mapping_store.go` | Handlers that only read state take `MappingReader`. Only handlers that also write take the full interface. No handler depends on more than it uses. |

Test evidence

The log trace below was captured while running the service against the staging NATS cluster with AUTH_SOURCE=mock, GROUPSIO_SOURCE=mock, REPOSITORY_SOURCE=nats (real KV storage).

1. Service startup & processor initialisation

[INFO] Starting mailing list service  http-port=8080
[INFO] provider configuration validation  auth_source=mock  repository_source=nats  groupsio_source=mock
[INFO] initializing NATS service
[INFO] NATS client created successfully  connected_url=nats://lfx-platform-nats.lfx.svc.cluster.local:4222  status=1
[INFO] data stream processor created  consumer_name=mailing-list-service-kv-consumer  stream_name=KV_v1-objects
[INFO] starting data stream processor  consumer_name=mailing-list-service-kv-consumer
[INFO] data stream processor started successfully
[DEBUG] NATS client is ready  url=nats://lfx-platform-nats.lfx.svc.cluster.local:4222

The durable consumer mailing-list-service-kv-consumer is created (or resumed) on stream KV_v1-objects using DeliverLastPerSubjectPolicy — on restart it picks up from the last acknowledged offset per key, so no events are missed or double-processed.
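
A toy model of what DeliverLastPerSubjectPolicy implies for this consumer: among the retained messages, only the newest per subject (KV key) is delivered on start, so a restart neither replays the full history nor drops keys. This simulation is illustrative only; the real policy is enforced server-side by JetStream:

```go
package main

import "fmt"

type kvMsg struct{ Subject, Data string }

// lastPerSubject keeps only the newest message per subject, mimicking
// what DeliverLastPerSubjectPolicy hands the consumer on (re)start.
func lastPerSubject(msgs []kvMsg) map[string]string {
	out := make(map[string]string)
	for _, m := range msgs {
		out[m.Subject] = m.Data // later messages win
	}
	return out
}

func main() {
	delivered := lastPerSubject([]kvMsg{
		{"$KV.itx-groupsio-v2-member.14985390", "v1 payload"},
		{"$KV.itx-groupsio-v2-member.14985390", "v2 payload"},
		{"$KV.itx-groupsio-v2-service.ab9de462", "service payload"},
	})
	// Two subjects survive; the member key carries only its latest payload.
	fmt.Println(len(delivered)) // 2
}
```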


2. Mailing-list indexer event — created

[DEBUG] message published successfully
  subject=lfx.index.groupsio_mailing_list  message_type=indexer  message_size=1161

[INFO] Transaction created successfully
  component=indexer
  transaction_id=txn_groupsio_mailing_list_created_1773321220255207790
  action=created  object_type=groupsio_mailing_list  is_v1=false
  tags=[
    project_uid:a0941000002wBz9AAE,
    service_uid:ab9de462-ac38-4175-99a9-9e209e335ca6,
    type:discussion_open, public:false,
    committee_uid:e062715b-fd2b-4e6f-8166-c51d69e455f2,
    committee_voting_status:observer,
    committee_voting_status:alt_voting_rep,
    committee_voting_status:voting_rep,
    groupsio_mailing_list_uid:147542,
    group_name:ddb-stream-consumer-mailint-list
  ]

[INFO] Transaction processing completed successfully
  component=nats  request_id=7a5782112d144bda
  object_ref=groupsio_mailing_list:147542

[INFO] Indexing event published
  subject=lfx.groupsio_mailing_list.created
  document_id=groupsio_mailing_list:147542

[INFO] Indexing message processed successfully
  action=created  object_type=groupsio_mailing_list
  document_id=groupsio_mailing_list:147542  index_success=true

[INFO] Janitor: Single document found, no cleanup needed
  object_ref=groupsio_mailing_list:147542  hits=1

3. Mailing-list indexer event — updated

[INFO] Transaction created successfully
  transaction_id=txn_groupsio_mailing_list_updated_1773321240908737221
  action=updated  object_type=groupsio_mailing_list

[INFO] Indexing event published  subject=lfx.groupsio_mailing_list.updated
[INFO] Indexing message processed successfully  action=updated  index_success=true

-- second update (project resolved from mapping store) --
[INFO] Transaction created successfully
  transaction_id=txn_groupsio_mailing_list_updated_1773322841349932036
  project_uid=b0a360ec-1e37-452f-b5e3-233d4000365a   ← resolved via MappingStore
[INFO] Indexing event published  subject=lfx.groupsio_mailing_list.updated
[INFO] Indexing message processed successfully  action=updated  index_success=true

4. Access-control (OpenFGA) sync

[DEBUG] message published successfully
  subject=lfx.update_access.groupsio_mailing_list  message_type=access  message_size=161

[INFO] handling groupsio_mailing_list access control update
  message={"uid":"147542","object_type":"groupsio_mailing_list","public":false,
           "relations":null,"references":{"groupsio_service":["ab9de462-ac38-4175-99a9-9e209e335ca6"]}}

[DEBUG] will add relation in batch write
  user=groupsio_service:ab9de462-ac38-4175-99a9-9e209e335ca6
  relation=groupsio_service
  object=groupsio_mailing_list:147542

[INFO] wrote and deleted tuples
  writes_count=1  deletes_count=0
  writes=[{object:groupsio_mailing_list:147542, relation:groupsio_service,
            user:groupsio_service:ab9de462-ac38-4175-99a9-9e209e335ca6}]

[INFO] synced tuples  object=groupsio_mailing_list:147542

5. Member indexer event — created

[DEBUG] message published successfully
  subject=lfx.index.groupsio_member  message_type=indexer  message_size=729

[INFO] Transaction created successfully
  transaction_id=txn_groupsio_member_created_1773323870836706819
  action=created  object_type=groupsio_member
  tags=[member_uid:14985390, mailing_list_uid:147542,
        email:[email protected], status:normal]

6. OpenFGA access checks

| Principal | Object | Relation | Allowed |
| --- | --- | --- | --- |
| user:project_super_admin | groupsio_service:ab9de462-... | viewer | true |
| user:mauriciozanetti | groupsio_service:ab9de462-... | viewer | false |
| user:project_super_admin | groupsio_mailing_list:147542 | viewer | true |
| user:mauriciozanetti | groupsio_mailing_list:147542 | viewer | false |

Access tuples are correctly scoped: only admins of the owning project can view resources.


7. OpenSearch — indexed documents

groupsio_service (ab9de462-ac38-4175-99a9-9e209e335ca6)

{
  "object_ref": "groupsio_service:ab9de462-ac38-4175-99a9-9e209e335ca6",
  "object_type": "groupsio_service",
  "tags": [
    "project_uid:b0a360ec-1e37-452f-b5e3-233d4000365a",
    "service_uid:ab9de462-ac38-4175-99a9-9e209e335ca6",
    "service_type:v2_primary"
  ],
  "access_check_query": "groupsio_service:ab9de462-...#viewer",
  "latest": true,
  "data": {
    "domain": "lists.sssdsdtestwew22.com",
    "project_uid": "b0a360ec-1e37-452f-b5e3-233d4000365a",
    "type": "v2_primary",
    "source": "v1-sync"
  }
}

groupsio_mailing_list (147542)

{
  "object_ref": "groupsio_mailing_list:147542",
  "object_type": "groupsio_mailing_list",
  "parent_refs": ["service:ab9de462-...", "project:b0a360ec-..."],
  "tags": [
    "project_uid:b0a360ec-1e37-452f-b5e3-233d4000365a",
    "service_uid:ab9de462-ac38-4175-99a9-9e209e335ca6",
    "type:discussion_open",
    "public:false",
    "committee_uid:e062715b-fd2b-4e6f-8166-c51d69e455f2",
    "committee_voting_status:observer",
    "committee_voting_status:alt_voting_rep",
    "committee_voting_status:voting_rep",
    "groupsio_mailing_list_uid:147542",
    "group_name:ddb-stream-consumer-mailint-list"
  ],
  "data": {
    "group_id": 147542,
    "group_name": "ddb-stream-consumer-mailint-list",
    "project_uid": "b0a360ec-1e37-452f-b5e3-233d4000365a",
    "type": "discussion_open",
    "source": "v1-sync",
    "committees": [
      {
        "uid": "e062715b-fd2b-4e6f-8166-c51d69e455f2",
        "allowed_voting_statuses": ["observer", "alt_voting_rep", "voting_rep"]
      }
    ]
  }
}

groupsio_member (14985390)

{
  "object_ref": "groupsio_member:14985390",
  "object_type": "groupsio_member",
  "parent_refs": ["mailing_list:147542"],
  "sort_name": "Mauricio",
  "name_and_aliases": ["Mauricio", "Salomao"],
  "tags": [
    "member_uid:14985390",
    "mailing_list_uid:147542",
    "email:[email protected]",
    "status:normal"
  ],
  "access_check_query": "groupsio_mailing_list:147542#auditor",
  "data": {
    "first_name": "Mauricio",
    "last_name": "Salomao",
    "email": "[email protected]",
    "member_type": "direct",
    "status": "normal",
    "delivery_mode": "email_delivery_digest",
    "source": "v1-sync"
  }
}

Test checklist

  • Service starts and durable consumer is created/resumed on KV_v1-objects
  • groupsio_service KV change → indexer message published → document indexed in OpenSearch
  • groupsio_mailing_list KV change → indexer message published + access-control tuple synced to OpenFGA
  • groupsio_member KV change → indexer message published with correct parent_refs and access_check_query
  • project_uid correctly resolved from MappingStore (service → project lookup) across multiple update events
  • OpenFGA tuples enforce correct viewer access (project admins only)
  • DeliverLastPerSubjectPolicy ensures no replay flood on restart
  • Unit tests pass: go test -v -race ./pkg/mapconv/...

- Add event handler for processing DynamoDB change events related to GroupsIO services, subgroups, and members.
- Introduce mapping functions to track processed entities and manage idempotency.
- Implement message publishing for indexer and access control messages.
- Create a data stream consumer to route JetStream KV messages to the appropriate event handlers.
- Enhance error handling with transient error detection for retry logic.
- Update main application to start the data stream processor based on environment configuration.

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

Assisted by [Claude Code](https://claude.ai/code)

Signed-off-by: Mauricio Zanetti Salomao <[email protected]>
…d improve error handling

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

Assisted by [Claude Code](https://claude.ai/code)

Signed-off-by: Mauricio Zanetti Salomao <[email protected]>
…r GroupsIO entities

- Add event handler for processing KV events related to services, subgroups, and members.
- Implement mapping reader/writer for idempotency tracking in the v1-mappings KV bucket.
- Create documentation for event processing architecture, event flow, and data transformation.
- Introduce service, subgroup, and member handlers for transforming v1 payloads to v2 domain models.
- Add constants for project and committee mappings to facilitate resolution of v1 SFIDs to v2 UIDs.
- Implement error handling and logging for transient and permanent errors during event processing.

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

Assisted by [Claude Code](https://claude.ai/code)

Signed-off-by: Mauricio Zanetti Salomao <[email protected]>
@mauriciozanettisalomao mauriciozanettisalomao requested a review from a team as a code owner March 12, 2026 14:45
Copilot AI review requested due to automatic review settings March 12, 2026 14:45
coderabbitai bot commented Mar 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds an opt-in NATS JetStream eventing subsystem: startup wiring and durable consumer, JetStream client KV/consumer adapters, domain ports/models for stream messages and mapping store, event handlers for service/subgroup/member entities, mapping store adapter and mocks, mapconv helpers and tests, docs, and an EVENTING_ENABLED flag.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Configuration & Docs**<br>`.env.example`, `README.md`, `docs/event-processing.md`, `charts/lfx-v2-mailing-list-service/values.yaml` | Added `EVENTING_ENABLED` example/Helm value; expanded README and added detailed event-processing documentation. |
| **Startup & wiring**<br>`cmd/mailing-list-api/main.go`, `cmd/mailing-list-api/data_stream.go` | Opt-in data stream startup wired into main; reads env vars, builds processor config, starts EventProcessor goroutine and graceful shutdown; fatal on init error. |
| **Event processing core**<br>`cmd/mailing-list-api/eventing/event_processor.go`, `cmd/mailing-list-api/eventing/handler.go` | New EventProcessor (durable JetStream consumer lifecycle) and event handler routing KV change/removal events by key prefix; public Config and EventProcessor API added. |
| **Domain models & ports**<br>`internal/domain/model/stream_message.go`, `internal/domain/port/data_stream_event_handler.go`, `internal/domain/port/data_stream_processor.go`, `internal/domain/port/mapping_store.go` | Added StreamMessage with Ack/Nak callbacks; defined DataEventHandler, DataStreamProcessor, and MappingReader/Writer/ReaderWriter interfaces with tombstone/action semantics. |
| **NATS JetStream infra**<br>`internal/infrastructure/nats/client.go`, `internal/infrastructure/nats/data_stream_consumer.go`, `internal/infrastructure/nats/mapping_store.go` | NATS client extended with JetStream context and methods (KeyValue, CreateOrUpdateConsumer); dataStreamConsumer that decodes JSON/msgpack and ACK/NAKs with backoff; KV-to-mapping adapter with tombstone handling. |
| **Service handlers & provider**<br>`internal/service/datastream_service_handler.go`, `internal/service/datastream_subgroup_handler.go`, `internal/service/datastream_member_handler.go`, `cmd/mailing-list-api/service/providers.go` | Handlers to transform v1 payloads to v2 models, publish indexer/access messages, resolve dependencies via mappings, maintain forward/reverse mappings; added MappingReaderWriter provider function. |
| **Constants & utilities**<br>`pkg/constants/storage.go`, `pkg/errors/transient.go` | Added KV bucket/prefix constants and tombstone marker; IsTransient helper for retry classification. |
| **Mapconv helpers & tests**<br>`pkg/mapconv/field_extract.go`, `pkg/mapconv/field_extract_test.go`, `go.mod` | Typed field extraction helpers with unit tests; added msgpack dependency. |
| **Mocks & tests**<br>`internal/infrastructure/mock/mapping_store.go`, `internal/infrastructure/mock/message_publisher.go`, `internal/service/datastream_*_test.go` | Added in-memory FakeMappingStore, SpyMessagePublisher, and comprehensive unit tests for service/subgroup/member handlers validating ACK/NAK behavior, publishing, and mapping operations. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant DDB as DynamoDB KV
    participant JS as JetStream
    participant Proc as EventProcessor
    participant Handler as EventHandler
    participant Service as ServiceHandler
    participant Map as MappingStore
    participant Pub as Publisher

    DDB->>JS: KV change event
    JS->>Proc: Deliver StreamMessage
    Proc->>Handler: Process(key, data/isRemoval)
    Handler->>Handler: Determine entity type (service/subgroup/member)
    alt IsRemoval
        Handler->>Service: HandleRemoval
    else Update/Create
        Handler->>Service: HandleChange
    end
    Service->>Map: Resolve dependency mappings (e.g., SFID→UID)
    Map-->>Service: Mapping present / tombstoned / missing
    alt Dependency missing
        Service->>Proc: Indicate transient (NAK)
    else Dependency available
        Service->>Service: Transform v1→v2, build messages
        Service->>Pub: Publish indexer/access
        Pub-->>Service: Success/Error
        Service->>Map: Put mapping / Put tombstone
        Service->>Proc: ACK
    end
    Proc->>JS: Ack or NAK with backoff
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 warning

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 51.67%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title directly reflects the main change: introducing an event processor for indexing and access-control handlers as part of event-driven data stream processing. |
| Description check | ✅ Passed | The description comprehensively covers the changeset, detailing the durable NATS JetStream consumer, domain handlers, mapping store abstraction, configuration, and test evidence. |


Copilot AI left a comment

Pull request overview

Adds a v1→v2 eventing pipeline to the mailing-list service by consuming NATS JetStream KV change events, transforming v1 payloads into v2 domain models, and publishing downstream indexer/FGA-sync messages with idempotency tracking.

Changes:

  • Introduces a JetStream KV event processor + key-prefix router and wires it into cmd/mailing-list-api startup behind EVENTING_ENABLED.
  • Adds v1-mappings KV abstractions (domain ports + NATS implementation) and constants for mapping keys/buckets/tombstones.
  • Adds supporting utilities (pkg/mapconv, transient error classifier) and documentation (docs/event-processing.md, README updates, .env.example).

Reviewed changes

Copilot reviewed 22 out of 22 changed files in this pull request and generated 9 comments.

Show a summary per file
| File | Description |
| --- | --- |
| README.md | Documents new eventing folders and links to the event-processing doc. |
| pkg/mapconv/field_extract.go | Adds typed extraction helpers for map[string]any payloads. |
| pkg/mapconv/field_extract_test.go | Unit tests for the new mapconv extraction helpers. |
| pkg/errors/transient.go | Adds a simple transient error classifier used to decide retry/NAK behavior. |
| pkg/constants/storage.go | Adds v1 eventing KV bucket names, mapping prefixes, and tombstone marker constants. |
| internal/service/datastream_subgroup_handler.go | Implements v1 subgroup→v2 mailing list transform, publish, mapping writes, and reverse index. |
| internal/service/datastream_service_handler.go | Implements v1 service→v2 service transform, publish, and mapping writes. |
| internal/service/datastream_member_handler.go | Implements v1 member→v2 member transform, publish, and parent resolution via reverse index. |
| internal/infrastructure/nats/mapping_store.go | JetStream KV-backed MappingReaderWriter implementation (idempotency + tombstones). |
| internal/infrastructure/nats/data_stream_consumer.go | Dispatches JetStream messages to handlers and performs ACK/NAK with backoff. |
| internal/infrastructure/nats/client.go | Reuses a single JetStream context; adds helpers for KV access and durable consumer creation. |
| internal/domain/port/mapping_store.go | Defines mapping store reader/writer ports for v1-mappings access. |
| internal/domain/port/data_stream_processor.go | Defines a processor interface for individual stream messages. |
| internal/domain/port/data_stream_event_handler.go | Defines the handler interface for keyed change/removal events. |
| internal/domain/model/stream_message.go | Introduces a NATS-agnostic stream message model (ack/nak callbacks + metadata). |
| docs/event-processing.md | Adds end-to-end documentation for the KV eventing architecture, flow, and operations. |
| cmd/mailing-list-api/service/providers.go | Adds a provider for the v1-mappings MappingReaderWriter backed by JetStream KV. |
| cmd/mailing-list-api/main.go | Starts the data stream processor during service startup (when enabled). |
| cmd/mailing-list-api/eventing/handler.go | Routes events by KV key prefix and handles soft-delete detection. |
| cmd/mailing-list-api/eventing/event_processor.go | Implements the durable JetStream consumer lifecycle and message dispatch. |
| cmd/mailing-list-api/data_stream.go | Startup wiring and env-based configuration for event processing. |
| .env.example | Adds example eventing configuration (EVENTING_ENABLED). |

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (12)
.env.example (1)

39-41: Consider documenting additional eventing configuration options.

The documentation in docs/event-processing.md references several other eventing configuration variables (EVENTING_CONSUMER_NAME, EVENTING_MAX_DELIVER, EVENTING_ACK_WAIT_SECS, EVENTING_MAX_ACK_PENDING) that have defaults but can be overridden. Adding these as commented examples would improve discoverability.

📝 Suggested additions for discoverability
 # Eventing Configuration
 EVENTING_ENABLED=true
+# EVENTING_CONSUMER_NAME=mailing-list-service-kv-consumer
+# EVENTING_MAX_DELIVER=3
+# EVENTING_ACK_WAIT_SECS=30
+# EVENTING_MAX_ACK_PENDING=1000
cmd/mailing-list-api/service/providers.go (1)

503-511: Consider using singleton pattern for consistency.

Other provider functions in this file (e.g., natsInit, GroupsIOClient, GrpsIOWebhookValidator) use sync.Once to ensure single initialization. This function creates a new MappingReaderWriter on each call, which could be intentional if multiple instances are needed, but may be inconsistent with the overall provider pattern.

If this should be a singleton (likely, since it wraps the same KV bucket), consider adding a sync.Once guard:

♻️ Optional singleton pattern
+var (
+	mappingReaderWriterClient port.MappingReaderWriter
+	mappingReaderWriterOnce   sync.Once
+)
+
 // MappingReaderWriter initializes the v1-mappings KV abstraction used by the
 // data stream event handler for idempotency tracking.
 func MappingReaderWriter(ctx context.Context) port.MappingReaderWriter {
-	client := GetNATSClient(ctx)
-	kv, err := client.KeyValue(ctx, constants.KVBucketNameV1Mappings)
-	if err != nil {
-		log.Fatalf("failed to access %s KV bucket: %v", constants.KVBucketNameV1Mappings, err)
-	}
-	return nats.NewMappingReaderWriter(kv)
+	mappingReaderWriterOnce.Do(func() {
+		client := GetNATSClient(ctx)
+		kv, err := client.KeyValue(ctx, constants.KVBucketNameV1Mappings)
+		if err != nil {
+			log.Fatalf("failed to access %s KV bucket: %v", constants.KVBucketNameV1Mappings, err)
+		}
+		mappingReaderWriterClient = nats.NewMappingReaderWriter(kv)
+	})
+	return mappingReaderWriterClient
 }
pkg/errors/transient.go (1)

11-19: Consider unwrapping errors for deeper inspection.

The current implementation only inspects the top-level error message. If errors are wrapped (e.g., via fmt.Errorf("operation failed: %w", err)), the transient indicator in the wrapped error may not surface in the outer message. Consider recursively unwrapping with errors.Unwrap or using errors.As if transient errors implement a marker interface.

♻️ Optional enhancement to handle wrapped errors
 func IsTransient(err error) bool {
 	if err == nil {
 		return false
 	}
-	s := strings.ToLower(err.Error())
-	return strings.Contains(s, "timeout") ||
-		strings.Contains(s, "connection") ||
-		strings.Contains(s, "unavailable") ||
-		strings.Contains(s, "deadline")
+	for err != nil {
+		s := strings.ToLower(err.Error())
+		if strings.Contains(s, "timeout") ||
+			strings.Contains(s, "connection") ||
+			strings.Contains(s, "unavailable") ||
+			strings.Contains(s, "deadline") {
+			return true
+		}
+		err = errors.Unwrap(err)
+	}
+	return false
 }

Add "errors" to imports if applying this change.

internal/infrastructure/nats/mapping_store.go (1)

38-54: Logging on expected misses may be noisy.

IsMappingPresent and IsTombstoned both log warnings when a key is not found. If these methods are called speculatively (e.g., checking if a mapping exists before creating it), the logs could become noisy. Consider using debug-level logging or removing these warnings.

📝 Proposed fix to reduce log noise
 func (m *natsMappingReaderWriter) IsMappingPresent(ctx context.Context, key string) bool {
 	entry, err := m.kv.Get(ctx, key)
 	if err != nil || entry == nil {
-		slog.WarnContext(ctx, "mapping key not found in v1-mappings", "mapping_key", key)
+		slog.DebugContext(ctx, "mapping key not found in v1-mappings", "mapping_key", key)
 		return false
 	}
 	return string(entry.Value()) != constants.KVTombstoneMarker
 }
cmd/mailing-list-api/data_stream.go (2)

98-107: Silent fallback on parse error may hide misconfiguration.

When strconv.Atoi fails, the function silently returns the default value. Consider logging a warning so operators are aware of invalid configuration values.

📝 Proposed fix to add logging
 func envInt(key string, defaultVal int) int {
 	s := os.Getenv(key)
 	if s == "" {
 		return defaultVal
 	}
 	n, err := strconv.Atoi(s)
 	if err != nil {
+		slog.Warn("invalid integer for environment variable, using default",
+			"key", key, "value", s, "default", defaultVal)
 		return defaultVal
 	}
 	return n
 }

48-65: Consider consolidating shutdown into a single goroutine or using errgroup.

The current implementation spawns two goroutines: one for running the processor and one solely to wait for context cancellation. This works but adds complexity.

♻️ Alternative using select in the processor goroutine
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		if err := processor.Start(ctx, streamConsumer); err != nil {
-			slog.ErrorContext(ctx, "data stream processor exited with error", "error", err)
-		}
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		<-ctx.Done()
-		stopCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownSeconds*time.Second)
-		defer cancel()
-		if err := processor.Stop(stopCtx); err != nil {
-			slog.ErrorContext(stopCtx, "error stopping data stream processor", "error", err)
+		errCh := make(chan error, 1)
+		go func() {
+			errCh <- processor.Start(ctx, streamConsumer)
+		}()
+		select {
+		case err := <-errCh:
+			if err != nil {
+				slog.ErrorContext(ctx, "data stream processor exited with error", "error", err)
+			}
+		case <-ctx.Done():
+			stopCtx, cancel := context.WithTimeout(context.Background(), gracefulShutdownSeconds*time.Second)
+			defer cancel()
+			if err := processor.Stop(stopCtx); err != nil {
+				slog.ErrorContext(stopCtx, "error stopping data stream processor", "error", err)
+			}
 		}
 	}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/mailing-list-api/data_stream.go` around lines 48 - 65, Consolidate the
two goroutines into one to simplify shutdown: run processor.Start(ctx,
streamConsumer) in a single goroutine that also listens for ctx.Done() (or use
an errgroup to run Start and handle cancellation) and when cancellation occurs
create stopCtx with context.WithTimeout and call processor.Stop(stopCtx); ensure
you still call wg.Add(1)/wg.Done around this single goroutine and log errors
from both Start (using processor.Start) and Stop (using processor.Stop) as
before.
internal/service/datastream_service_handler.go (2)

61-64: Mapping write failure is logged but doesn't prevent ACK.

If PutMapping fails, the message is still ACKed (the handler returns false), so the mapping entry may be missing or partially written even though the entity has already been published to the indexer. Subsequent events for the same entity would then resolve the wrong action (ActionCreated vs. ActionUpdated), leaving the mapping state inconsistent. Consider returning true (NAK) for transient mapping errors so the write is retried.

♻️ Proposed fix to NAK on transient mapping errors
 	if err := mappings.PutMapping(ctx, mKey, uid); err != nil {
 		slog.ErrorContext(ctx, "failed to put mapping key", "mapping_key", mKey, "error", err)
+		return pkgerrors.IsTransient(err)
 	}
 	return false
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_service_handler.go` around lines 61 - 64, The
current code logs errors from mappings.PutMapping(ctx, mKey, uid) but still
returns false (ACK); change the control flow so that when PutMapping returns a
non-nil error you log the error and return true (NAK) to trigger a retry,
preventing ACK of partially persisted state that could cause ResolveAction to
see ActionUpdated instead of ActionCreated; locate the mappings.PutMapping call
and adjust the return path to return true on error (or propagate
transient-vs-permanent classification if available) so the message is retried.

100-120: Time parse errors are silently ignored.

If last_modified_at contains an invalid timestamp, the error is silently swallowed and UpdatedAt remains the zero value. Consider logging parse failures at debug level to aid troubleshooting data quality issues.

📝 Proposed fix to log parse errors
 	if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" {
-		if t, err := time.Parse(time.RFC3339, ts); err == nil {
+		t, err := time.Parse(time.RFC3339, ts)
+		if err != nil {
+			slog.Debug("failed to parse last_modified_at timestamp", "uid", uid, "value", ts, "error", err)
+		} else {
 			svc.UpdatedAt = t
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_service_handler.go` around lines 100 - 120, In
transformV1ToGrpsIOService, the time.Parse error for last_modified_at is
currently ignored; update the block that reads ts := mapconv.StringVal(data,
"last_modified_at") so that when time.Parse(time.RFC3339, ts) returns an error
you log the failure at debug (including uid, ts and the error) before leaving
UpdatedAt unchanged; use the existing logger or package logger used elsewhere in
this file (or add one if missing) and keep the successful parse path assigning
svc.UpdatedAt unchanged.
cmd/mailing-list-api/eventing/handler.go (1)

44-73: Consider validating extracted UID is non-empty.

After stripping the prefix, if the key exactly matches the prefix (e.g., itx-groupsio-v2-service.), the extracted uid would be empty. This edge case could cause issues in downstream handlers.

🛡️ Proposed defensive check
 	case strings.HasPrefix(key, kvPrefixService):
 		uid := key[len(kvPrefixService):]
+		if uid == "" {
+			slog.WarnContext(ctx, "empty UID after prefix strip, ACKing", "key", key)
+			return false
+		}
 		if isSoftDelete {
 			return service.HandleDataStreamServiceDelete(ctx, uid, h.publisher, h.mappings)
 		}

Apply similar checks for subgroup and member cases.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/mailing-list-api/eventing/handler.go` around lines 44 - 73, After
extracting uid in eventHandler.HandleChange for kvPrefixService,
kvPrefixSubgroup and kvPrefixMember, add a defensive check that uid is
non-empty; if empty, call slog.WarnContext with context and key (and a short
message like "empty UID after prefix strip") and return false to ACK instead of
calling service.HandleDataStream...; apply the same check for the service,
subgroup, and member branches to prevent passing an empty uid into
service.HandleDataStreamServiceUpdate/Delete,
service.HandleDataStreamSubgroupUpdate/Delete, and
service.HandleDataStreamMemberUpdate/Delete.
internal/service/datastream_subgroup_handler.go (1)

160-164: Consider populating CreatedAt if available.

Only UpdatedAt is parsed from last_modified_at. If the v1 payload contains a created_at field (similar to the member handler at lines 115-119 of datastream_member_handler.go), it should be mapped for consistency.

♻️ Optional: Add CreatedAt parsing
+	if ts := mapconv.StringVal(data, "created_at"); ts != "" {
+		if t, err := time.Parse(time.RFC3339, ts); err == nil {
+			list.CreatedAt = t
+		}
+	}
 	if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" {
 		if t, err := time.Parse(time.RFC3339, ts); err == nil {
 			list.UpdatedAt = t
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_subgroup_handler.go` around lines 160 - 164, The
code only parses last_modified_at into list.UpdatedAt; add analogous parsing for
created_at so list.CreatedAt is populated when present. Locate the block using
mapconv.StringVal(data, "last_modified_at") and time.Parse(time.RFC3339, ts) and
replicate the logic for mapconv.StringVal(data, "created_at"), parsing with
time.Parse(time.RFC3339, ...) and assigning to list.CreatedAt (mirroring the
approach used in datastream_member_handler.go for CreatedAt).
cmd/mailing-list-api/eventing/event_processor.go (1)

48-54: Consider validating required config fields.

The constructor doesn't validate that required fields like ConsumerName and StreamName are non-empty. This would fail later in Start() but with a less clear error message.

♻️ Optional validation
 func NewEventProcessor(_ context.Context, cfg Config, natsClient *infraNATS.NATSClient) (EventProcessor, error) {
+	if cfg.ConsumerName == "" || cfg.StreamName == "" {
+		return nil, fmt.Errorf("ConsumerName and StreamName are required")
+	}
 	return &natsEventProcessor{
 		natsClient: natsClient,
 		config:     cfg,
 	}, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/mailing-list-api/eventing/event_processor.go` around lines 48 - 54,
NewEventProcessor currently returns a natsEventProcessor without validating
required Config fields, so missing ConsumerName or StreamName causes unclear
failures later in Start; update NewEventProcessor to check cfg.ConsumerName and
cfg.StreamName (and any other required cfg fields) for non-empty values and
return a descriptive error if any are missing, ensuring the validation occurs
before constructing and returning the natsEventProcessor instance.
internal/service/datastream_member_handler.go (1)

129-137: Consider trimming whitespace for robustness.

The function handles common cases well. For extra robustness against leading/trailing whitespace in the input, consider trimming:

♻️ Optional improvement
 func splitFullName(fullName string) (string, string) {
+	fullName = strings.TrimSpace(fullName)
 	idx := strings.Index(fullName, " ")
 	if idx == -1 {
 		return fullName, ""
 	}
-	return fullName[:idx], fullName[idx+1:]
+	return fullName[:idx], strings.TrimSpace(fullName[idx+1:])
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_member_handler.go` around lines 129 - 137, The
splitFullName function should trim leading/trailing whitespace before splitting
to avoid empty first/last names from stray spaces; in splitFullName, call
strings.TrimSpace(fullName) at the start, handle the case of an empty string by
returning ("", ""), then proceed with the existing splitting logic (using
strings.Index) on the trimmed value so first and last name results are robust to
extra whitespace.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/mailing-list-api/data_stream.go`:
- Around line 21-30: The docstring for handleDataStream incorrectly references
MAILING_LIST_EVENTING_ENABLED while the runtime check uses EVENTING_ENABLED;
update the docstring to match the actual environment variable used by
dataStreamEnabled() (EVENTING_ENABLED) or change dataStreamEnabled()/its check
to use MAILING_LIST_EVENTING_ENABLED so they are consistent; locate
handleDataStream and the dataStreamEnabled function and make the docstring and
code refer to the same single env var name.

In `@internal/infrastructure/nats/mapping_store.go`:
- Around line 27-36: ResolveAction currently treats any kv.Get error as "key not
found" and silently returns model.ActionCreated; change it to mirror
IsMappingPresent/IsTombstoned by checking errors.Is(err,
jetstream.ErrKeyNotFound) and only treat that (or nil entry) as ActionCreated,
while for any other non-nil err log a warning (including the key and err) using
the same logger used in IsMappingPresent/IsTombstoned and then return a sensible
default; ensure you reference m.kv.Get, constants.KVTombstoneMarker,
model.ActionCreated/Updated, and add errors.Is/jetstream.ErrKeyNotFound checks
(and imports) as needed so transient failures are not silently treated as
missing keys.

In `@internal/service/datastream_service_handler.go`:
- Around line 21-31: HandleDataStreamServiceUpdate currently mutates the
incoming data map by setting data["project_id"]=projectUID which can produce
unexpected caller-visible side effects; instead create a shallow copy of the map
(iterate keys from the incoming data into a new map, set
newMap["project_id"]=projectUID) and pass that copy into downstream calls (e.g.,
transformV1ToGrpsIOService) or, preferably, change the downstream call to accept
projectUID directly and avoid injecting it into the map; update usages of data
after projectUID resolution to use the new copy (or new parameter) and leave the
original data unmodified.
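The shallow-copy option described above can be sketched as follows; `withProjectID` is a hypothetical helper name, not one from the codebase, and the real handler would pass the copy on to transformV1ToGrpsIOService.

```go
package main

import "fmt"

// withProjectID returns a shallow copy of data with project_id set,
// leaving the caller's map untouched.
func withProjectID(data map[string]any, projectUID string) map[string]any {
	out := make(map[string]any, len(data)+1)
	for k, v := range data {
		out[k] = v
	}
	out["project_id"] = projectUID
	return out
}

func main() {
	original := map[string]any{"group_name": "dev"}
	copied := withProjectID(original, "proj-42")
	fmt.Println(copied["project_id"]) // proj-42
	_, mutated := original["project_id"]
	fmt.Println(mutated) // false — caller's map is unchanged
}
```

Note this is only a shallow copy: nested maps or slices are still shared with the caller, which is fine for adding a top-level key but not for deeper mutations.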

In `@pkg/mapconv/field_extract.go`:
- Around line 38-57: Int64Ptr and IntVal currently use fmt.Sscanf and permissive
float64 casting which accepts things like "123abc" and truncates floats; change
them to strict integer parsing: for string inputs use strconv.ParseInt (base 10,
64-bit) and return nil on any parse error, and for float64 inputs only accept
values that are exact integers (e.g., compare float64(int64(v)) == v) otherwise
return nil; ensure any other types return nil and preserve the same return
shapes for Int64Ptr and IntVal so malformed or non-integral inputs fail rather
than being coerced.
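A self-contained sketch of the stricter parsing; `int64Ptr` here is a stand-alone stand-in for the mapconv helper, showing rejection of trailing garbage in strings and of non-integral floats:

```go
package main

import (
	"fmt"
	"strconv"
)

// int64Ptr extracts data[key] as an *int64 under strict rules:
// strings must parse fully as base-10 integers, float64 values must
// be exact integers, and all other types yield nil.
func int64Ptr(data map[string]any, key string) *int64 {
	switch v := data[key].(type) {
	case string:
		n, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return nil // rejects "123abc" rather than coercing it
		}
		return &n
	case float64:
		n := int64(v)
		if float64(n) != v {
			return nil // rejects 1.5 rather than truncating it
		}
		return &n
	case int64:
		return &v
	default:
		return nil
	}
}

func main() {
	data := map[string]any{"a": "123", "b": "123abc", "c": 7.0, "d": 7.5}
	fmt.Println(*int64Ptr(data, "a"))       // 123
	fmt.Println(int64Ptr(data, "b") == nil) // true
	fmt.Println(*int64Ptr(data, "c"))       // 7
	fmt.Println(int64Ptr(data, "d") == nil) // true
}
```

The same exactness check (`float64(int64(v)) == v`) also covers float64 values outside the int64 range, since the round-trip conversion no longer compares equal.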

---

Nitpick comments:
In @.env.example:
- Around line 39-41: Add commented example entries for the additional eventing
environment variables referenced in docs to improve discoverability: include
commented lines for EVENTING_CONSUMER_NAME, EVENTING_MAX_DELIVER,
EVENTING_ACK_WAIT_SECS, and EVENTING_MAX_ACK_PENDING with their default values
and a short inline note; place them adjacent to EVENTING_ENABLED in the
.env.example so maintainers see the overrides and defaults together.

In `@cmd/mailing-list-api/data_stream.go`:
- Around line 98-107: The envInt function currently swallows strconv.Atoi parse
errors and returns the default without any notice; update envInt to log a
warning when strconv.Atoi fails (include the env key, the raw value s, and the
parse error) before returning the default so operators see misconfigurations;
keep the existing return behavior on error and use the project's logger (or
log.Printf if no logger exists) to emit the warning, referencing envInt and the
strconv.Atoi failure point.
- Around line 48-65: Consolidate the two goroutines into one to simplify
shutdown: run processor.Start(ctx, streamConsumer) in a single goroutine that
also listens for ctx.Done() (or use an errgroup to run Start and handle
cancellation) and when cancellation occurs create stopCtx with
context.WithTimeout and call processor.Stop(stopCtx); ensure you still call
wg.Add(1)/wg.Done around this single goroutine and log errors from both Start
(using processor.Start) and Stop (using processor.Stop) as before.

In `@cmd/mailing-list-api/eventing/event_processor.go`:
- Around line 48-54: NewEventProcessor currently returns a natsEventProcessor
without validating required Config fields, so missing ConsumerName or StreamName
causes unclear failures later in Start; update NewEventProcessor to check
cfg.ConsumerName and cfg.StreamName (and any other required cfg fields) for
non-empty values and return a descriptive error if any are missing, ensuring the
validation occurs before constructing and returning the natsEventProcessor
instance.

In `@cmd/mailing-list-api/eventing/handler.go`:
- Around line 44-73: After extracting uid in eventHandler.HandleChange for
kvPrefixService, kvPrefixSubgroup and kvPrefixMember, add a defensive check that
uid is non-empty; if empty, call slog.WarnContext with context and key (and a
short message like "empty UID after prefix strip") and return false to ACK
instead of calling service.HandleDataStream...; apply the same check for the
service, subgroup, and member branches to prevent passing an empty uid into
service.HandleDataStreamServiceUpdate/Delete,
service.HandleDataStreamSubgroupUpdate/Delete, and
service.HandleDataStreamMemberUpdate/Delete.

In `@cmd/mailing-list-api/service/providers.go`:
- Around line 503-511: The MappingReaderWriter provider currently creates a new
instance on each call (MappingReaderWriter -> GetNATSClient ->
nats.NewMappingReaderWriter), which is inconsistent with other providers that
use a singleton; change it to a lazily-initialized singleton using a
package-level variable (e.g., mappingRW) and a sync.Once (e.g., mappingOnce) to
ensure the KV bucket (constants.KVBucketNameV1Mappings) is accessed and the
nats.NewMappingReaderWriter is constructed only once, returning the cached
mappingRW on subsequent calls while preserving the existing error handling with
client.KeyValue.
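The lazily-initialized singleton pattern can be sketched as below. `mappingStore` and `getMappingReaderWriter` are stand-ins for the real NATS-backed types, with a construction counter added purely to show the constructor runs once even under concurrent access:

```go
package main

import (
	"fmt"
	"sync"
)

// mappingStore stands in for the nats.NewMappingReaderWriter result.
type mappingStore struct{ bucket string }

var (
	mappingOnce sync.Once
	mappingRW   *mappingStore
	mappingErr  error
	constructed int
)

// getMappingReaderWriter builds the store exactly once and caches
// both the value and any construction error for subsequent calls.
func getMappingReaderWriter() (*mappingStore, error) {
	mappingOnce.Do(func() {
		constructed++
		// Real code: client.KeyValue(ctx, constants.KVBucketNameV1Mappings)
		// followed by nats.NewMappingReaderWriter(kv).
		mappingRW = &mappingStore{bucket: "v1-mappings"}
	})
	return mappingRW, mappingErr
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = getMappingReaderWriter()
		}()
	}
	wg.Wait()
	fmt.Println(constructed) // 1
}
```

One caveat of this shape: caching the error inside the `sync.Once` means a failed first initialization is never retried for the lifetime of the process, so it suits providers whose dependencies (here, the NATS client) are already established at startup.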

In `@internal/infrastructure/nats/mapping_store.go`:
- Around line 38-54: The two methods IsMappingPresent and IsTombstoned currently
emit slog.WarnContext when m.kv.Get returns no entry, which can produce noisy
logs on expected misses; change those slog.WarnContext calls to a
lower-verbosity logging call (e.g., slog.DebugContext) or remove them entirely
so that routine existence checks do not flood logs, keeping the rest of the
logic (m.kv.Get, nil check, and tombstone comparison) unchanged.

In `@internal/service/datastream_member_handler.go`:
- Around line 129-137: The splitFullName function should trim leading/trailing
whitespace before splitting to avoid empty first/last names from stray spaces;
in splitFullName, call strings.TrimSpace(fullName) at the start, handle the case
of an empty string by returning ("", ""), then proceed with the existing
splitting logic (using strings.Index) on the trimmed value so first and last
name results are robust to extra whitespace.

In `@internal/service/datastream_service_handler.go`:
- Around line 61-64: The current code logs errors from mappings.PutMapping(ctx,
mKey, uid) but still returns false (ACK); change the control flow so that when
PutMapping returns a non-nil error you log the error and return true (NAK) to
trigger a retry, preventing ACK of partially persisted state that could cause
ResolveAction to see ActionUpdated instead of ActionCreated; locate the
mappings.PutMapping call and adjust the return path to return true on error (or
propagate transient-vs-permanent classification if available) so the message is
retried.
- Around line 100-120: In transformV1ToGrpsIOService, the time.Parse error for
last_modified_at is currently ignored; update the block that reads ts :=
mapconv.StringVal(data, "last_modified_at") so that when
time.Parse(time.RFC3339, ts) returns an error you log the failure at debug
(including uid, ts and the error) before leaving UpdatedAt unchanged; use the
existing logger or package logger used elsewhere in this file (or add one if
missing) and keep the successful parse path assigning svc.UpdatedAt unchanged.

In `@internal/service/datastream_subgroup_handler.go`:
- Around line 160-164: The code only parses last_modified_at into
list.UpdatedAt; add analogous parsing for created_at so list.CreatedAt is
populated when present. Locate the block using mapconv.StringVal(data,
"last_modified_at") and time.Parse(time.RFC3339, ts) and replicate the logic for
mapconv.StringVal(data, "created_at"), parsing with time.Parse(time.RFC3339,
...) and assigning to list.CreatedAt (mirroring the approach used in
datastream_member_handler.go for CreatedAt).

In `@pkg/errors/transient.go`:
- Around line 11-19: IsTransient currently only checks the top-level err.Error()
string; change it to recursively unwrap the error (using errors.Unwrap in a loop
or errors.As with a marker interface) and inspect each underlying error message
for the transient tokens ("timeout","connection","unavailable","deadline") so
wrapped errors are detected; update the IsTransient function to iterate through
err and its unwrapped causes (or use errors.As) and add "errors" to imports if
needed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5bc27278-0b08-43a1-b9fc-92fb9a7944c3

📥 Commits

Reviewing files that changed from the base of the PR and between f0591d2 and 4b8df54.

📒 Files selected for processing (22)
  • .env.example
  • README.md
  • cmd/mailing-list-api/data_stream.go
  • cmd/mailing-list-api/eventing/event_processor.go
  • cmd/mailing-list-api/eventing/handler.go
  • cmd/mailing-list-api/main.go
  • cmd/mailing-list-api/service/providers.go
  • docs/event-processing.md
  • internal/domain/model/stream_message.go
  • internal/domain/port/data_stream_event_handler.go
  • internal/domain/port/data_stream_processor.go
  • internal/domain/port/mapping_store.go
  • internal/infrastructure/nats/client.go
  • internal/infrastructure/nats/data_stream_consumer.go
  • internal/infrastructure/nats/mapping_store.go
  • internal/service/datastream_member_handler.go
  • internal/service/datastream_service_handler.go
  • internal/service/datastream_subgroup_handler.go
  • pkg/constants/storage.go
  • pkg/errors/transient.go
  • pkg/mapconv/field_extract.go
  • pkg/mapconv/field_extract_test.go

… service and subgroup updates

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

Assisted by [Claude Code](https://claude.ai/code)

Signed-off-by: Mauricio Zanetti Salomao <[email protected]>

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
pkg/mapconv/field_extract.go (1)

119-126: Consider logging or returning an indicator when non-string items are silently skipped.

When the input array contains mixed types (e.g., [1, "valid", true]), non-string items are silently dropped. For an event pipeline processing identifiers, this could mask upstream data corruption.

If strict validation isn't desired, consider at minimum documenting this behavior in the function comment.

📝 Option: Document the lenient behavior
 // StringSliceVal extracts a []string from data[key].
 // Accepts a JSON array of strings or a bare string (returned as a one-element slice).
+// Non-string items in a JSON array are silently skipped.
 // Returns nil if the key is absent or the value is an empty string.
 func StringSliceVal(data map[string]any, key string) []string {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/mapconv/field_extract.go` around lines 119 - 126, The slice conversion in
the case []any block silently drops non-string elements (iterating variable t
and appending to out); update this code to detect and count non-string items and
either log a warning with context (e.g., "skipped N non-string items while
converting []any to []string") or return an indicator/error alongside the
[]string so upstream can detect partial conversions; if changing the function
signature is undesirable, at minimum add a clear comment above the switch
describing the lenient behavior and that non-string items are ignored.
internal/service/datastream_subgroup_handler.go (1)

36-47: Input map mutations for resolved IDs.

Similar to the service handler, this function mutates the input data map:

  • Line 36: data["project_id"] = projectUID
  • Line 47: data["committee"] = committeeUID

This pattern is consistent with the service handler. If this is intentional (to pass resolved UIDs to transformV1ToGrpsIOMailingList), consider documenting the mutation behavior or passing resolved UIDs as separate parameters to the transform function.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_subgroup_handler.go` around lines 36 - 47, The
handler currently mutates the input data map by assigning resolved IDs
(data["project_id"]=projectUID and data["committee"]=committeeUID) before
calling transformV1ToGrpsIOMailingList; either avoid mutating the caller's map
by creating a shallow copy of data and setting project_id/committee on that copy
before passing it to transformV1ToGrpsIOMailingList, or change
transformV1ToGrpsIOMailingList to accept the resolved IDs as explicit parameters
(e.g., pass projectUID and committeeUID) and leave the original data untouched;
update comments to document the chosen behavior and adjust call sites
accordingly (look for transformV1ToGrpsIOMailingList and usages of data in this
handler).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@internal/service/datastream_subgroup_handler.go`:
- Around line 36-47: The handler currently mutates the input data map by
assigning resolved IDs (data["project_id"]=projectUID and
data["committee"]=committeeUID) before calling transformV1ToGrpsIOMailingList;
either avoid mutating the caller's map by creating a shallow copy of data and
setting project_id/committee on that copy before passing it to
transformV1ToGrpsIOMailingList, or change transformV1ToGrpsIOMailingList to
accept the resolved IDs as explicit parameters (e.g., pass projectUID and
committeeUID) and leave the original data untouched; update comments to document
the chosen behavior and adjust call sites accordingly (look for
transformV1ToGrpsIOMailingList and usages of data in this handler).

In `@pkg/mapconv/field_extract.go`:
- Around line 119-126: The slice conversion in the case []any block silently
drops non-string elements (iterating variable t and appending to out); update
this code to detect and count non-string items and either log a warning with
context (e.g., "skipped N non-string items while converting []any to []string")
or return an indicator/error alongside the []string so upstream can detect
partial conversions; if changing the function signature is undesirable, at
minimum add a clear comment above the switch describing the lenient behavior and
that non-string items are ignored.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6b9b2bce-4088-4ea8-966d-1fa35cf52935

📥 Commits

Reviewing files that changed from the base of the PR and between 4b8df54 and 2b96419.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (11)
  • cmd/mailing-list-api/data_stream.go
  • go.mod
  • internal/infrastructure/mock/mapping_store.go
  • internal/infrastructure/mock/message_publisher.go
  • internal/infrastructure/nats/data_stream_consumer.go
  • internal/service/datastream_service_handler.go
  • internal/service/datastream_service_handler_test.go
  • internal/service/datastream_subgroup_handler.go
  • internal/service/datastream_subgroup_handler_test.go
  • pkg/mapconv/field_extract.go
  • pkg/mapconv/field_extract_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/mapconv/field_extract_test.go
  • internal/infrastructure/nats/data_stream_consumer.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (1)
internal/service/datastream_member_handler_test.go (1)

49-127: Add failure-path tests for mapping persistence errors.

Please add cases where mapping store writes fail (PutMapping in update, PutTombstone in delete) and assert NAK/retry behavior. These are critical consistency paths and currently untested.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_member_handler_test.go` around lines 49 - 127,
Add tests that simulate mapping store write failures for
HandleDataStreamMemberUpdate and HandleDataStreamMemberDelete by configuring the
mock.NewFakeMappingStore to return an error from PutMapping (for update) and
PutTombstone (for delete), then call HandleDataStreamMemberUpdate and
HandleDataStreamMemberDelete with a mock.SpyMessagePublisher and assert the
handler returns nak==true, that no indexer/access messages were published
(pub.IndexerCalls and pub.AccessCalls remain empty), and that the mapping store
did not record a successful mapping/tombstone (e.g., ResolveAction still shows
created for updates or IsTombstoned is false for deletes); reference the
functions HandleDataStreamMemberUpdate, HandleDataStreamMemberDelete and mock
methods PutMapping/PutTombstone and the spy publisher to locate where to inject
the simulated errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/datastream_member_handler.go`:
- Around line 102-105: The tombstone write currently logs errors from
mappings.PutTombstone and returns false (ACKing) which allows stale updates to
be reprocessed; change this in the delete flow so PutTombstone is retried (with
a limited retry count and small backoff) until success or context cancellation,
and only proceed/ACK after a successful tombstone write (or return an error when
retries exhausted). Locate the call to mappings.PutTombstone (using mKey and
ctx), implement a retry loop that respects ctx.Done(), uses a maxAttempts and
incremental sleep between attempts, logs each retry via slog.ErrorContext
including mKey and attempt number, and return/propagate an error instead of
returning false when retries fail so the message is not ACKed.
- Around line 66-69: The mapping write failure path currently just logs and
returns false which causes the message to be ACKed; change it so that when
mappings.PutMapping(ctx, mKey, uid) returns an error you propagate that error
(or otherwise signal a non-ACK) instead of returning false. Specifically, in the
handler that calls mappings.PutMapping (refer to mappings.PutMapping, mKey and
uid in datastream_member_handler.go), replace the log-then-return-false block
with code that logs the error and returns the error (or returns a non-ACK
indicator) so the message is not acknowledged when PutMapping fails.

In `@internal/service/datastream_subgroup_handler.go`:
- Around line 54-59: The current check NAKs indefinitely when list.ServiceUID is
empty; update the logic in datastream_subgroup_handler where serviceKey is built
(using list.ServiceUID) and where similar parent lookup occurs (the block
referencing ServiceUID/uid and mappings.IsMappingPresent) to treat a missing
parent_id (empty list.ServiceUID) as a malformed message: log a warning/error
indicating missing parent_id and return false (ACK/drop) instead of returning
true (NAK). Ensure both occurrences check for list.ServiceUID == "" before
calling mappings.IsMappingPresent and handle accordingly so the subgroup is not
retried forever.
- Around line 97-99: The current code calls publisher.Access(...) and logs
failures but still ACKs the incoming message, which can hide publish failures;
update both occurrences of the publisher.Access call (the call with
constants.UpdateAccessGroupsIOMailingListSubject) to NOT ACK on error — instead
return the error (or propagate it up) so the message consumer will not
ACK/complete and can be retried; locate the publisher.Access(...) calls and the
subsequent msg.Ack()/ack logic in the handler function (the block around the
publisher.Access call at lines shown) and change the control flow to return the
publish error (or requeue) rather than acknowledging the message.
- Around line 101-110: The current mappings.PutMapping calls (e.g., the one
using mKey and the gidKey built from constants.KVMappingPrefixSubgroupByGroupID
and *list.GroupID) only log failures via slog.ErrorContext and allow the handler
to ACK; instead, propagate/return the error so the message is NAKed/retried.
Update each mapping/tombstone write site (all mappings.PutMapping and the
corresponding tombstone write calls in this handler) to return a non-nil error
(or wrap and return the err) when the write fails rather than just logging,
ensuring the caller/handler observes the failure and triggers retry/NAK
handling.

---

Nitpick comments:
In `@internal/service/datastream_member_handler_test.go`:
- Around line 49-127: Add tests that simulate mapping store write failures for
HandleDataStreamMemberUpdate and HandleDataStreamMemberDelete by configuring the
mock.NewFakeMappingStore to return an error from PutMapping (for update) and
PutTombstone (for delete), then call HandleDataStreamMemberUpdate and
HandleDataStreamMemberDelete with a mock.SpyMessagePublisher and assert the
handler returns nak==true, that no indexer/access messages were published
(pub.IndexerCalls and pub.AccessCalls remain empty), and that the mapping store
did not record a successful mapping/tombstone (e.g., ResolveAction still shows
created for updates or IsTombstoned is false for deletes); reference the
functions HandleDataStreamMemberUpdate, HandleDataStreamMemberDelete and mock
methods PutMapping/PutTombstone and the spy publisher to locate where to inject
the simulated errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 25a034b3-7fe5-45b3-ac01-5e21171c6fec

📥 Commits

Reviewing files that changed from the base of the PR and between 2b96419 and 5368c99.

📒 Files selected for processing (4)
  • charts/lfx-v2-mailing-list-service/values.yaml
  • internal/service/datastream_member_handler.go
  • internal/service/datastream_member_handler_test.go
  • internal/service/datastream_subgroup_handler.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/service/datastream_subgroup_handler_test.go (1)

123-149: Add coverage for delete when mapping is absent.

HandleDataStreamSubgroupDelete has a distinct branch for “never indexed” (!IsMappingPresent) that writes tombstone and skips publish. Adding a test for that branch would close an important behavior gap.

🧪 Suggested test case
+func TestHandleDataStreamSubgroupDelete_MappingAbsent_ACKAndTombstonesWithoutPublish(t *testing.T) {
+	m := mock.NewFakeMappingStore()
+	pub := &mock.SpyMessagePublisher{}
+
+	nak := HandleDataStreamSubgroupDelete(context.Background(), "sg-1", pub, m)
+
+	assert.False(t, nak)
+	assert.Empty(t, pub.IndexerCalls)
+	assert.Empty(t, pub.AccessCalls)
+	assert.True(t, m.IsTombstoned(context.Background(),
+		fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup)))
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_subgroup_handler_test.go` around lines 123 - 149,
Add a unit test for the branch in HandleDataStreamSubgroupDelete where the
mapping is absent (never indexed): create a mock.NewFakeMappingStore without Set
or existing mapping, call HandleDataStreamSubgroupDelete(ctx, "sg-1", pub, m),
assert the returned nak is false, assert pub.IndexerCalls and pub.AccessCalls
are empty (no publish), and assert the mapping store has a tombstone for
fmt.Sprintf("%s.sg-1", constants.KVMappingPrefixSubgroup) via m.IsTombstoned to
verify the tombstone was written.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/datastream_subgroup_handler_test.go`:
- Around line 114-120: The test currently ignores the return from
HandleDataStreamSubgroupUpdate which can mask NAKs; change the call to capture
the returned error (e.g., err := HandleDataStreamSubgroupUpdate(...)) and add an
assertion that it indicates an ACK (assert.NoError(t, err) or assert.Nil(t,
err)) before checking the reverse-index key; reference the existing call to
HandleDataStreamSubgroupUpdate, the mock.SpyMessagePublisher{}, and mapping m so
the new assertion is placed immediately after that call.



📥 Commits

Reviewing files that changed from the base of the PR and between 5368c99 and b182d8e.

📒 Files selected for processing (1)
  • internal/service/datastream_subgroup_handler_test.go

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
internal/service/datastream_subgroup_handler.go (1)

183-187: Consider logging timestamp parse failures for observability.

When last_modified_at parsing fails, the error is silently ignored and UpdatedAt remains at its zero value. While this resilient approach is reasonable for event processing, adding a debug or warning log would help identify data quality issues in v1 payloads.

🔍 Proposed enhancement for observability
 	if ts := mapconv.StringVal(data, "last_modified_at"); ts != "" {
-		if t, err := time.Parse(time.RFC3339, ts); err == nil {
+		t, err := time.Parse(time.RFC3339, ts)
+		if err != nil {
+			slog.DebugContext(context.Background(), "failed to parse last_modified_at", "uid", uid, "value", ts, "error", err)
+		} else {
 			list.UpdatedAt = t
 		}
 	}

Note: You'd need to pass ctx into the function or use context.Background() for the log call.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/datastream_subgroup_handler.go` around lines 183 - 187, The
time.Parse failure for mapconv.StringVal(data, "last_modified_at") is currently
ignored; modify the block so that when time.Parse returns an error you emit a
debug/warn log including the raw timestamp and the parse error (but still keep
the current fallback behavior), e.g. detect err from time.Parse and call the
existing request/logger (or create one from ctx if the surrounding function has
a context parameter) or use context.Background() + a logger to log the timestamp
and err; keep assigning list.UpdatedAt only on successful parse. Use the same
symbols shown (mapconv.StringVal, time.Parse, list.UpdatedAt) to locate and
change the code.


📥 Commits

Reviewing files that changed from the base of the PR and between b182d8e and 1389aa4.

📒 Files selected for processing (3)
  • README.md
  • charts/lfx-v2-mailing-list-service/values.yaml
  • internal/service/datastream_subgroup_handler.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • charts/lfx-v2-mailing-list-service/values.yaml

…ds and settings for improved event processing

Jira Ticket: https://linuxfoundation.atlassian.net/browse/LFXV2-1223

Assisted by [Claude Code](https://claude.ai/code)

Signed-off-by: Mauricio Zanetti Salomao <[email protected]>
Contributor

@andrest50 andrest50 left a comment

Overall this looks good; there's one important thing I commented on. I like the code structure, and we should refactor the meeting, voting, and surveys services so their data streaming aligns with this in the near future. I can help out with that as time allows.

// indexer message. Returns true to NAK when the parent subgroup mapping is absent
// (ordering guarantee) or on transient errors.
//
// No FGA access message is published — member access is inherited from the parent
Contributor

We have a member relation on the mailing list FGA object type (https://github.com/linuxfoundation/lfx-v2-helm/blob/main/charts/lfx-platform/templates/openfga/model.yaml#L71), so I think we need to use the member_put message subject to add the member's relationship on the mailing list, and conversely member_remove to remove that relationship.
