Skip to content

Commit 0d8eb59

Browse files
authored
feat: migrate current-account to outbox publisher (#1311)
* feat: migrate current-account control events to transactional outbox Replace fire-and-forget PublishWithTenant() calls for account lifecycle events (frozen, unfrozen, closed) with the transactional outbox pattern. The account state change and event write now happen atomically within a single database transaction, guaranteeing at-least-once delivery via the background outbox worker rather than direct Kafka publishing. - Add outboxPublisher field to Service struct - Wire OutboxPublisher in NewServiceWithExistingClients - Replace publishControlActionEvent (fire-and-forget) with saveWithOutboxEvent which atomically saves account + writes to outbox - Use topic constants from shared/platform/events/topics registry - Add integration tests verifying outbox entries are created for each control action (freeze, unfreeze, close) and fallback path works when outbox publisher is not configured * fix: use precision-aware balance conversion in account closed event Replace hardcoded /100 divisor with precision-derived computation in the AccountClosedEvent closing balance field. Supports currencies with any precision (JPY=0, GBP=2, KWH=3) rather than assuming 2 decimal places. Also correct stale test comment in TestControlCurrentAccount_FallbackWithoutOutbox. * test: scope outbox assertions by aggregate_id and service_name Avoids flaky cross-test matches on the shared public.event_outbox table by filtering queries on topic + aggregate_id + service_name. Also uses a unique account ID per test run (derived from the tenant ID) to eliminate fixed-string collisions across parallel runs. * test: pin outbox test connection and add negative fallback assertion Pin SetMaxOpenConns(1) in setupControlOutboxDB so the SET search_path call reliably applies to every subsequent query on the same connection. Also use a unique account ID in the fallback test and add an assertion that no outbox rows are written when the service operates without an OutboxPublisher. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 8c9df13 commit 0d8eb59

3 files changed

Lines changed: 399 additions & 96 deletions

File tree

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
package service
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/lib/pq"
8+
pb "github.com/meridianhub/meridian/api/proto/meridian/current_account/v1"
9+
"github.com/meridianhub/meridian/services/current-account/adapters/persistence"
10+
"github.com/meridianhub/meridian/shared/platform/events"
11+
"github.com/meridianhub/meridian/shared/platform/events/topics"
12+
"github.com/meridianhub/meridian/shared/platform/tenant"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
"gorm.io/gorm"
16+
)
17+
18+
// setupControlOutboxDB creates a test database with both account and event_outbox tables.
19+
func setupControlOutboxDB(t *testing.T) (*gorm.DB, *persistence.Repository, *persistence.LienRepository, tenant.TenantID, func()) {
20+
t.Helper()
21+
22+
db := openSharedDB(t)
23+
24+
// Pin to a single connection so that SET search_path (which is connection-scoped)
25+
// is reliably applied to every subsequent query on this *gorm.DB instance.
26+
sqlDB, err := db.DB()
27+
require.NoError(t, err)
28+
sqlDB.SetMaxOpenConns(1)
29+
30+
tid := uniqueTenantID()
31+
schemaName := tid.SchemaName()
32+
quotedSchema := pq.QuoteIdentifier(schemaName)
33+
34+
// Create tenant schema
35+
err = db.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", quotedSchema)).Error
36+
require.NoError(t, err)
37+
38+
// Set search_path (applies to the pinned connection for all subsequent queries)
39+
err = db.Exec(fmt.Sprintf("SET search_path TO %s, public", quotedSchema)).Error
40+
require.NoError(t, err)
41+
42+
// Create account table (same DDL as setupControlTestDB)
43+
err = db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.account (
44+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
45+
account_id VARCHAR(100) NOT NULL UNIQUE,
46+
account_identification VARCHAR(34) NOT NULL UNIQUE,
47+
account_type VARCHAR(50) NOT NULL DEFAULT 'current',
48+
instrument_code VARCHAR(32) NOT NULL DEFAULT 'GBP',
49+
dimension VARCHAR(20) NOT NULL DEFAULT 'CURRENCY',
50+
precision INT NOT NULL DEFAULT 2,
51+
status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE',
52+
party_id UUID NOT NULL,
53+
org_party_id UUID NULL,
54+
balance BIGINT NOT NULL DEFAULT 0,
55+
available_balance BIGINT NOT NULL DEFAULT 0,
56+
overdraft_limit BIGINT NOT NULL DEFAULT 0,
57+
overdraft_rate NUMERIC(5,4) NOT NULL DEFAULT 0,
58+
balance_updated_at TIMESTAMP WITH TIME ZONE,
59+
opened_at TIMESTAMP WITH TIME ZONE,
60+
closed_at TIMESTAMP WITH TIME ZONE,
61+
freeze_reason VARCHAR(1000),
62+
status_history JSONB NOT NULL DEFAULT '[]'::jsonb,
63+
product_type_code VARCHAR(50) NULL,
64+
product_type_version INT NULL,
65+
behavior_class VARCHAR(50) NULL,
66+
version BIGINT NOT NULL DEFAULT 1,
67+
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
68+
created_by VARCHAR(100) NOT NULL DEFAULT 'test',
69+
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
70+
updated_by VARCHAR(100) NOT NULL DEFAULT 'test',
71+
deleted_at TIMESTAMP WITH TIME ZONE
72+
)`, quotedSchema)).Error
73+
require.NoError(t, err)
74+
75+
// Create lien table
76+
err = db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.lien (
77+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
78+
account_id UUID NOT NULL,
79+
amount_cents BIGINT NOT NULL,
80+
currency VARCHAR(3) NOT NULL,
81+
instrument_code VARCHAR(32) NOT NULL DEFAULT '',
82+
dimension VARCHAR(20) NOT NULL DEFAULT 'CURRENCY',
83+
precision INT NOT NULL DEFAULT 2,
84+
bucket_id VARCHAR(255) NOT NULL DEFAULT '',
85+
status VARCHAR(20) NOT NULL,
86+
payment_order_reference VARCHAR(255) NOT NULL UNIQUE,
87+
termination_reason TEXT,
88+
expires_at TIMESTAMP WITH TIME ZONE,
89+
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
90+
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
91+
version BIGINT NOT NULL DEFAULT 1,
92+
reserved_quantity JSONB,
93+
valued_amount JSONB,
94+
valuation_analysis JSONB
95+
)`, quotedSchema)).Error
96+
require.NoError(t, err)
97+
98+
// Create event_outbox table in public schema (shared across tenants)
99+
err = db.AutoMigrate(&events.EventOutbox{})
100+
require.NoError(t, err, "failed to create event_outbox table")
101+
102+
repo := persistence.NewRepository(db)
103+
lienRepo := persistence.NewLienRepository(db)
104+
105+
cleanup := func() {
106+
_ = db.Exec(fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", quotedSchema))
107+
sqlDB, _ := db.DB()
108+
if sqlDB != nil {
109+
_ = sqlDB.Close()
110+
}
111+
}
112+
113+
return db, repo, lienRepo, tid, cleanup
114+
}
115+
116+
// TestControlCurrentAccount_OutboxEvents verifies that control actions write events to the
117+
// event outbox table atomically with the account state change.
118+
func TestControlCurrentAccount_OutboxEvents(t *testing.T) {
119+
db, repo, lienRepo, tid, cleanup := setupControlOutboxDB(t)
120+
defer cleanup()
121+
122+
ctx := tenant.WithTenant(t.Context(), tid)
123+
124+
outboxRepo := events.NewPostgresOutboxRepository(db)
125+
126+
// Use a unique account ID per test run to avoid cross-test pollution on the shared
127+
// public.event_outbox table (which is not scoped to the tenant schema).
128+
accountID := fmt.Sprintf("ACC-OUTBOX-%s", tid)
129+
130+
// Mock position keeping returns zero balance (needed for CLOSE validation)
131+
mockPosKeeping := &mockPositionKeepingClient{
132+
accountBalances: map[string]int64{
133+
accountID: 0,
134+
},
135+
}
136+
137+
// Build a service wired with outboxPublisher and db
138+
svc := &Service{
139+
repo: repo,
140+
lienRepo: lienRepo,
141+
outboxRepo: outboxRepo,
142+
outboxPublisher: events.NewOutboxPublisher("current-account"),
143+
db: db,
144+
posKeepingClient: mockPosKeeping,
145+
logger: testLogger(),
146+
}
147+
148+
_ = createTestAccountForControl(t, ctx, repo, accountID)
149+
150+
t.Run("FreezeWritesOutboxEntry", func(t *testing.T) {
151+
_, err := svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
152+
AccountId: accountID,
153+
ControlAction: pb.ControlAction_CONTROL_ACTION_FREEZE,
154+
Reason: "Compliance hold for regulatory review",
155+
})
156+
require.NoError(t, err)
157+
158+
// Verify exactly one outbox entry was created for the freeze event.
159+
// Filter by topic + aggregate_id + service_name to avoid matching rows from other runs.
160+
var entries []events.EventOutbox
161+
err = db.Where(
162+
"topic = ? AND aggregate_id = ? AND service_name = ?",
163+
topics.CurrentAccountAccountFrozenV1, accountID, "current-account",
164+
).Find(&entries).Error
165+
require.NoError(t, err)
166+
require.Len(t, entries, 1, "exactly one frozen event should be in outbox")
167+
168+
e := entries[0]
169+
assert.Equal(t, topics.CurrentAccountAccountFrozenV1, e.Topic)
170+
assert.Equal(t, "current_account.account_frozen.v1", e.EventType)
171+
assert.Equal(t, "CurrentAccount", e.AggregateType)
172+
assert.Equal(t, accountID, e.AggregateID)
173+
assert.Equal(t, "current-account", e.ServiceName)
174+
assert.Equal(t, events.StatusPending, e.Status)
175+
assert.NotEmpty(t, e.EventPayload)
176+
})
177+
178+
t.Run("UnfreezeWritesOutboxEntry", func(t *testing.T) {
179+
_, err := svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
180+
AccountId: accountID,
181+
ControlAction: pb.ControlAction_CONTROL_ACTION_UNFREEZE,
182+
})
183+
require.NoError(t, err)
184+
185+
var entries []events.EventOutbox
186+
err = db.Where(
187+
"topic = ? AND aggregate_id = ? AND service_name = ?",
188+
topics.CurrentAccountAccountUnfrozenV1, accountID, "current-account",
189+
).Find(&entries).Error
190+
require.NoError(t, err)
191+
require.Len(t, entries, 1, "exactly one unfrozen event should be in outbox")
192+
193+
e := entries[0]
194+
assert.Equal(t, topics.CurrentAccountAccountUnfrozenV1, e.Topic)
195+
assert.Equal(t, "current_account.account_unfrozen.v1", e.EventType)
196+
assert.Equal(t, "CurrentAccount", e.AggregateType)
197+
assert.Equal(t, accountID, e.AggregateID)
198+
assert.Equal(t, events.StatusPending, e.Status)
199+
})
200+
201+
t.Run("CloseWritesOutboxEntry", func(t *testing.T) {
202+
_, err := svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
203+
AccountId: accountID,
204+
ControlAction: pb.ControlAction_CONTROL_ACTION_CLOSE,
205+
Reason: "Customer requested account closure",
206+
})
207+
require.NoError(t, err)
208+
209+
var entries []events.EventOutbox
210+
err = db.Where(
211+
"topic = ? AND aggregate_id = ? AND service_name = ?",
212+
topics.CurrentAccountAccountClosedV1, accountID, "current-account",
213+
).Find(&entries).Error
214+
require.NoError(t, err)
215+
require.Len(t, entries, 1, "exactly one closed event should be in outbox")
216+
217+
e := entries[0]
218+
assert.Equal(t, topics.CurrentAccountAccountClosedV1, e.Topic)
219+
assert.Equal(t, "current_account.account_closed.v1", e.EventType)
220+
assert.Equal(t, "CurrentAccount", e.AggregateType)
221+
assert.Equal(t, accountID, e.AggregateID)
222+
assert.Equal(t, events.StatusPending, e.Status)
223+
})
224+
}
225+
226+
// TestControlCurrentAccount_FallbackWithoutOutbox verifies that control actions succeed
227+
// via the fallback path (direct repo.Save) when the outbox publisher is not configured.
228+
// This ensures backward compatibility for test environments and services that start without
229+
// a database connection available at construction time.
230+
func TestControlCurrentAccount_FallbackWithoutOutbox(t *testing.T) {
231+
db, repo, lienRepo, tid, cleanup := setupControlOutboxDB(t)
232+
defer cleanup()
233+
234+
ctx := tenant.WithTenant(t.Context(), tid)
235+
236+
accountID := fmt.Sprintf("ACC-FALLBACK-%s", tid)
237+
238+
mockPosKeeping := &mockPositionKeepingClient{
239+
accountBalances: map[string]int64{
240+
accountID: 0, // zero balance required for CLOSE validation
241+
},
242+
}
243+
244+
// Service without outboxPublisher or db — exercises fallback path
245+
svc := &Service{
246+
repo: repo,
247+
lienRepo: lienRepo,
248+
posKeepingClient: mockPosKeeping,
249+
logger: testLogger(),
250+
}
251+
252+
_ = createTestAccountForControl(t, ctx, repo, accountID)
253+
254+
// All three control actions should succeed via fallback (repo.Save only)
255+
_, err := svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
256+
AccountId: accountID,
257+
ControlAction: pb.ControlAction_CONTROL_ACTION_FREEZE,
258+
Reason: "Testing fallback path without outbox",
259+
})
260+
require.NoError(t, err, "freeze should succeed without outbox publisher")
261+
262+
_, err = svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
263+
AccountId: accountID,
264+
ControlAction: pb.ControlAction_CONTROL_ACTION_UNFREEZE,
265+
})
266+
require.NoError(t, err, "unfreeze should succeed without outbox publisher")
267+
268+
_, err = svc.ControlCurrentAccount(ctx, &pb.ControlCurrentAccountRequest{
269+
AccountId: accountID,
270+
ControlAction: pb.ControlAction_CONTROL_ACTION_CLOSE,
271+
Reason: "Close via fallback path",
272+
})
273+
require.NoError(t, err, "close should succeed without outbox publisher")
274+
275+
// Confirm no outbox rows were written — the fallback path must not silently emit events.
276+
var outboxCount int64
277+
err = db.Model(&events.EventOutbox{}).
278+
Where("aggregate_id = ? AND service_name = ?", accountID, "current-account").
279+
Count(&outboxCount).Error
280+
require.NoError(t, err)
281+
assert.Zero(t, outboxCount, "fallback path should not write to event_outbox")
282+
}

0 commit comments

Comments
 (0)