[LFXV2-1187] Add post-indexing NATS domain events#44
Conversation
After every successful OpenSearch write, publish a NATS event to
lfx.{object_type}.{action} (e.g. lfx.project.created) so downstream
consumers can react to indexing completions without polling.
- Add IndexingEvent contract with document_id, object_id, object_type,
action, body, and timestamp fields
- Add BuildEventSubject helper and EventSubjectPrefix constant
- Publish event from IndexerService.ProcessTransaction after successful
storage write; failures are non-blocking and logged
- Add tests covering publish on success, non-blocking publish failure,
no event on index failure, and V1 action canonicalization
- Update CLAUDE.md to document outbound events and payload shape
Generated with [Claude Code](https://claude.ai/code)
Signed-off-by: Andres Tobon <andrest2455@gmail.com>
WalkthroughThis pull request introduces outbound domain events published to NATS after successful OpenSearch writes. Events use subject pattern lfx.{object_type}.{action} and include document ID, object ID, object type, action, timestamp, and transaction body. Publishing is non-blocking and does not alter indexing results. Changes
Sequence Diagram(s)sequenceDiagram
participant MP as MessageProcessor
participant IS as IndexerService
participant ES as OpenSearch
participant MQ as MessagingRepo (NATS)
MP->>IS: ProcessTransaction(tx)
IS->>ES: Index document (Write)
ES-->>IS: Index success
IS->>MQ: publishIndexingEvent(event) %% rgba(50,150,250,0.5)
MQ-->>IS: Ack / or Error (logged, non-blocking)
IS-->>MP: Return processing result
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Pull request overview
Adds outbound NATS “post-indexing” domain events emitted after successful OpenSearch writes so downstream consumers can react to indexing completion.
Changes:
- Introduces dynamic event subjects
lfx.{object_type}.{action}viaconstants.BuildEventSubject. - Publishes a new
IndexingEventpayload after successful indexing (non-blocking on publish failure). - Adds unit tests covering event subject construction and publish behavior (success, publish failure, index failure, V1 action canonicalization).
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/constants/messaging.go | Adds EventSubjectPrefix and BuildEventSubject helper for domain event subjects. |
| pkg/constants/messaging_test.go | Adds unit tests validating BuildEventSubject output for multiple object types/actions. |
| internal/domain/services/indexer_service.go | Publishes IndexingEvent after successful indexing via publishIndexingEvent. |
| internal/domain/services/indexer_service_test.go | Adds tests asserting event publish, payload shape, non-blocking publish failures, and no publish on index failure. |
| internal/domain/contracts/events.go | Introduces IndexingEvent contract (document/object metadata + timestamp + full TransactionBody). |
| CLAUDE.md | Documents the new outbound domain-event behavior and payload shape. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/domain/services/indexer_service_test.go (1)
1252-1258: Assert publish was attempted in the non-blocking failure test.Right now this test proves indexing still succeeds, but not that publish execution was actually reached.
Proposed test hardening
// Index succeeded even though publish failed assert.NoError(t, err) assert.True(t, result.Success) assert.True(t, result.IndexSuccess) assert.Equal(t, "project:test-project", result.DocumentID) assert.Len(t, mockStorageRepo.IndexCalls, 1) + assert.Len(t, mockMessagingRepo.PublishCalls, 1)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/domain/services/indexer_service_test.go` around lines 1252 - 1258, The test currently verifies indexing succeeded after a non-blocking publish failure but doesn't assert that the publish path was actually invoked; add an assertion that the publish mock recorded a call (e.g., assert.Len(t, mockPublisher.PublishCalls, 1) or equivalent for the mock used in the test) after the index assertions so the test ensures publish execution was attempted (reference the existing mockStorageRepo.IndexCalls pattern and the publish mock's call list).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/domain/services/indexer_service.go`:
- Around line 822-829: The current creation of contracts.IndexingEvent populates
Body with the full TransactionBody (variable body), which can leak PII and
arbitrary Data; update the code that builds the IndexingEvent (where
contracts.IndexingEvent is constructed) to instead publish a
sanitized/allowlisted payload: create a new struct or map containing only safe
fields (e.g., ObjectRef/ObjectID/ObjectType/action/timestamp and any explicit
non-PII fields) or implement a redactTransactionBody(body) helper that strips
emails, credentials and Data before assigning to Body, and gate emitting the
full body behind a strict config flag (e.g., EnableEmitFullTransactionBody) so
full-body emission only occurs when explicitly enabled. Ensure the change
touches the IndexingEvent construction site and any publisher code that consumes
Body to avoid breaking consumers.
---
Nitpick comments:
In `@internal/domain/services/indexer_service_test.go`:
- Around line 1252-1258: The test currently verifies indexing succeeded after a
non-blocking publish failure but doesn't assert that the publish path was
actually invoked; add an assertion that the publish mock recorded a call (e.g.,
assert.Len(t, mockPublisher.PublishCalls, 1) or equivalent for the mock used in
the test) after the index assertions so the test ensures publish execution was
attempted (reference the existing mockStorageRepo.IndexCalls pattern and the
publish mock's call list).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 49c0b490-d045-488c-a2f5-088f10ddb5f4
📒 Files selected for processing (6)
CLAUDE.mdinternal/domain/contracts/events.gointernal/domain/services/indexer_service.gointernal/domain/services/indexer_service_test.gopkg/constants/messaging.gopkg/constants/messaging_test.go
After adding publishIndexingEvent to IndexerService, the application
layer tests panicked because the testify/mock-based MockMessagingRepository
received an unexpected Publish call. Added On("Publish", ...).Return(nil)
to all tests with a successful index path and named the previously
discarded mockMessagingRepo variable where needed.
Generated with [Claude Code](https://claude.ai/code)
Signed-off-by: Andres Tobon <andrest2455@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/application/message_processor_test.go (1)
321-352:⚠️ Potential issue | 🟠 MajorAssert messaging mock expectations in both V1 success tests.
Line 344 and Line 503 add
Publishexpectations, but these tests never callmockMessagingRepo.AssertExpectations(t). They can pass even ifPublishis never invoked.Suggested fix
func TestMessageProcessor_ProcessV1IndexingMessage_Success(t *testing.T) { mp, mockMessagingRepo, mockStorageRepo := setupTestMessageProcessor() @@ assert.NoError(t, err) mockStorageRepo.AssertExpectations(t) + mockMessagingRepo.AssertExpectations(t) } @@ func TestIndexingHandler_HandleWithReply_V1Message(t *testing.T) { mp, mockMessagingRepo, mockStorageRepo := setupTestMessageProcessor() @@ assert.NoError(t, err) assert.True(t, replyCalled) mockStorageRepo.AssertExpectations(t) + mockMessagingRepo.AssertExpectations(t) }Also applies to: 475-515
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/application/message_processor_test.go` around lines 321 - 352, The tests that set a Publish expectation on mockMessagingRepo but never assert it (in the tests that call ProcessV1IndexingMessage) should call mockMessagingRepo.AssertExpectations(t) after executing the function and before ending assertions; update the test(s) that create mp via setupTestMessageProcessor(), marshal testData, call mp.ProcessV1IndexingMessage(ctx, data, subject) and currently only call mockStorageRepo.AssertExpectations(t) to also call mockMessagingRepo.AssertExpectations(t) so the Publish expectation is verified.
🧹 Nitpick comments (1)
internal/application/message_processor_test.go (1)
211-212: Add explicit publish-failure tests to lock in non-blocking behavior.All new
Publishexpectations returnnil; there’s no test proving processing still succeeds whenPublishreturns an error. That is the core behavior of this PR and should be covered for both V2 and V1 paths.Suggested test shape
func TestMessageProcessor_ProcessIndexingMessage_PublishFailureNonBlocking(t *testing.T) { mp, mockMessagingRepo, mockStorageRepo := setupTestMessageProcessor() ctx := context.Background() testData := map[string]any{ "action": "created", "data": map[string]any{ "id": "test-123", "name": "Test Project", "public": true, }, "headers": map[string]string{"authorization": "Bearer test-token"}, } data, _ := json.Marshal(testData) mockStorageRepo.On("Index", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mockMessagingRepo.On("ParsePrincipals", mock.Anything, mock.Anything).Return([]contracts.Principal{}, nil) mockMessagingRepo.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("nats publish failed")) err := mp.ProcessIndexingMessage(ctx, data, "lfx.index.project") assert.NoError(t, err) // non-blocking guarantee mockStorageRepo.AssertExpectations(t) mockMessagingRepo.AssertExpectations(t) }As per coding guidelines,
internal/{application,presentation}/**/*_test.go: "Test both V2 and V1 message formats where applicable."Also applies to: 344-345, 458-459, 503-504, 586-587, 690-691
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/application/message_processor_test.go` around lines 211 - 212, Add explicit tests that verify Publish errors are non-blocking by extending the existing test suite: create tests (e.g., TestMessageProcessor_ProcessIndexingMessage_PublishFailureNonBlocking for both V2 and V1 message shapes) using setupTestMessageProcessor(), marshal the V2 and V1 payloads, stub mockStorageRepo.Index to return nil, stub mockMessagingRepo.ParsePrincipals to return empty principals, and stub mockMessagingRepo.Publish to return an error (e.g., errors.New("nats publish failed")); then call ProcessIndexingMessage(ctx, data, topic) and assert no error is returned and that mockStorageRepo.AssertExpectations(t) and mockMessagingRepo.AssertExpectations(t) are satisfied. Ensure you add one test for the V2 format and one for the V1 format (matching other tests in the file).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/application/message_processor_test.go`:
- Around line 744-750: The benchmark tests set up mp via
setupTestMessageProcessor() and mock only mockStorageRepo.Index and
mockMessagingRepo.Publish, but the V2 path may call ParsePrincipals when an
authorization header is present; add a mock expectation for ParsePrincipals to
the benchmark setup to avoid unexpected-call panics—specifically, in the
benchmark sections that use authorization headers (around the blocks that
currently set mockStorageRepo.On("Index", ...) and
mockMessagingRepo.On("Publish", ...)), add a mock for ParsePrincipals (or the
function/method the code invokes to parse principals) that returns the expected
principals and nil error so the V2 processing path can proceed in the
benchmarks.
---
Outside diff comments:
In `@internal/application/message_processor_test.go`:
- Around line 321-352: The tests that set a Publish expectation on
mockMessagingRepo but never assert it (in the tests that call
ProcessV1IndexingMessage) should call mockMessagingRepo.AssertExpectations(t)
after executing the function and before ending assertions; update the test(s)
that create mp via setupTestMessageProcessor(), marshal testData, call
mp.ProcessV1IndexingMessage(ctx, data, subject) and currently only call
mockStorageRepo.AssertExpectations(t) to also call
mockMessagingRepo.AssertExpectations(t) so the Publish expectation is verified.
---
Nitpick comments:
In `@internal/application/message_processor_test.go`:
- Around line 211-212: Add explicit tests that verify Publish errors are
non-blocking by extending the existing test suite: create tests (e.g.,
TestMessageProcessor_ProcessIndexingMessage_PublishFailureNonBlocking for both
V2 and V1 message shapes) using setupTestMessageProcessor(), marshal the V2 and
V1 payloads, stub mockStorageRepo.Index to return nil, stub
mockMessagingRepo.ParsePrincipals to return empty principals, and stub
mockMessagingRepo.Publish to return an error (e.g., errors.New("nats publish
failed")); then call ProcessIndexingMessage(ctx, data, topic) and assert no
error is returned and that mockStorageRepo.AssertExpectations(t) and
mockMessagingRepo.AssertExpectations(t) are satisfied. Ensure you add one test
for the V2 format and one for the V1 format (matching other tests in the file).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e5e3c00d-babb-441b-9351-319a3629d3f5
📒 Files selected for processing (1)
internal/application/message_processor_test.go
|
|
||
| // BuildEventSubject constructs the NATS subject for a post-indexing domain event. | ||
| // Format: lfx.{object_type}.{action} (e.g., "lfx.project.created", "lfx.committee.deleted"). | ||
| func BuildEventSubject(objectType string, action MessageAction) string { |
There was a problem hiding this comment.
Not a blocker at all, but it seems a bit odd to see functions like this in the constant package. In the future, we might want to create another folder here (with a meaningful name, like eventing) or in the domain (like a parent of all event types) or something else and move this useful function.
There was a problem hiding this comment.
That's a good point, I agree we can make this update.
Summary
Domain Events
Subject format
The subject is derived from the object type and the past-tense action. For example:
lfx.project.createdlfx.project.updatedlfx.project.deletedlfx.committee.createdlfx.meeting.deletedConsumers can subscribe to a single type (
lfx.project.*), a single action across all types, or all indexing events at once.Payload fields
document_id{object_type}:{object_id}object_idobject_typeactioncreated,updated, ordeletedtimestampbodyWatching events live
Subscribe to all committee events using the NATS CLI:
When a committee is created, the indexer publishes to
lfx.committee.createdand the following message arrives:Ticket
LFXV2-1187
🤖 Generated with Claude Code