Skip to content

Commit 986bdf4

Browse files
authored
feat(identity): add audit system migration and provisioner reconciliation (#2179)
* feat(identity): add audit_log and audit_outbox migration Identity service was missing audit infrastructure, causing identity mutations to produce no audit trail. Add migration matching the party service pattern: audit_log for permanent records, audit_outbox for async processing. Tables use unqualified names so per-tenant search_path routing places them in each org schema. Without this migration, audit hooks targeting meridian_identity have no destination tables and existing tenant schemas have no audit trail at all. Companion change in the provisioner reconciles this schema into already-provisioned tenants. * feat(tenant): reconcile migrations on provisioning worker startup The ReconcileMigrations endpoint already applies new migration files to existing tenant schemas, but was only reachable via a manual gRPC call. This left the gap where a deploy adding new migration files (such as the identity audit_log/audit_outbox tables) would fix new tenants but leave existing tenants without the schema additions. Wire the worker's Start path to invoke ReconcileMigrations(nil) once after crash recovery and before the polling loop begins. Errors are logged but never block startup - reconciliation is best-effort, identical to the existing crash recovery pass. Tests use a controlled mock that tracks reconciliation calls to verify the wiring runs exactly once on startup, plus a positive-path test that confirms reconciliation does not block subsequent provisioning of pending tenants. The MockProvisioner gains a ReconciliationCalls slice so other tests can assert on the tenantID argument shape. * fix(identity): align audit_log schema with audit infrastructure The original migration used changed_at, matching the historical party audit table at 20251217. Every other service had to retrofit that with 20260323 alignment migrations because the runtime audit code (GORM AuditLog model, Kafka consumer buildAuditLogFromEvent, outbox processor, audit-worker TenantAuditWriter) all read and write created_at, plus event_id, schema_name, correlation_id, causation_id, and idempotency_key. Identity is brand new so it can start with the aligned schema and skip the retrofit chain. Concretely: - Rename changed_at to created_at (with default now()) and update the index name. Without this, INSERT/SELECT from the audit consumer and the outbox worker would fail at runtime against the identity schema. - Add event_id (with unique index) plus schema_name, correlation_id, causation_id, and idempotency_key columns expected by the audit-worker TenantAuditWriter idempotent insert path. - Add INITIAL_IMPORT to the operation CHECK enum to match the proto AuditOperation surface in shared/platform/audit/consumer.go. Also clear ReconciliationCalls in MockProvisioner.Reset() so the slice behaves like the other call-tracking slices and doesn't leak state across tests that share a mock instance. Atlas hash regenerated; atlas migrate validate passes. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 9bd6176 commit 986bdf4

5 files changed

Lines changed: 275 additions & 8 deletions

File tree

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
-- Identity Service Audit System
2+
-- Static audit table creation (CockroachDB compatible)
3+
-- Audit logging handled at application level via GORM hooks
4+
-- See ADR-0009 for rationale
5+
-- Uses unqualified table names (relies on per-tenant schema routing via search_path)
6+
--
7+
-- Schema is aligned with shared/platform/audit (created_at, not changed_at) and the
8+
-- audit-worker TenantAuditWriter (event_id, schema_name, correlation_id, causation_id,
9+
-- idempotency_key) from the start. Other services accreted these columns through
10+
-- follow-up alignment migrations (services/party/migrations/20260323*); identity
11+
-- starts aligned to avoid the same retrofit.
12+
13+
-- Create audit_log table (unqualified, singular)
14+
CREATE TABLE IF NOT EXISTS audit_log (
15+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
16+
17+
-- What changed
18+
table_name VARCHAR(100) NOT NULL,
19+
operation VARCHAR(10) NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE', 'INITIAL_IMPORT')),
20+
21+
-- Record identification
22+
record_id UUID NOT NULL,
23+
24+
-- Change metadata
25+
-- created_at matches the GORM AuditLog model in shared/platform/audit/hooks.go
26+
-- and the consumer/worker INSERT/SELECT paths.
27+
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
28+
changed_by VARCHAR(100),
29+
30+
-- Change details (TEXT to match GORM model - contains JSON strings)
31+
old_values TEXT,
32+
new_values TEXT,
33+
34+
-- Additional context
35+
transaction_id VARCHAR(100),
36+
client_ip VARCHAR(45),
37+
user_agent TEXT,
38+
39+
-- Idempotency and tenant-aware writer fields used by the audit-worker
40+
-- TenantAuditWriter (services/audit-worker/adapters/persistence/tenant_audit_writer.go).
41+
-- event_id supports ON CONFLICT idempotent inserts.
42+
event_id VARCHAR(100),
43+
schema_name VARCHAR(100),
44+
correlation_id VARCHAR(255),
45+
causation_id VARCHAR(255),
46+
idempotency_key VARCHAR(255)
47+
);
48+
49+
-- Indexes for efficient audit queries
50+
CREATE INDEX IF NOT EXISTS idx_audit_log_table_name ON audit_log(table_name);
51+
CREATE INDEX IF NOT EXISTS idx_audit_log_record_id ON audit_log(record_id);
52+
CREATE INDEX IF NOT EXISTS idx_audit_log_created_at ON audit_log(created_at);
53+
CREATE INDEX IF NOT EXISTS idx_audit_log_changed_by ON audit_log(changed_by);
54+
CREATE INDEX IF NOT EXISTS idx_audit_log_operation ON audit_log(operation);
55+
-- Unique index on event_id supports ON CONFLICT for idempotent writes from
56+
-- the Kafka consumer path. CockroachDB treats NULLs as distinct in UNIQUE
57+
-- indexes, so outbox-path rows that leave event_id NULL coexist with
58+
-- Kafka-path rows that populate it (matches services/party/migrations/20260323*).
59+
CREATE UNIQUE INDEX IF NOT EXISTS idx_audit_log_event_id ON audit_log(event_id);
60+
61+
-- Create audit_outbox table for async processing (unqualified, singular)
62+
-- GORM hooks write to outbox, background worker moves to audit_log
63+
CREATE TABLE IF NOT EXISTS audit_outbox (
64+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
65+
66+
-- What changed
67+
table_name VARCHAR(100) NOT NULL,
68+
operation VARCHAR(10) NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE', 'INITIAL_IMPORT')),
69+
70+
-- Record identification
71+
record_id UUID NOT NULL,
72+
73+
-- Change details (TEXT to match GORM model - contains JSON strings)
74+
old_values TEXT,
75+
new_values TEXT,
76+
77+
-- Processing status (includes 'completed' for successful processing)
78+
status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
79+
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
80+
retry_count INT NOT NULL DEFAULT 0,
81+
last_error TEXT,
82+
83+
-- Additional context
84+
changed_by VARCHAR(100),
85+
transaction_id VARCHAR(100),
86+
client_ip VARCHAR(45),
87+
user_agent TEXT
88+
);
89+
90+
-- Index for worker to efficiently find pending entries
91+
CREATE INDEX IF NOT EXISTS idx_audit_outbox_status_created ON audit_outbox(status, created_at);

services/identity/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:HbreHZ7DfYjRRTYEAvjljVJTg+KizbMvh1A0wMA0WcU=
1+
h1:c25TZhcUPvpzfYUa0V6OPPg1/HQbd7kkUtN8519OZdM=
22
20260302000001_initial.sql h1:SsQN4cGTrm/6qs12GD9YXoSO6QyKTIpNZrM+4rRBWJA=
33
20260326000001_add_tenant_id.sql h1:+/NF9LBhT7EjKVqu5gxY1B+vJ+YDM9acG8lLSXGJEWs=
44
20260326000002_add_tenant_id_indexes.sql h1:Ov5Wfsl+MTIrtNdnqdh9p5qGpaeBW/wBPXILN4JvuvM=
@@ -8,3 +8,4 @@ h1:HbreHZ7DfYjRRTYEAvjljVJTg+KizbMvh1A0wMA0WcU=
88
20260327000004_password_reset_tokens.sql h1:KbLBrcNV5fCjyj04r6YNETLkNXg1dwSUGL4ZHk1pqqQ=
99
20260327000005_drop_role_constraint.sql h1:d4mxJFQHkcJEtDEZ+ztR+U1j7Z47eud+KorVVr50TUQ=
1010
20260327000006_add_role_constraint.sql h1:WAiUQF0nKzDyrc506KuT1RGEkVeWbMEIfPYd/c+gfuM=
11+
20260425000001_audit_system.sql h1:1LjVA/FeCTwjkuQqroCA03dFlmClLKxd/1VAwJjFduo=

services/tenant/provisioner/mock_provisioner.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type MockProvisioner struct {
4242
// PurgeCalls tracks calls to PurgeSchemas for verification
4343
PurgeCalls []tenant.TenantID
4444

45+
// ReconciliationCalls tracks calls to ReconcileMigrations for verification.
46+
// Each entry is the tenantID argument; nil entries indicate "all tenants" reconciliation.
47+
ReconciliationCalls []*tenant.TenantID
48+
4549
// DataRetentionPeriod for testing retention enforcement
4650
DataRetentionPeriod time.Duration
4751

@@ -63,6 +67,7 @@ func NewMockProvisioner(services []ServiceConfig) *MockProvisioner {
6367
ProvisioningCalls: make([]tenant.TenantID, 0),
6468
DeprovisioningCalls: make([]tenant.TenantID, 0),
6569
PurgeCalls: make([]tenant.TenantID, 0),
70+
ReconciliationCalls: make([]*tenant.TenantID, 0),
6671
DataRetentionPeriod: 0, // No retention period by default for testing
6772
}
6873
}
@@ -271,6 +276,7 @@ func (m *MockProvisioner) Reset() {
271276
m.ProvisioningCalls = make([]tenant.TenantID, 0)
272277
m.DeprovisioningCalls = make([]tenant.TenantID, 0)
273278
m.PurgeCalls = make([]tenant.TenantID, 0)
279+
m.ReconciliationCalls = make([]*tenant.TenantID, 0)
274280
m.FailProvisioningFor = make(map[string]error)
275281
m.FailDeprovisioningFor = make(map[string]error)
276282
m.FailPurgeFor = make(map[string]error)
@@ -288,11 +294,21 @@ func (m *MockProvisioner) SetStatus(status *ProvisioningStatus) {
288294

289295
// ReconcileMigrations simulates reconciling migrations for existing tenants.
290296
// In the mock, this is a no-op that returns success. Tests can verify calls
291-
// via ReconciliationCalls if needed.
297+
// via ReconciliationCalls.
292298
func (m *MockProvisioner) ReconcileMigrations(_ context.Context, tenantID *tenant.TenantID) (int, []string) {
293299
m.mu.Lock()
294300
defer m.mu.Unlock()
295301

302+
// Record the call for test verification. Copy the tenant ID so tests can
303+
// distinguish between separate "single tenant" calls without the caller's
304+
// pointer aliasing affecting recorded history.
305+
if tenantID != nil {
306+
copied := *tenantID
307+
m.ReconciliationCalls = append(m.ReconciliationCalls, &copied)
308+
} else {
309+
m.ReconciliationCalls = append(m.ReconciliationCalls, nil)
310+
}
311+
296312
if tenantID != nil {
297313
// Single tenant reconciliation
298314
if status, exists := m.statuses[tenantID.String()]; exists && status.State == StateActive {

services/tenant/worker/provisioning_worker.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,16 @@ func applyConfigDefaults(config Config) Config {
150150
// It runs until ctx is cancelled or Stop() is called.
151151
// The method blocks and should be run in a separate goroutine.
152152
//
153-
// On startup, it recovers any tenants that were stuck in PROVISIONING status
154-
// from a previous worker crash. This ensures they get re-queued for provisioning.
153+
// On startup, it performs two best-effort passes before entering the polling loop:
154+
// 1. Recover any tenants stuck in PROVISIONING status from a previous worker crash,
155+
// re-queuing them for provisioning.
156+
// 2. Reconcile migrations against all active tenants, applying any new migration
157+
// files that were added since each tenant was originally provisioned. This
158+
// allows new schema additions (e.g. audit tables) to roll out to existing
159+
// tenants on the next deploy without requiring a manual gRPC trigger.
160+
//
161+
// Both passes log errors but never block the worker from starting - a failed
162+
// reconciliation should not stop the worker from processing pending tenants.
155163
func (w *ProvisioningWorker) Start(ctx context.Context) {
156164
// Recover any tenants stuck in PROVISIONING status from previous worker crash.
157165
// This is best-effort - we log errors but continue starting the worker.
@@ -162,6 +170,12 @@ func (w *ProvisioningWorker) Start(ctx context.Context) {
162170
w.logger.Info("startup recovery completed", "recovered_count", recoveredCount)
163171
}
164172

173+
// Reconcile migrations across all active tenants. This applies new migration
174+
// files (e.g. the identity audit_log/audit_outbox tables) to schemas that
175+
// were provisioned before those migrations existed. Best-effort - per-tenant
176+
// errors are logged but do not prevent the worker from starting.
177+
w.reconcileMigrationsOnStartup(ctx)
178+
165179
ticker := time.NewTicker(w.pollInterval)
166180
defer ticker.Stop()
167181

@@ -597,6 +611,36 @@ var permanentPatterns = []string{
597611
"deprovisioned", // Tenant is deprovisioned
598612
}
599613

614+
// reconcileMigrationsOnStartup invokes the provisioner's ReconcileMigrations against
615+
// all active tenants. This applies any new migration files that were added since the
616+
// tenant was originally provisioned.
617+
//
618+
// Best-effort: per-tenant errors are logged but never propagated. The method panics
619+
// recovery so a misbehaving provisioner cannot prevent worker startup.
620+
func (w *ProvisioningWorker) reconcileMigrationsOnStartup(ctx context.Context) {
621+
defer func() {
622+
if r := recover(); r != nil {
623+
w.logger.Error("panic during startup migration reconciliation",
624+
"panic", r)
625+
}
626+
}()
627+
628+
w.logger.Info("starting migration reconciliation for active tenants")
629+
630+
reconciledCount, errs := w.provisioner.ReconcileMigrations(ctx, nil)
631+
632+
if len(errs) > 0 {
633+
w.logger.Warn("startup migration reconciliation completed with errors",
634+
"reconciled_count", reconciledCount,
635+
"error_count", len(errs),
636+
"errors", errs)
637+
return
638+
}
639+
640+
w.logger.Info("startup migration reconciliation completed",
641+
"reconciled_count", reconciledCount)
642+
}
643+
600644
// RecoverStuckTenants resets tenants that have been stuck in PROVISIONING status
601645
// for longer than the specified threshold back to PROVISIONING_PENDING status.
602646
// This allows them to be picked up and re-provisioned on the next polling cycle.

services/tenant/worker/provisioning_worker_test.go

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -778,10 +778,11 @@ var (
778778

779779
// ControlledMockProvisioner is a mock that allows controlled failure/success sequences.
780780
type ControlledMockProvisioner struct {
781-
mu sync.Mutex
782-
calls []tenant.TenantID
783-
failureSequence []error // nil = success, error = fail with this error
784-
callIndex int
781+
mu sync.Mutex
782+
calls []tenant.TenantID
783+
failureSequence []error // nil = success, error = fail with this error
784+
callIndex int
785+
reconciliationCalls int // Tracks ReconcileMigrations invocations
785786
}
786787

787788
func (m *ControlledMockProvisioner) ProvisionSchemas(_ context.Context, tenantID tenant.TenantID) error {
@@ -813,9 +814,20 @@ func (m *ControlledMockProvisioner) GetProvisioningStatus(_ context.Context, _ t
813814
}
814815

815816
func (m *ControlledMockProvisioner) ReconcileMigrations(_ context.Context, _ *tenant.TenantID) (int, []string) {
817+
m.mu.Lock()
818+
defer m.mu.Unlock()
819+
m.reconciliationCalls++
816820
return 0, nil
817821
}
818822

823+
// GetReconciliationCallCount returns the number of times ReconcileMigrations
824+
// has been invoked. Used to verify worker startup wiring.
825+
func (m *ControlledMockProvisioner) GetReconciliationCallCount() int {
826+
m.mu.Lock()
827+
defer m.mu.Unlock()
828+
return m.reconciliationCalls
829+
}
830+
819831
func (m *ControlledMockProvisioner) GetRequiredSchemas() []string {
820832
return []string{"party", "current-account"}
821833
}
@@ -1993,3 +2005,106 @@ func TestNewProvisioningWorker_RecoveryThresholdCustom(t *testing.T) {
19932005
require.NotNil(t, worker)
19942006
assert.Equal(t, customThreshold, worker.recoveryThreshold, "recoveryThreshold should be set to custom value")
19952007
}
2008+
2009+
// =============================================================================
2010+
// Startup Reconciliation Tests
2011+
// =============================================================================
2012+
2013+
// TestProvisioningWorker_StartupReconciliation verifies that worker.Start triggers
2014+
// migration reconciliation against all active tenants. This ensures new migrations
2015+
// (e.g., the identity audit_log/audit_outbox tables) are applied to existing tenant
2016+
// schemas without requiring a manual gRPC trigger.
2017+
func TestProvisioningWorker_StartupReconciliation(t *testing.T) {
2018+
_, repo := setupTestDB(t)
2019+
2020+
mockProv := &ControlledMockProvisioner{}
2021+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
2022+
config := Config{
2023+
PollInterval: 100 * time.Millisecond,
2024+
RecoveryThreshold: 5 * time.Minute,
2025+
}
2026+
worker, err := NewProvisioningWorker(repo, mockProv, config, logger)
2027+
require.NoError(t, err)
2028+
2029+
workerCtx, cancel := context.WithCancel(context.Background())
2030+
done := make(chan struct{})
2031+
go func() {
2032+
worker.Start(workerCtx)
2033+
close(done)
2034+
}()
2035+
2036+
// Reconciliation must run during Start, before the polling loop. Wait briefly
2037+
// for the call to land - the operation is synchronous within Start so this
2038+
// poll is checking ordering, not racing against rate-limited work.
2039+
err = await.AtMost(2 * time.Second).PollInterval(10 * time.Millisecond).Until(func() bool {
2040+
return mockProv.GetReconciliationCallCount() >= 1
2041+
})
2042+
require.NoError(t, err, "Start should invoke ReconcileMigrations exactly once")
2043+
2044+
cancel()
2045+
worker.Stop()
2046+
<-done
2047+
2048+
// Reconciliation should have run exactly once - on startup, not per poll.
2049+
assert.Equal(t, 1, mockProv.GetReconciliationCallCount(),
2050+
"reconciliation should run only on startup, not on every poll cycle")
2051+
}
2052+
2053+
// TestProvisioningWorker_StartupReconciliation_NonFatalErrors verifies that
2054+
// reconciliation errors are logged but do NOT prevent the worker from starting.
2055+
// A failing reconciliation is best-effort: it should not stop the worker from
2056+
// processing pending tenants.
2057+
func TestProvisioningWorker_StartupReconciliation_NonFatalErrors(t *testing.T) {
2058+
_, repo := setupTestDB(t)
2059+
ctx := context.Background()
2060+
2061+
// Create a pending tenant so we can verify the worker still processes it
2062+
// after reconciliation.
2063+
pendingTenant := &domain.Tenant{
2064+
ID: tenant.MustNewTenantID("post_reconcile_tenant"),
2065+
DisplayName: "Post Reconcile",
2066+
SettlementAsset: "GBP",
2067+
Status: domain.StatusProvisioningPending,
2068+
CreatedAt: time.Now(),
2069+
Version: 1,
2070+
}
2071+
require.NoError(t, repo.Create(ctx, pendingTenant))
2072+
2073+
// Use a real MockProvisioner so reconciliation returns valid (empty) results
2074+
// while ProvisionSchemas succeeds for the pending tenant.
2075+
mockProv := provisioner.NewMockProvisioner(nil)
2076+
2077+
var logBuffer safeBuffer
2078+
logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug}))
2079+
config := Config{
2080+
PollInterval: 50 * time.Millisecond,
2081+
RecoveryThreshold: 5 * time.Minute,
2082+
}
2083+
worker, err := NewProvisioningWorker(repo, mockProv, config, logger)
2084+
require.NoError(t, err)
2085+
2086+
workerCtx, cancel := context.WithCancel(context.Background())
2087+
done := make(chan struct{})
2088+
go func() {
2089+
worker.Start(workerCtx)
2090+
close(done)
2091+
}()
2092+
2093+
// The pending tenant should still be provisioned despite the reconciliation
2094+
// pass running first - proving startup isn't blocked by reconciliation.
2095+
err = await.AtMost(3 * time.Second).PollInterval(20 * time.Millisecond).Until(func() bool {
2096+
t, _ := repo.GetByID(ctx, pendingTenant.ID)
2097+
return t != nil && t.Status == domain.StatusActive
2098+
})
2099+
require.NoError(t, err, "pending tenant should be provisioned after startup reconciliation")
2100+
2101+
cancel()
2102+
worker.Stop()
2103+
<-done
2104+
2105+
// Reconciliation must have been called.
2106+
assert.Len(t, mockProv.ReconciliationCalls, 1,
2107+
"reconciliation should have been called once on startup")
2108+
assert.Nil(t, mockProv.ReconciliationCalls[0],
2109+
"startup reconciliation targets all tenants (nil tenantID)")
2110+
}

0 commit comments

Comments
 (0)