Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8edaf37
Add DuckLake round-trip + iceberg integration tests
benben May 19, 2026
b2b1d13
Move kubeconfig safety check before destructive setupMultiTenant
benben May 19, 2026
d68732c
Make safety-guard messages spell out the danger and the right path
benben May 19, 2026
769568e
Fix CI bootstrap: defer mandatory kubeconfig load until after setup
benben May 19, 2026
fdf1381
Iceberg test fails openly instead of silently skipping
benben May 19, 2026
8a7fdc9
Wire OIDC + iceberg env vars on k8s-integration-tests job
benben May 19, 2026
c9f84ed
Point CI at renamed iceberg role ARN
benben May 19, 2026
b7fc3f5
Disable Delta catalog on iceberg-test tenant fixture
benben May 19, 2026
d0841e5
[debug] Probe iceberg REST endpoint from CI as OIDC role
benben May 19, 2026
28e1f37
Propagate optional session_token through static-secret S3 cred path
benben May 19, 2026
dbbfd38
Iceberg test: USE catalog.schema then unqualified CREATE TABLE
benben May 19, 2026
99b00f5
Iceberg test: probe-level coverage via external table
benben May 20, 2026
a32b61b
Iceberg test: drop probe-table dependency
benben May 20, 2026
8c201f6
Iceberg test: poll for catalog attach to absorb activation race
benben May 20, 2026
bb7db44
Iceberg test: dump control-plane + worker logs on activation failure
benben May 20, 2026
15a68d8
Iceberg test: expand activation diagnostics to include iceberg_* columns
benben May 20, 2026
ab46d8b
Iceberg test: seed iceberg_backend=s3_tables for activation
benben May 20, 2026
79616e1
Iceberg test: drop schema-columns dump from activation diagnostics
benben May 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,26 @@ jobs:
needs: unit-tests
runs-on: ubuntu-24.04-arm
timeout-minutes: 30
# id-token: write lets this job request a GitHub OIDC token, which
# aws-actions/configure-aws-credentials trades for STS-vended AWS
# credentials by assuming github-duckgres-iceberg-ci-testing-role in mw-dev.
# The role's trust policy is scoped to repo:PostHog/duckgres:*; its
# IAM policy is scoped to the iceberg test buckets only. Provisioned
# by PostHog/posthog-cloud-infra#8124.
permissions:
id-token: write
contents: read
env:
DUCKGRES_KIND_CLUSTER_NAME: duckgres
DUCKGRES_KIND_NODE_IMAGE: kindest/node:v1.31.0@sha256:53df588e04085fd41ae12de0c3fe4c72f7013bba32a20e7325357a1ac94ba865
# Iceberg integration test (tests/k8s/iceberg_test.go) fails openly
# when any of these is unset — see its godoc for the rationale. The
# AWS_* credentials are populated by configure-aws-credentials below
# via OIDC; the three iceberg-specific values are bucket coordinates
# provisioned in mw-dev.
DUCKGRES_K8S_ICEBERG_TABLE_BUCKET_ARN: arn:aws:s3tables:us-east-1:373313242555:bucket/posthog-duckgres-iceberg-test-mw-dev
DUCKGRES_K8S_ICEBERG_REGION: us-east-1
DUCKGRES_K8S_ICEBERG_DATA_BUCKET: posthog-duckgres-iceberg-test-data-mw-dev

services:
postgres:
Expand Down Expand Up @@ -301,5 +318,16 @@ jobs:
sleep $((attempt * 5))
done
exit 1
- name: Configure AWS credentials via OIDC
# Trades the GitHub-issued OIDC token for STS credentials by
# assuming github-duckgres-iceberg-ci-testing-role in mw-dev. Exposes
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_SESSION_TOKEN
# in the job env, which iceberg_test.go reads via os.Getenv.
# Pinned to the same commit cloud-infra workflows use.
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708 # v4.0.2
with:
aws-region: us-east-1
role-to-assume: arn:aws:iam::373313242555:role/github-duckgres-iceberg-ci-testing-role
role-duration-seconds: 3600
- name: Run Kubernetes integration tests
run: just test-k8s-integration
21 changes: 15 additions & 6 deletions controlplane/shared_worker_activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,21 @@ func (a *SharedWorkerActivator) buildDuckLakeConfigFromConfigStore(ctx context.C

switch {
case warehouse.S3Credentials.Name != "":
accessKey, secretKey, err := a.readS3Credentials(ctx, warehouse.S3Credentials)
accessKey, secretKey, sessionToken, err := a.readS3Credentials(ctx, warehouse.S3Credentials)
if err != nil {
return server.DuckLakeConfig{}, fmt.Errorf("s3 credentials: %w", err)
}
dl.S3Provider = "config"
dl.S3AccessKey = accessKey
dl.S3SecretKey = secretKey
// session_token is optional in the secret payload — long-term IAM
// user keys don't have one. STS-vended temporary credentials
// (AccessKeyId starting with ASIA…) require it: AWS rejects the
// signing identity without the token and the iceberg REST endpoint
// returns 403. Letting the field through lets sandbox/CI fixtures
// that source creds from STS use the same secret-ref schema as
// production's long-term keys.
dl.S3SessionToken = sessionToken
case strings.EqualFold(warehouse.S3.Provider, "aws"):
roleARN := warehouse.WorkerIdentity.IAMRoleARN
if roleARN == "" {
Expand Down Expand Up @@ -586,23 +594,24 @@ func (a *SharedWorkerActivator) readSecretValue(ctx context.Context, ref configs
return string(raw), nil
}

func (a *SharedWorkerActivator) readS3Credentials(ctx context.Context, ref configstore.SecretRef) (string, string, error) {
func (a *SharedWorkerActivator) readS3Credentials(ctx context.Context, ref configstore.SecretRef) (string, string, string, error) {
value, err := a.readSecretValue(ctx, ref)
if err != nil {
return "", "", err
return "", "", "", err
}

var payload struct {
AccessKeyID string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
SessionToken string `json:"session_token"`
}
if err := json.Unmarshal([]byte(value), &payload); err != nil {
return "", "", fmt.Errorf("parse s3 credential payload: %w", err)
return "", "", "", fmt.Errorf("parse s3 credential payload: %w", err)
}
if payload.AccessKeyID == "" || payload.SecretAccessKey == "" {
return "", "", fmt.Errorf("s3 credential payload requires access_key_id and secret_access_key")
return "", "", "", fmt.Errorf("s3 credential payload requires access_key_id and secret_access_key")
}
return payload.AccessKeyID, payload.SecretAccessKey, nil
return payload.AccessKeyID, payload.SecretAccessKey, payload.SessionToken, nil
}

func buildDuckLakeMetadataStoreDSN(host string, port int, username, password, database string) string {
Expand Down
185 changes: 185 additions & 0 deletions duckdbservice/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,188 @@ func TestConcurrentActivateTenantSamePayloadBothSucceed(t *testing.T) {
t.Errorf("workerID = %d, want %d", pool.workerID, payload.WorkerID)
}
}

// TestReuseExistingActivationRefreshesIcebergAlongsideS3 is the regression
// net for PR #563 (commit 12a9304): on hot-idle reclaim with rotated STS
// credentials, the iceberg_sigv4 secret must be refreshed alongside the
// DuckLake S3 secret. Before the fix, only refreshS3Secret was invoked,
// so long-lived workers' iceberg queries 403'd after STS expiry while
// DuckLake queries kept working — a class of bug that's invisible to
// tenants without iceberg enabled and silent for hours on tenants that
// have it but query it rarely.
//
// We assert two things the original fix actually changed: (1) the iceberg
// refresh function runs at all when Iceberg.Enabled=true on the payload,
// and (2) it receives the NEW credentials from the rotated payload, not
// the stale ones from the existing activation. Either failing reproduces
// the production bug.
func TestReuseExistingActivationRefreshesIcebergAlongsideS3(t *testing.T) {
mainDB, err := sql.Open("duckdb", "")
if err != nil {
t.Fatalf("open main duckdb: %v", err)
}
defer func() { _ = mainDB.Close() }()
controlDB, err := sql.Open("duckdb", "")
if err != nil {
t.Fatalf("open control duckdb: %v", err)
}
defer func() { _ = controlDB.Close() }()

pool := &SessionPool{
sessions: make(map[string]*Session),
stopRefresh: make(map[string]func()),
duckLakeSem: make(chan struct{}, 1),
warmupDone: make(chan struct{}),
sharedWarmMode: true,
warmupDB: mainDB,
controlDB: controlDB,
activation: &activatedTenantRuntime{
payload: ActivationPayload{
WorkerControlMetadata: server.WorkerControlMetadata{OwnerEpoch: 1},
OrgID: "analytics",
DuckLake: server.DuckLakeConfig{
MetadataStore: "postgres:host=meta port=5432 user=u password=p dbname=d",
ObjectStore: "s3://analytics/",
S3AccessKey: "OLD_AK",
S3SecretKey: "OLD_SK",
S3SessionToken: "OLD_TOK",
},
Iceberg: server.IcebergConfig{
Enabled: true,
TableBucket: "arn:aws:s3tables:us-east-1:000000000000:bucket/analytics",
Region: "us-east-1",
Namespace: "main",
},
},
db: mainDB,
},
ownerEpoch: 1,
}
close(pool.warmupDone)

var s3Calls int
var icebergCalls int
var icebergKeyID, icebergSecret, icebergToken string
var icebergCfg server.IcebergConfig
pool.refreshS3Secret = func(db *sql.DB, dlCfg server.DuckLakeConfig, sem chan struct{}) error {
s3Calls++
return nil
}
pool.refreshIcebergSecret = func(db *sql.DB, ic server.IcebergConfig, sem chan struct{}, keyID, secret, sessionToken string) error {
icebergCalls++
icebergCfg = ic
icebergKeyID = keyID
icebergSecret = secret
icebergToken = sessionToken
return nil
}

newPayload := ActivationPayload{
WorkerControlMetadata: server.WorkerControlMetadata{OwnerEpoch: 2},
OrgID: "analytics",
DuckLake: server.DuckLakeConfig{
MetadataStore: "postgres:host=meta port=5432 user=u password=p dbname=d",
ObjectStore: "s3://analytics/",
S3AccessKey: "NEW_AK",
S3SecretKey: "NEW_SK",
S3SessionToken: "NEW_TOK",
},
Iceberg: server.IcebergConfig{
Enabled: true,
TableBucket: "arn:aws:s3tables:us-east-1:000000000000:bucket/analytics",
Region: "us-east-1",
Namespace: "main",
},
}

if !pool.reuseExistingActivation(newPayload) {
t.Fatal("reuseExistingActivation returned false; expected hot-idle reclaim to succeed")
}

if s3Calls != 1 {
t.Errorf("refreshS3Secret called %d times, want 1", s3Calls)
}
if icebergCalls != 1 {
t.Fatalf("refreshIcebergSecret called %d times, want 1 — iceberg secret was NOT rotated alongside DuckLake (PR #563 regression)", icebergCalls)
}
if icebergKeyID != "NEW_AK" || icebergSecret != "NEW_SK" || icebergToken != "NEW_TOK" {
t.Errorf("iceberg refresh got stale credentials: keyID=%q secret=%q token=%q, want NEW_AK/NEW_SK/NEW_TOK",
icebergKeyID, icebergSecret, icebergToken)
}
if icebergCfg.TableBucket != newPayload.Iceberg.TableBucket || icebergCfg.Region != newPayload.Iceberg.Region {
t.Errorf("iceberg refresh got wrong config: %+v, want %+v", icebergCfg, newPayload.Iceberg)
}
}

// TestReuseExistingActivationSkipsIcebergRefreshWhenDisabled guards the
// inverse: tenants that haven't opted into iceberg must not have the
// iceberg refresh fn invoked at all on hot-idle reclaim. A regression
// that unconditionally fires the refresh would run a CREATE OR REPLACE
// SECRET against DuckDB for every reclaim, which is wasteful and — more
// importantly — would mask the real bug if the iceberg secret SQL form
// breaks for the disabled case (e.g. empty ARN/empty creds).
func TestReuseExistingActivationSkipsIcebergRefreshWhenDisabled(t *testing.T) {
mainDB, err := sql.Open("duckdb", "")
if err != nil {
t.Fatalf("open main duckdb: %v", err)
}
defer func() { _ = mainDB.Close() }()
controlDB, err := sql.Open("duckdb", "")
if err != nil {
t.Fatalf("open control duckdb: %v", err)
}
defer func() { _ = controlDB.Close() }()

pool := &SessionPool{
sessions: make(map[string]*Session),
stopRefresh: make(map[string]func()),
duckLakeSem: make(chan struct{}, 1),
warmupDone: make(chan struct{}),
sharedWarmMode: true,
warmupDB: mainDB,
controlDB: controlDB,
activation: &activatedTenantRuntime{
payload: ActivationPayload{
WorkerControlMetadata: server.WorkerControlMetadata{OwnerEpoch: 1},
OrgID: "billing",
DuckLake: server.DuckLakeConfig{
MetadataStore: "postgres:host=meta port=5432 user=u password=p dbname=d",
ObjectStore: "s3://billing/",
S3AccessKey: "OLD_AK",
S3SecretKey: "OLD_SK",
},
// Iceberg disabled — typical for tenants on DuckLake-only.
},
db: mainDB,
},
ownerEpoch: 1,
}
close(pool.warmupDone)

var icebergCalls int
pool.refreshS3Secret = func(db *sql.DB, dlCfg server.DuckLakeConfig, sem chan struct{}) error {
return nil
}
pool.refreshIcebergSecret = func(db *sql.DB, ic server.IcebergConfig, sem chan struct{}, keyID, secret, sessionToken string) error {
icebergCalls++
return nil
}

newPayload := ActivationPayload{
WorkerControlMetadata: server.WorkerControlMetadata{OwnerEpoch: 2},
OrgID: "billing",
DuckLake: server.DuckLakeConfig{
MetadataStore: "postgres:host=meta port=5432 user=u password=p dbname=d",
ObjectStore: "s3://billing/",
S3AccessKey: "NEW_AK",
S3SecretKey: "NEW_SK",
},
}

if !pool.reuseExistingActivation(newPayload) {
t.Fatal("reuseExistingActivation returned false; expected hot-idle reclaim to succeed")
}
if icebergCalls != 0 {
t.Errorf("refreshIcebergSecret called %d times for iceberg-disabled tenant, want 0", icebergCalls)
}
}
Loading
Loading