Skip to content

Commit cbfaee2

Browse files
authored
feat: Add database schema for operational gateway (#1304)
* feat: Add database schema for operational gateway

  Creates initial CockroachDB migrations for the operational gateway service:

  - provider_connections: stores provider endpoint configuration including auth config (JSONB), retry policy, rate limits, health status, and circuit breaker state. Composite primary key (tenant_id, connection_id).
  - instructions: outbox-pattern table for outbound directives with full lifecycle status tracking (PENDING → DISPATCHING → DELIVERED → ACKNOWLEDGED / FAILED / EXPIRED / CANCELLED). Partial index on (status, priority, scheduled_at) covers the dispatch worker hot path.
  - instruction_attempts: per-attempt audit log recording response codes, error messages, and duration. FK to instructions.

  Adds atlas.hcl configuration for local, ci, and production environments using the operational_gateway schema loader.

* feat: Register operational-gateway in ServiceDatabases migration map

  Adds the operational-gateway service to ServiceDatabases in the migration runner so that migration discovery does not fail with "unknown service: no database mapping" during startup and E2E tests. Maps operational-gateway → meridian_operational_gateway database with dedicated meridian_operational_gateway_user credentials, following the same pattern as all other services.
* fix: Address schema review feedback in operational gateway migration

  - Use UUID type for connection_id on provider_connections (proto defines it as uuid; VARCHAR(64) was imprecise and inconsistent with other services)
  - Change priority column from VARCHAR to SMALLINT (LOW=1, NORMAL=2, HIGH=3, CRITICAL=4) to ensure correct numeric ordering in the outbox dispatch index; VARCHAR DESC produced NORMAL>LOW>HIGH>CRITICAL which was inverted
  - Drop status from outbox index key columns; the partial WHERE clause already filters to PENDING/RETRYING so status as a leading key column was redundant
  - Add composite FK (tenant_id, provider_connection_id) → provider_connections to prevent cross-tenant orphaned instructions caught only at dispatch time
  - Add CHECK (attempt_count >= 0) and CHECK (max_attempts >= 1) to enforce valid retry counters at the DB layer
  - Make idx_instruction_attempts_instruction UNIQUE to prevent duplicate attempt ordinals that would corrupt retry/audit semantics
  - Regenerate atlas.sum for updated migration checksum

---------

Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent fd784f4 commit cbfaee2

4 files changed

Lines changed: 193 additions & 0 deletions

File tree

internal/migrations/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ var ServiceDatabases = map[string]ServiceDatabase{
7777
"reconciliation": {Database: "meridian_reconciliation", User: "meridian_reconciliation_user", Password: ""},
7878
"forecasting": {Database: "meridian_forecasting", User: "meridian_forecasting_user", Password: ""},
7979
"reference-data": {Database: "meridian_reference_data", User: "meridian_reference_data_user", Password: ""},
80+
"operational-gateway": {Database: "meridian_operational_gateway", User: "meridian_operational_gateway_user", Password: ""},
8081
}
8182

8283
// serviceMigration holds a single migration file for a service.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Atlas configuration for Operational Gateway Service
2+
// BIAN Service Domain: Operational Gateway
3+
// Manages outbound instructions to external providers and provider connection configurations.
4+
// Uses database-per-service architecture with unqualified table names.
5+
6+
// Desired-state schema source. Atlas executes the program below (the repository's
// atlas-loader, run with --schema=operational_gateway) and exposes the result as
// data.external_schema.gorm.url, which the "local" and "ci" envs use as `src`.
data "external_schema" "gorm" {
7+
program = [
8+
"go",
9+
"run",
10+
"-mod=mod",
11+
"./utilities/atlas-loader",
12+
"--schema=operational_gateway"
13+
]
14+
}
15+
16+
// Local development environment: compares the loader-provided schema (`src`) against
// the migration directory using a throwaway dev database, with strict lint rules.
env "local" {
17+
// Service-specific migration directory
18+
migration {
19+
dir = "file://services/operational-gateway/migrations"
20+
}
21+
22+
// Dev database - uses default public schema
// NOTE(review): this is a PostgreSQL 16 container, while the commit message describes the
// migrations as CockroachDB migrations - confirm the two dialects agree for this schema.
23+
dev = "docker://postgres/16/dev"
24+
25+
// Source schema from GORM models via external loader
26+
src = data.external_schema.gorm.url
27+
28+
// Lint configuration to catch dangerous changes
// Each analyzer below is set to error = true, i.e. its findings are reported as errors.
29+
lint {
30+
destructive {
31+
error = true
32+
}
33+
data_depend {
34+
error = true
35+
}
36+
incompatible {
37+
error = true
38+
}
39+
}
40+
}
41+
42+
// CI environment: uses the same migration directory, dev database, schema source and
// lint settings as env "local" in this file.
env "ci" {
43+
// Service-specific migration directory
migration {
44+
dir = "file://services/operational-gateway/migrations"
45+
}
46+
47+
// Dev database: PostgreSQL 16 docker container
dev = "docker://postgres/16/dev"
48+
49+
// Source schema from the external loader declared in data "external_schema" "gorm"
src = data.external_schema.gorm.url
50+
51+
// All three analyzers report their findings as errors
lint {
52+
destructive {
53+
error = true
54+
}
55+
data_depend {
56+
error = true
57+
}
58+
incompatible {
59+
error = true
60+
}
61+
}
62+
}
63+
64+
env "production" {
65+
// Production environment - apply only, never diff
// (no `dev`, `src` or `lint` is configured in this env, unlike "local" and "ci")
66+
// URL points to service-specific database (meridian_operational_gateway)
// The connection URL is read from the DATABASE_URL environment variable.
67+
url = getenv("DATABASE_URL")
68+
69+
// Same migration directory as the other environments
migration {
70+
dir = "file://services/operational-gateway/migrations"
71+
}
72+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
-- Operational Gateway Service Schema
2+
-- Manages outbound instructions to external providers and provider connection configurations.
3+
-- Uses unqualified table names (relies on database-per-service architecture).
4+
5+
-- provider_connections stores the configuration for connecting to external provider endpoints.
6+
-- Connections are reused across multiple instructions of matching instruction_types.
-- One row per (tenant_id, connection_id); the instructions table references rows here
-- through a composite foreign key on that pair.
7+
CREATE TABLE provider_connections (
8+
-- tenant_id scopes the connection to a tenant and is the leading primary-key column.
tenant_id UUID NOT NULL,
9+
-- connection_id is a UUID identifier matching the proto field definition.
10+
connection_id UUID NOT NULL,
11+
provider_name VARCHAR(255) NOT NULL,
12+
provider_type VARCHAR(128) NOT NULL,
13+
-- protocol is stored as the enum NAME; the CHECK rejects any other string.
-- The numbers are presumably the proto enum values (TODO confirm): HTTPS=1, GRPC=2, WEBHOOK=3, MQTT=4, AMQP=5
14+
protocol VARCHAR(20) NOT NULL CHECK (protocol IN ('HTTPS', 'GRPC', 'WEBHOOK', 'MQTT', 'AMQP')),
15+
base_url VARCHAR(2048) NOT NULL,
16+
-- auth_config stores the serialised authentication configuration (ApiKeyAuth, BasicAuth, OAuth2Auth, HMACAuth, MTLSAuth).
17+
-- Exactly one auth variant is populated; the auth_type discriminator field identifies which.
-- The database does not validate the JSON shape; that rule must be enforced by the writer.
-- NOTE(review): confirm secrets inside this document are encrypted or stored by reference.
18+
auth_config JSONB NOT NULL,
19+
-- retry_policy stores the RetryPolicy fields: max_attempts, initial_backoff_seconds, max_backoff_seconds, backoff_multiplier.
-- Nullable: NULL presumably means "use the service default" - TODO confirm.
20+
retry_policy JSONB NULL,
21+
-- rate_limit_config stores the RateLimit fields: requests_per_second, burst_size.
-- Nullable: NULL presumably means "no connection-specific limit" - TODO confirm.
22+
rate_limit_config JSONB NULL,
23+
-- health_status is stored as the enum NAME and defaults to 'UNSPECIFIED' for new rows.
-- The numbers are presumably the proto enum values (TODO confirm): UNSPECIFIED=0, HEALTHY=1, DEGRADED=2, UNHEALTHY=3
24+
health_status VARCHAR(20) NOT NULL DEFAULT 'UNSPECIFIED' CHECK (health_status IN ('UNSPECIFIED', 'HEALTHY', 'DEGRADED', 'UNHEALTHY')),
25+
-- NULL until a health check has been recorded for this connection.
last_health_check_at TIMESTAMPTZ NULL,
26+
-- circuit_state tracks the circuit-breaker state for this connection.
27+
-- CLOSED (normal), OPEN (blocking dispatch), HALF_OPEN (probing recovery).
28+
circuit_state VARCHAR(20) NOT NULL DEFAULT 'CLOSED' CHECK (circuit_state IN ('CLOSED', 'OPEN', 'HALF_OPEN')),
29+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
30+
-- updated_at is only defaulted on INSERT; this migration defines no trigger, so writers
-- must set it explicitly on UPDATE.
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
31+
-- Uniqueness is on the (tenant_id, connection_id) pair, not on connection_id alone.
PRIMARY KEY (tenant_id, connection_id)
32+
);
33+
34+
-- Index for listing connections per tenant (common read path).
-- NOTE(review): tenant_id is already the leading column of the primary key
-- (tenant_id, connection_id) and of idx_provider_connections_health, so this index
-- looks redundant - confirm against the GORM model before keeping or dropping it.
35+
CREATE INDEX idx_provider_connections_tenant ON provider_connections (tenant_id);
36+
37+
-- Index for health-based filtering (worker queries unhealthy connections for health checks).
-- tenant_id leads the key, so it serves queries that filter by tenant and then by health_status.
38+
CREATE INDEX idx_provider_connections_health ON provider_connections (tenant_id, health_status);
39+
40+
-- instructions is the outbox table for outbound operational directives sent to external providers.
41+
-- Each instruction follows the outbox pattern: written atomically with the originating saga step,
42+
-- then picked up by the dispatch worker for async delivery.
43+
CREATE TABLE instructions (
44+
-- id is the surrogate primary key; gen_random_uuid() supplies it when the writer does not.
id UUID NOT NULL DEFAULT gen_random_uuid(),
45+
-- tenant_id is also the first half of the composite FK to provider_connections.
tenant_id UUID NOT NULL,
46+
-- instruction_type identifies the category of operation (e.g. "payment.initiate", "account.freeze").
47+
instruction_type VARCHAR(255) NOT NULL,
48+
-- provider_connection_id references the connection used to dispatch this instruction.
49+
-- FK enforced as composite (tenant_id, provider_connection_id) to prevent cross-tenant orphans.
50+
provider_connection_id UUID NOT NULL,
51+
-- correlation_id links all events across services for a single user request.
52+
correlation_id VARCHAR(255) NULL,
53+
-- causation_id identifies the event or command that caused this instruction to be created.
54+
causation_id VARCHAR(255) NULL,
55+
-- payload is the instruction-specific data serialised as JSON (google.protobuf.Struct).
56+
payload JSONB NOT NULL,
57+
-- metadata stores additional key-value pairs for routing, filtering, or audit purposes.
58+
metadata JSONB NULL,
59+
-- priority uses SMALLINT to allow correct numeric ordering in the dispatch index.
60+
-- LOW=1, NORMAL=2, HIGH=3, CRITICAL=4 (matching the Priority proto enum values).
-- New rows default to 2 (NORMAL).
61+
priority SMALLINT NOT NULL DEFAULT 2 CHECK (priority IN (1, 2, 3, 4)),
62+
-- status follows the InstructionStatus state machine defined in the proto.
-- The CHECK only restricts the set of values; transitions are not enforced by the database.
-- Rows in PENDING or RETRYING are the ones covered by the partial index idx_instructions_outbox.
63+
status VARCHAR(20) NOT NULL DEFAULT 'PENDING' CHECK (status IN ('PENDING', 'DISPATCHING', 'DELIVERED', 'ACKNOWLEDGED', 'RETRYING', 'FAILED', 'EXPIRED', 'CANCELLED')),
64+
-- scheduled_at is nullable; presumably NULL means "dispatch as soon as possible" - TODO confirm.
scheduled_at TIMESTAMPTZ NULL,
65+
-- expires_at is nullable; presumably the deadline after which status becomes EXPIRED - TODO confirm.
expires_at TIMESTAMPTZ NULL,
66+
-- attempt_count and max_attempts are range-checked individually; nothing at the DB layer
-- enforces attempt_count <= max_attempts.
attempt_count INTEGER NOT NULL DEFAULT 0 CHECK (attempt_count >= 0),
67+
max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts >= 1),
68+
-- next_retry_at is nullable and is not part of any index created in this migration.
next_retry_at TIMESTAMPTZ NULL,
69+
-- idempotency_key ensures exactly-once dispatch of each instruction.
-- The database-level guarantee is uniqueness per tenant, provided by the unique index
-- idx_instructions_idempotency on (tenant_id, idempotency_key).
70+
idempotency_key VARCHAR(255) NOT NULL,
71+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
72+
-- updated_at is only defaulted on INSERT; writers must set it explicitly on UPDATE.
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
73+
PRIMARY KEY (id),
74+
-- No ON DELETE / ON UPDATE action is specified, so the default applies: a provider_connections
-- row cannot be deleted while instructions still reference it.
FOREIGN KEY (tenant_id, provider_connection_id) REFERENCES provider_connections (tenant_id, connection_id)
75+
);
76+
77+
-- Outbox polling index: dispatch worker polls for PENDING/RETRYING instructions ordered by priority DESC
78+
-- (CRITICAL=4 first) then by scheduled_at ASC. Partial index limits scan to actionable rows only.
79+
-- Using numeric priority (SMALLINT) ensures correct ordering: 4 > 3 > 2 > 1.
-- The planner can only use this index when the query's WHERE clause implies the predicate below.
-- The key contains neither tenant_id nor next_retry_at, so it orders across all tenants and
-- cannot filter by retry time.
-- NOTE(review): scheduled_at is nullable and NULL ordering under ASC differs by engine
-- (PostgreSQL puts NULLs last, CockroachDB puts them first) - confirm the worker's ORDER BY
-- behaves as intended on the production engine.
80+
CREATE INDEX idx_instructions_outbox ON instructions (priority DESC, scheduled_at ASC)
81+
WHERE status IN ('PENDING', 'RETRYING');
82+
83+
-- Tenant + type index: used by list and routing queries.
84+
CREATE INDEX idx_instructions_tenant_type ON instructions (tenant_id, instruction_type);
85+
86+
-- Correlation index: used for distributed tracing lookups.
-- Partial: rows with a NULL correlation_id are not indexed.
87+
CREATE INDEX idx_instructions_correlation ON instructions (tenant_id, correlation_id)
88+
WHERE correlation_id IS NOT NULL;
89+
90+
-- Idempotency index: enforces exactly-once dispatch per tenant.
-- Concretely, it rejects a second row with the same (tenant_id, idempotency_key); the same key
-- may be reused by a different tenant.
91+
CREATE UNIQUE INDEX idx_instructions_idempotency ON instructions (tenant_id, idempotency_key);
92+
93+
-- instruction_attempts records the outcome of each individual dispatch attempt.
94+
-- Stored separately to avoid unbounded row growth on the instructions table.
-- The table has no tenant_id column; tenant scoping comes from the parent instructions row.
95+
CREATE TABLE instruction_attempts (
96+
-- id is the surrogate primary key; gen_random_uuid() supplies it when the writer does not.
id UUID NOT NULL DEFAULT gen_random_uuid(),
97+
-- instruction_id references the parent instruction.
98+
instruction_id UUID NOT NULL,
99+
-- attempt_number is the 1-based ordinal of this attempt.
100+
-- Uniqueness constraint prevents duplicate ordinals from corrupting retry/audit semantics.
-- That uniqueness is provided by the unique index idx_instruction_attempts_instruction on
-- (instruction_id, attempt_number), not by a constraint declared in this table.
101+
attempt_number INTEGER NOT NULL CHECK (attempt_number >= 1),
102+
dispatched_at TIMESTAMPTZ NOT NULL,
103+
-- completed_at is nullable; presumably NULL while the attempt is still in flight - TODO confirm.
completed_at TIMESTAMPTZ NULL,
104+
-- response_status_code is the HTTP or gRPC status code returned by the provider.
-- The CHECK accepts NULL or any value from 0 to 599.
-- NOTE(review): this comment previously said "0 if no response", but the column is nullable and
-- 0 is also the gRPC OK code - confirm whether writers record "no response" as NULL or 0.
105+
response_status_code INTEGER NULL CHECK (response_status_code IS NULL OR (response_status_code >= 0 AND response_status_code <= 599)),
106+
-- response_body_preview is the first 1KB of the provider response body for diagnostics.
-- VARCHAR(1024) limits the value to 1024 characters (not bytes); writers must truncate before
-- inserting because longer values are rejected rather than cut.
107+
response_body_preview VARCHAR(1024) NULL,
108+
-- error_message describes the error if this attempt failed.
109+
error_message TEXT NULL,
110+
-- duration_ms is how long the dispatch attempt took in milliseconds.
-- Nullable; when present it must be non-negative.
111+
duration_ms BIGINT NULL CHECK (duration_ms IS NULL OR duration_ms >= 0),
112+
PRIMARY KEY (id),
113+
-- No ON DELETE action is specified, so the default applies: an instruction cannot be deleted
-- while attempt rows still reference it.
FOREIGN KEY (instruction_id) REFERENCES instructions (id)
114+
);
115+
116+
-- Unique index on (instruction_id, attempt_number) ensures ordinals cannot be duplicated,
117+
-- while also serving as the primary lookup path for attempt retrieval.
-- instruction_id is the leading key column, so lookups by instruction_id alone can use it too.
118+
CREATE UNIQUE INDEX idx_instruction_attempts_instruction ON instruction_attempts (instruction_id, attempt_number);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
h1:vg1f6HFO/8K8367MMxUfk8zXiPMf/669AYQlGx+ahz8=
2+
20260301000001_operational_gateway_initial.sql h1:0k52bq50GAdIbOVr4HTX9BEzFkbbL/RA4EQGSI0HnnU=

0 commit comments

Comments
 (0)