Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,43 @@ This is a **Clean Architecture** implementation processing NATS messages into Op

```
NATS → MessagingRepository → IndexingMessageHandler → MessageProcessor → IndexerService → [Enricher] → OpenSearch
NATS Event Published
lfx.{object_type}.{action}
```

### Domain Events (Outbound)

After every successful OpenSearch write, the service publishes a NATS event. The subject is dynamic based on the object type and action — every object type the indexer handles (project, committee, meeting, etc.) automatically gets its own set of event subjects.

**Subject format**: `lfx.{object_type}.{action}`

Examples for `project` (same pattern applies to all object types):

- `lfx.project.created`
- `lfx.project.updated`
- `lfx.project.deleted`

Useful wildcard subscriptions:

- `lfx.project.*` — all actions for a specific type
- `lfx.*.created` — created events across all types

**Payload** (`internal/domain/contracts/events.go` — `IndexingEvent`):

```json
{
"document_id": "project:abc-123",
"object_id": "abc-123",
"object_type": "project",
"action": "created",
"timestamp": "2026-03-05T19:57:25.679Z",
"body": { ... }
}
```

`body` is the full `TransactionBody` written to OpenSearch. Publish failures are **non-blocking** — the OpenSearch write is unaffected and the error is logged.

### Critical Patterns

**Message Routing**: Subject prefix determines version (`lfx.index.*` = V2, `lfx.v1.index.*` = V1)
Expand Down Expand Up @@ -126,6 +161,8 @@ JANITOR_ENABLED=true
3. Register enricher in `IndexerService.NewIndexerService()`
4. Add tests for the new enricher

Domain events (`lfx.{object_type}.created/updated/deleted`) are emitted automatically for all object types — no additional work required.

### Message Processing Requirements

- Always reply to NATS messages with "OK" or "ERROR: details"
Expand All @@ -143,8 +180,9 @@ JANITOR_ENABLED=true
## Key Files for Understanding the System

- `internal/domain/services/indexer_service.go`: Core business logic
- `internal/application/message_processor.go`: Message workflow orchestration
- `internal/application/message_processor.go`: Message workflow orchestration
- `internal/domain/contracts/transaction.go`: Core business entities
- `internal/domain/contracts/events.go`: `IndexingEvent` — the outbound domain event payload
- `internal/enrichers/registry.go`: Enricher registration and lookup
- `internal/infrastructure/config/app_config.go`: Configuration management
- `cmd/lfx-indexer/main.go`: Dependency injection and service startup
36 changes: 36 additions & 0 deletions internal/domain/contracts/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

// Package contracts defines the interfaces and contracts for the domain layer of the LFX indexer service.
package contracts

import (
"time"

"github.com/linuxfoundation/lfx-v2-indexer-service/pkg/constants"
)

// IndexingEvent is the NATS event payload published after a successful OpenSearch write.
// It is emitted on subjects of the form lfx.{object_type}.{action}
// (e.g., lfx.project.created, lfx.committee.deleted).
type IndexingEvent struct {
// DocumentID is the canonical OpenSearch document reference in the form
// "object_type:object_id" (e.g., "project:abc-123").
DocumentID string `json:"document_id"`

// ObjectID is the identifier of the object (e.g., "abc-123").
ObjectID string `json:"object_id"`

// ObjectType is the type of the object (e.g., "project", "committee").
ObjectType string `json:"object_type"`

// Action is the canonical past-tense action that produced this event
// (e.g., "created", "updated", "deleted").
Action constants.MessageAction `json:"action"`

// Body is the full TransactionBody that was written to OpenSearch.
Body *TransactionBody `json:"body"`

// Timestamp is the UTC time at which the indexing operation completed.
Timestamp time.Time `json:"timestamp"`
}
46 changes: 46 additions & 0 deletions internal/domain/services/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,9 +806,55 @@ func (s *IndexerService) ProcessTransaction(ctx context.Context, transaction *co
"transaction_id", transactionID,
"object_ref", body.ObjectRef)

// Publish post-indexing event. Non-blocking — failure is logged but does not
// affect the result of the indexing operation itself.
s.publishIndexingEvent(ctx, body, s.GetCanonicalAction(transaction))

return result, nil
}

// publishIndexingEvent publishes a domain event to NATS after a successful OpenSearch write.
// Failures are non-blocking: the error is logged and the caller continues normally.
// Subject format: lfx.{object_type}.{action} (e.g., "lfx.project.created").
func (s *IndexerService) publishIndexingEvent(ctx context.Context, body *contracts.TransactionBody, canonicalAction constants.MessageAction) {
logger := logging.FromContext(ctx, s.logger)

event := &contracts.IndexingEvent{
DocumentID: body.ObjectRef,
ObjectID: body.ObjectID,
ObjectType: body.ObjectType,
Action: canonicalAction,
Body: body,
Timestamp: time.Now().UTC(),
}

eventBytes, err := json.Marshal(event)
if err != nil {
logging.LogError(logger, "Failed to marshal indexing event — skipping publish", err,
"document_id", body.ObjectRef,
"action", canonicalAction,
"object_type", body.ObjectType)
return
}

subject := constants.BuildEventSubject(body.ObjectType, canonicalAction)

if err := s.messagingRepo.Publish(ctx, subject, eventBytes); err != nil {
logging.LogError(logger, "Failed to publish indexing event — index write already succeeded", err,
"subject", subject,
"document_id", body.ObjectRef,
"action", canonicalAction,
"object_type", body.ObjectType)
return
}

logger.Info("Indexing event published",
"subject", subject,
"document_id", body.ObjectRef,
"action", canonicalAction,
"object_type", body.ObjectType)
}

// =================
// HEALTH CHECK METHODS
// =================
Expand Down
143 changes: 143 additions & 0 deletions internal/domain/services/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package services

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -1175,3 +1177,144 @@ func TestIndexerService_enrichTransactionData(t *testing.T) {
})
}
}

// =============================================================================
// publishIndexingEvent tests
// =============================================================================

func TestIndexerService_ProcessTransaction_PublishesIndexingEvent(t *testing.T) {
mockStorageRepo := mocks.NewMockStorageRepository()
mockMessagingRepo := mocks.NewMockMessagingRepository()
logger, _ := logging.TestLogger(t)
service := NewIndexerService(mockStorageRepo, mockMessagingRepo, logger)

transaction := &contracts.LFXTransaction{
Action: constants.ActionCreated,
ObjectType: constants.ObjectTypeProject,
Headers: map[string]string{"authorization": "Bearer valid-token"},
Data: map[string]any{
"id": "test-project",
"name": "Test Project",
"public": true,
},
Timestamp: time.Now(),
ParsedPrincipals: []contracts.Principal{
{Principal: "test_user", Email: "test@example.com"},
},
}

result, err := service.ProcessTransaction(context.Background(), transaction, "test-index")

assert.NoError(t, err)
assert.True(t, result.Success)

// Exactly one publish call with the correct subject
assert.Len(t, mockMessagingRepo.PublishCalls, 1)
publishCall := mockMessagingRepo.PublishCalls[0]
assert.Equal(t, "lfx.project.created", publishCall.Subject)

// Payload deserializes to a valid IndexingEvent
var event contracts.IndexingEvent
assert.NoError(t, json.Unmarshal(publishCall.Data, &event))
assert.Equal(t, "project:test-project", event.DocumentID)
assert.Equal(t, "test-project", event.ObjectID)
assert.Equal(t, constants.ObjectTypeProject, event.ObjectType)
assert.Equal(t, constants.ActionCreated, event.Action)
assert.NotNil(t, event.Body)
assert.Equal(t, constants.ObjectTypeProject, event.Body.ObjectType)
assert.False(t, event.Timestamp.IsZero())
}

func TestIndexerService_ProcessTransaction_PublishFailureIsNonBlocking(t *testing.T) {
mockStorageRepo := mocks.NewMockStorageRepository()
mockMessagingRepo := mocks.NewMockMessagingRepository()
mockMessagingRepo.PublishError = fmt.Errorf("NATS connection lost")
logger, _ := logging.TestLogger(t)
service := NewIndexerService(mockStorageRepo, mockMessagingRepo, logger)

transaction := &contracts.LFXTransaction{
Action: constants.ActionUpdated,
ObjectType: constants.ObjectTypeProject,
Headers: map[string]string{"authorization": "Bearer valid-token"},
Data: map[string]any{
"id": "test-project",
"name": "Test Project",
"public": true,
},
Timestamp: time.Now(),
ParsedPrincipals: []contracts.Principal{
{Principal: "test_user", Email: "test@example.com"},
},
}

result, err := service.ProcessTransaction(context.Background(), transaction, "test-index")

// Index succeeded even though publish failed
assert.NoError(t, err)
assert.True(t, result.Success)
assert.True(t, result.IndexSuccess)
assert.Equal(t, "project:test-project", result.DocumentID)
assert.Len(t, mockStorageRepo.IndexCalls, 1)
}

func TestIndexerService_ProcessTransaction_NoEventPublishedOnIndexFailure(t *testing.T) {
mockStorageRepo := mocks.NewMockStorageRepository()
mockStorageRepo.IndexError = fmt.Errorf("opensearch unavailable")
mockMessagingRepo := mocks.NewMockMessagingRepository()
logger, _ := logging.TestLogger(t)
service := NewIndexerService(mockStorageRepo, mockMessagingRepo, logger)

transaction := &contracts.LFXTransaction{
Action: constants.ActionCreated,
ObjectType: constants.ObjectTypeProject,
Headers: map[string]string{"authorization": "Bearer valid-token"},
Data: map[string]any{
"id": "test-project",
"name": "Test Project",
"public": true,
},
Timestamp: time.Now(),
ParsedPrincipals: []contracts.Principal{
{Principal: "test_user", Email: "test@example.com"},
},
}

result, err := service.ProcessTransaction(context.Background(), transaction, "test-index")

assert.Error(t, err)
assert.False(t, result.Success)
// No event published when indexing fails
assert.Len(t, mockMessagingRepo.PublishCalls, 0)
}

func TestIndexerService_ProcessTransaction_V1ActionCanonicalizedInEvent(t *testing.T) {
mockStorageRepo := mocks.NewMockStorageRepository()
mockMessagingRepo := mocks.NewMockMessagingRepository()
logger, _ := logging.TestLogger(t)
service := NewIndexerService(mockStorageRepo, mockMessagingRepo, logger)

// V1 uses present-tense "create" instead of past-tense "created"
transaction := &contracts.LFXTransaction{
Action: constants.ActionCreate,
ObjectType: constants.ObjectTypeProject,
IsV1: true,
Headers: map[string]string{"x-username": "admin", "x-email": "admin@example.com"},
Data: map[string]any{
"id": "v1-project",
"name": "V1 Project",
"public": true,
},
Timestamp: time.Now(),
ParsedPrincipals: []contracts.Principal{
{Principal: "admin", Email: "admin@example.com"},
},
}

result, err := service.ProcessTransaction(context.Background(), transaction, "test-index")

assert.NoError(t, err)
assert.True(t, result.Success)
assert.Len(t, mockMessagingRepo.PublishCalls, 1)
// Subject must use canonical past-tense "created", not raw "create"
assert.Equal(t, "lfx.project.created", mockMessagingRepo.PublishCalls[0].Subject)
}
11 changes: 9 additions & 2 deletions pkg/constants/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ import "time"
// NATS subject prefixes (preserved from existing)
// These constants define the protocol format for NATS message subjects
const (
IndexPrefix = "lfx.index." // V2 message prefix
FromV1Prefix = "lfx.v1.index." // V1 message prefix
IndexPrefix = "lfx.index." // V2 message prefix
FromV1Prefix = "lfx.v1.index." // V1 message prefix
EventSubjectPrefix = "lfx." // Post-indexing domain event prefix: lfx.{object_type}.{action}
)

// BuildEventSubject constructs the NATS subject for a post-indexing domain event.
// Format: lfx.{object_type}.{action} (e.g., "lfx.project.created", "lfx.committee.deleted").
func BuildEventSubject(objectType string, action MessageAction) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker at all, but it seems a bit odd to see functions like this in the constant package. In the future, we might want to create another folder here (with a meaningful name, like eventing) or in the domain (like a parent of all event types) or something else and move this useful function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I agree we can make this update.

return EventSubjectPrefix + objectType + "." + string(action)
}

// NATS subjects (professional expansion)
const (
ProjectSubject = IndexPrefix + "project" // V2 project messages
Expand Down
31 changes: 31 additions & 0 deletions pkg/constants/messaging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

package constants_test

import (
"testing"

"github.com/linuxfoundation/lfx-v2-indexer-service/pkg/constants"
"github.com/stretchr/testify/assert"
)

func TestBuildEventSubject(t *testing.T) {
tests := []struct {
objectType string
action constants.MessageAction
expected string
}{
{constants.ObjectTypeProject, constants.ActionCreated, "lfx.project.created"},
{constants.ObjectTypeProject, constants.ActionUpdated, "lfx.project.updated"},
{constants.ObjectTypeProject, constants.ActionDeleted, "lfx.project.deleted"},
{constants.ObjectTypeCommittee, constants.ActionCreated, "lfx.committee.created"},
{constants.ObjectTypeMeeting, constants.ActionDeleted, "lfx.meeting.deleted"},
}
for _, tt := range tests {
t.Run(tt.expected, func(t *testing.T) {
got := constants.BuildEventSubject(tt.objectType, tt.action)
assert.Equal(t, tt.expected, got)
})
}
}
Loading