Skip to content

Commit e7d4446

Browse files
authored
feat: add domain event publishing to Party and InternalAccount services (#1313)
* feat: add domain event publishing to Party and InternalAccount services Add outbox-pattern event publishing to Party and InternalBankAccount services, which previously published no domain events. Party service: - Add PartyCreatedEvent and PartyUpdatedEvent to party_events.proto - Add SaveInTx to Repository interface and persistence implementation - Publish PartyCreatedEvent on RegisterParty (atomic with save) - Publish PartyUpdatedEvent on UpdateParty (atomic with save) - Create OutboxVerificationEventPublisher implementing VerificationEventPublisher - Wire outbox worker in cmd/main.go with Kafka bootstrap support InternalAccount service: - Add internal_account_events.proto with FacilityCreatedEvent and BookingCreatedEvent - Add SaveInTx to domain.Repository interface and persistence implementation - Publish FacilityCreatedEvent on InitiateInternalAccount (atomic with save) - Add SetOutboxPublisher method and WithOutboxPublisher option - Wire outbox worker in cmd/main.go with Kafka bootstrap support Shared: - Add party.created.v1, party.updated.v1, party.verification-completed.v1 topic constants - Add internal-account.facility-created.v1, internal-account.booking-created.v1 topic constants - Update topics.yaml and topics_test.go for consistency * refactor: move SaveInTx out of domain.Repository to preserve hexagonal boundary domain.Repository is a pure domain port and must remain free of infrastructure imports. *gorm.DB is an infrastructure detail that belongs in the service layer. - Remove SaveInTx and gorm import from services/internal-account/domain/repository.go - Define service.Repository interface in server.go that embeds domain.Repository and adds SaveInTx, mirroring the pattern used by the party service - Update all constructors (NewService, NewServiceWithClients, NewServiceWithValuationFeatures, NewServiceFull) to accept service.Repository - Update test helpers to use service.Repository instead of domain.Repository --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 804201f commit e7d4446

14 files changed

Lines changed: 613 additions & 70 deletions

File tree

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
syntax = "proto3";
2+
3+
package meridian.events.v1;
4+
5+
import "buf/validate/validate.proto";
6+
import "google/protobuf/timestamp.proto";
7+
8+
option go_package = "github.com/meridianhub/meridian/api/proto/meridian/events/v1;eventsv1";
9+
10+
// FacilityCreatedEvent is published when a new internal account facility is
11+
// initiated. Consumers can use this event to react to new internal account
12+
// creation, such as updating account registries or triggering provisioning.
13+
message FacilityCreatedEvent {
14+
// event_id uniquely identifies this event instance
15+
string event_id = 1 [(buf.validate.field).string = {
16+
uuid: true
17+
}];
18+
19+
// account_id uniquely identifies the internal account facility
20+
string account_id = 2 [(buf.validate.field).string = {
21+
min_len: 1
22+
max_len: 255
23+
}];
24+
25+
// account_code is the human-readable code for the account
26+
string account_code = 3 [(buf.validate.field).string = {
27+
min_len: 1
28+
max_len: 100
29+
}];
30+
31+
// account_type is the classification of the internal account (e.g., "NOSTRO", "VOSTRO")
32+
string account_type = 4 [(buf.validate.field).string = {
33+
max_len: 100
34+
}];
35+
36+
// instrument_code is the instrument code for the facility (e.g., "GBP", "kWh")
37+
string instrument_code = 5 [(buf.validate.field).string = {
38+
min_len: 1
39+
max_len: 50
40+
}];
41+
42+
// correlation_id links related events across services
43+
string correlation_id = 6 [(buf.validate.field).string = {
44+
max_len: 255
45+
}];
46+
47+
// causation_id identifies the event or command that caused this event
48+
string causation_id = 7 [(buf.validate.field).string = {
49+
max_len: 255
50+
}];
51+
52+
// timestamp when the event was created
53+
google.protobuf.Timestamp timestamp = 8 [(buf.validate.field).required = true];
54+
}
55+
56+
// BookingCreatedEvent is published when a new booking (position keeping
57+
// transaction) is posted to an internal account. Consumers can use this
58+
// event to synchronise balances or trigger downstream processing.
59+
message BookingCreatedEvent {
60+
// event_id uniquely identifies this event instance
61+
string event_id = 1 [(buf.validate.field).string = {
62+
uuid: true
63+
}];
64+
65+
// account_id identifies the internal account receiving the booking
66+
string account_id = 2 [(buf.validate.field).string = {
67+
min_len: 1
68+
max_len: 255
69+
}];
70+
71+
// transaction_id identifies the position-keeping transaction associated with the booking
72+
string transaction_id = 3 [(buf.validate.field).string = {
73+
min_len: 1
74+
max_len: 255
75+
}];
76+
77+
// instrument_code is the instrument for this booking (e.g., "GBP", "kWh")
78+
string instrument_code = 4 [(buf.validate.field).string = {
79+
min_len: 1
80+
max_len: 50
81+
}];
82+
83+
// correlation_id links related events across services
84+
string correlation_id = 5 [(buf.validate.field).string = {
85+
max_len: 255
86+
}];
87+
88+
// causation_id identifies the event or command that caused this event
89+
string causation_id = 6 [(buf.validate.field).string = {
90+
max_len: 255
91+
}];
92+
93+
// timestamp when the event was created
94+
google.protobuf.Timestamp timestamp = 7 [(buf.validate.field).required = true];
95+
}

api/proto/meridian/events/v1/party_events.proto

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,89 @@ import "google/protobuf/timestamp.proto";
77

88
option go_package = "github.com/meridianhub/meridian/api/proto/meridian/events/v1;eventsv1";
99

10+
// PartyCreatedEvent is published when a new party is registered in the system.
11+
// Consumers can use this event to react to new party creation, such as
12+
// initialising downstream records or triggering onboarding workflows.
13+
message PartyCreatedEvent {
14+
// event_id uniquely identifies this event instance
15+
string event_id = 1 [(buf.validate.field).string = {
16+
uuid: true
17+
}];
18+
19+
// party_id identifies the newly created party
20+
string party_id = 2 [(buf.validate.field).string = {
21+
uuid: true
22+
}];
23+
24+
// party_type identifies the classification of the party (e.g., "INDIVIDUAL", "ORGANISATION")
25+
string party_type = 3 [(buf.validate.field).string = {
26+
min_len: 1
27+
max_len: 100
28+
}];
29+
30+
// legal_name is the registered legal name of the party
31+
string legal_name = 4 [(buf.validate.field).string = {
32+
min_len: 1
33+
max_len: 500
34+
}];
35+
36+
// status is the initial lifecycle status of the party
37+
string status = 5 [(buf.validate.field).string = {
38+
max_len: 50
39+
}];
40+
41+
// correlation_id links related events across services
42+
string correlation_id = 6 [(buf.validate.field).string = {
43+
max_len: 255
44+
}];
45+
46+
// causation_id identifies the event or command that caused this event
47+
string causation_id = 7 [(buf.validate.field).string = {
48+
max_len: 255
49+
}];
50+
51+
// timestamp when the event was created
52+
google.protobuf.Timestamp timestamp = 8 [(buf.validate.field).required = true];
53+
}
54+
55+
// PartyUpdatedEvent is published when an existing party's details are modified.
56+
// Consumers can use this event to synchronise derived records with updated party data.
57+
message PartyUpdatedEvent {
58+
// event_id uniquely identifies this event instance
59+
string event_id = 1 [(buf.validate.field).string = {
60+
uuid: true
61+
}];
62+
63+
// party_id identifies the party that was updated
64+
string party_id = 2 [(buf.validate.field).string = {
65+
uuid: true
66+
}];
67+
68+
// party_type identifies the classification of the party
69+
string party_type = 3 [(buf.validate.field).string = {
70+
min_len: 1
71+
max_len: 100
72+
}];
73+
74+
// status is the current lifecycle status of the party after the update
75+
string status = 4 [(buf.validate.field).string = {
76+
max_len: 50
77+
}];
78+
79+
// correlation_id links related events across services
80+
string correlation_id = 5 [(buf.validate.field).string = {
81+
max_len: 255
82+
}];
83+
84+
// causation_id identifies the event or command that caused this event
85+
string causation_id = 6 [(buf.validate.field).string = {
86+
max_len: 255
87+
}];
88+
89+
// timestamp when the event was created
90+
google.protobuf.Timestamp timestamp = 7 [(buf.validate.field).required = true];
91+
}
92+
1093
// PartyVerificationCompletedEvent is published when a KYC/AML verification
1194
// transitions to a terminal state (APPROVED, REJECTED, or MANUAL_REVIEW).
1295
// This event enables downstream services to react to verification outcomes,

services/internal-account/adapters/persistence/repository.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ func (r *Repository) Save(ctx context.Context, account domain.InternalAccount) e
162162
})
163163
}
164164

165+
// SaveInTx persists a new or updated account within the provided transaction.
166+
// The caller is responsible for managing the transaction boundary.
167+
func (r *Repository) SaveInTx(ctx context.Context, account domain.InternalAccount, tx *gorm.DB) error {
168+
return r.WithTx(tx).Save(ctx, account)
169+
}
170+
165171
// FindByID retrieves an account by its UUID.
166172
func (r *Repository) FindByID(ctx context.Context, id uuid.UUID) (domain.InternalAccount, error) {
167173
var account domain.InternalAccount

services/internal-account/cmd/main.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"github.com/meridianhub/meridian/shared/platform/bootstrap"
2222
"github.com/meridianhub/meridian/shared/platform/defaults"
2323
"github.com/meridianhub/meridian/shared/platform/env"
24+
"github.com/meridianhub/meridian/shared/platform/events"
25+
"github.com/meridianhub/meridian/shared/platform/kafka"
2426
"github.com/meridianhub/meridian/shared/platform/observability"
2527
"github.com/meridianhub/meridian/shared/platform/ports"
2628
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -88,6 +90,10 @@ func run(logger *slog.Logger) error {
8890
// Create repository
8991
repo := persistence.NewRepository(db)
9092

93+
// Create outbox repository and publisher for event publishing
94+
outboxRepo := events.NewPostgresOutboxRepository(db)
95+
outboxPublisher := events.NewOutboxPublisher("internal-account")
96+
9197
// Get Kubernetes namespace from environment (defaults to "default")
9298
namespace := env.GetEnvOrDefault("K8S_NAMESPACE", "default")
9399

@@ -106,8 +112,35 @@ func run(logger *slog.Logger) error {
106112
return fmt.Errorf("failed to create service: %w", err)
107113
}
108114

115+
// Wire outbox publisher for domain event publishing
116+
internalAccountService.SetOutboxPublisher(outboxPublisher, db)
117+
109118
logger.Info("service initialized with external clients")
110119

120+
// Start outbox worker for Kafka event delivery (optional - depends on KAFKA_BOOTSTRAP_SERVERS)
121+
var outboxWorkerStop func()
122+
bootstrapServers := env.GetEnvOrDefault("KAFKA_BOOTSTRAP_SERVERS", "")
123+
if bootstrapServers != "" {
124+
producer, err := kafka.NewProtoProducer(kafka.ProducerConfig{
125+
BootstrapServers: bootstrapServers,
126+
ClientID: "internal-account-outbox-worker",
127+
Acks: "all",
128+
Retries: 3,
129+
Compression: "snappy",
130+
})
131+
if err != nil {
132+
logger.Warn("failed to create Kafka producer for outbox worker - events will be persisted but not published",
133+
"error", err)
134+
} else {
135+
w := events.NewWorker(outboxRepo, producer, events.DefaultWorkerConfig("internal-account"), logger)
136+
w.Start(ctx)
137+
outboxWorkerStop = w.Stop
138+
logger.Info("outbox worker started")
139+
}
140+
} else {
141+
logger.Warn("KAFKA_BOOTSTRAP_SERVERS not configured, outbox worker disabled - events will be persisted but not published")
142+
}
143+
111144
// Initialize auth interceptor (optional - based on AUTH_ENABLED)
112145
authConfig := bootstrap.DefaultAuthConfig(logger)
113146
authInterceptor, err := bootstrap.NewAuthInterceptor(ctx, authConfig)
@@ -241,6 +274,13 @@ func run(logger *slog.Logger) error {
241274
return nil
242275
})
243276

277+
if outboxWorkerStop != nil {
278+
orchestrator.AddCleanup(func() error {
279+
outboxWorkerStop()
280+
return nil
281+
})
282+
}
283+
244284
for _, cleanup := range svcClients.cleanupFuncs {
245285
fn := cleanup // capture for closure
246286
orchestrator.AddCleanup(func() error {

0 commit comments

Comments
 (0)