Skip to content

Commit 31519f4

Browse files
authored
test: Move event_outbox to tenant schema in party cascade test (#2140)
* test: Move event_outbox to tenant schema in party cascade test Create event_outbox in the tenant schema to match the production schema placement and the pattern in internal-account/service/outbox_test.go. Adds tenant_id column and uses qualified schema name. Set database-level search_path via ALTER DATABASE so all connection pool connections (including the outbox worker's) resolve unqualified table names to the tenant schema. A session-level SET search_path only affects a single connection, which caused the load test to receive 0 Kafka events under concurrent load. * test: Address review feedback on event_outbox tenant schema setup - Use pq.QuoteIdentifier instead of %q for SQL identifier quoting, consistent with the other CREATE TABLE statements in the same function - Remove &events.EventOutbox{} from SetupPostgres: the public-schema auto-migrate is superseded by the explicit tenant-schema CREATE TABLE below, eliminating the duplicate table --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 0e04217 commit 31519f4

1 file changed

Lines changed: 18 additions & 7 deletions

File tree

services/party/service/kafka_cascade_integration_test.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ func setupKafkaCascadeTest(t *testing.T) *kafkaCascadeTestEnv {
101101
// Create the party.controlled.v1 topic
102102
createKafkaTopic(t, broker, topics.PartyControlledV1)
103103

104-
// Setup CockroachDB with party + event_outbox tables
104+
// Setup CockroachDB with party and audit tables in the public schema.
105+
// event_outbox is created explicitly in the tenant schema below.
105106
db, dbCleanup := testdb.SetupPostgres(t, []interface{}{
106107
&persistence.PartyEntity{},
107108
&audit.AuditOutbox{},
108-
&events.EventOutbox{},
109109
})
110110

111111
tid := tenant.TenantID(testTenantID)
@@ -157,17 +157,28 @@ func setupKafkaCascadeTest(t *testing.T) *kafkaCascadeTestEnv {
157157
event_payload BYTEA NOT NULL,
158158
correlation_id VARCHAR(100),
159159
causation_id VARCHAR(100),
160-
status VARCHAR(20) NOT NULL DEFAULT 'pending',
160+
status VARCHAR(20) NOT NULL,
161161
topic VARCHAR(200) NOT NULL,
162162
partition_key VARCHAR(200),
163-
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
164-
processed_at TIMESTAMP WITH TIME ZONE,
163+
created_at TIMESTAMPTZ NOT NULL,
164+
processed_at TIMESTAMPTZ,
165165
retry_count INTEGER NOT NULL DEFAULT 0,
166166
last_error TEXT,
167-
service_name VARCHAR(100) NOT NULL
167+
service_name VARCHAR(100) NOT NULL,
168+
tenant_id VARCHAR(100) NOT NULL
168169
)`, pq.QuoteIdentifier(schemaName))).Error)
169170

170-
require.NoError(t, db.Exec(fmt.Sprintf("SET search_path TO %s", pq.QuoteIdentifier(schemaName))).Error)
171+
// Set search_path at the database level so all pool connections (including the outbox
172+
// worker's) resolve unqualified table names to the tenant schema. A session-level
173+
// SET search_path only affects a single pool connection; the load test needs all
174+
// connections to share the same schema routing.
175+
require.NoError(t, db.Exec(fmt.Sprintf("ALTER DATABASE test_db SET search_path TO %s, public", pq.QuoteIdentifier(schemaName))).Error)
176+
// Close idle connections so they reconnect and pick up the new database default.
177+
sqlDB, err := db.DB()
178+
require.NoError(t, err)
179+
sqlDB.SetMaxIdleConns(0) // closes idle connections immediately
180+
// Also set it on the current session for any in-flight use of this connection.
181+
require.NoError(t, db.Exec(fmt.Sprintf("SET search_path TO %s, public", pq.QuoteIdentifier(schemaName))).Error)
171182

172183
ctx := tenant.WithTenant(bgCtx, tid)
173184

0 commit comments

Comments
 (0)