Skip to content

Commit dabe6b8

Browse files
fuziontechclaude
andcommitted
fix(controlplane): default passthrough sessions into the tenant catalog
On the remote-worker (MTCP) path, passthrough sessions skipped the entire session-init block, leaving the worker's DuckDB connection in its empty in-memory catalog. As a result `current_database()` returned `memory` and any unqualified DDL/DML executed against the ephemeral in-memory catalog instead of the tenant's DuckLake warehouse — verified live (6/6 fresh passthrough connections reported `current_database()=memory`). Standalone passthrough already lands in the right place via server.setDuckLakeDefault / setIcebergDefault in CreatePassthroughDBConnection; the remote-worker path had no equivalent. Add a passthrough branch that detects the attached `ducklake` catalog and issues the matching `USE` (preferring an iceberg-default user's configured catalog), so passthrough sessions default into the tenant catalog just like standalone and non-passthrough sessions. This does NOT enable logical-catalog mapping for passthrough: those sessions still talk raw DuckDB against the physical catalog name (`ducklake`), so SetLogicalCatalogMapping stays disabled for them. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent b774ce6 commit dabe6b8

3 files changed

Lines changed: 75 additions & 4 deletions

File tree

controlplane/control.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,9 +1126,12 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
11261126
}
11271127
}()
11281128

1129-
// Passthrough users skip pg_catalog initialization and DuckLake catalog
1130-
// detection — they bypass the PG compatibility layer entirely, so the
1131-
// metadata setup that drives logical catalog mapping is unused for them.
1129+
// Passthrough users skip pg_catalog initialization and logical-catalog
1130+
// mapping — they bypass the PG compatibility layer entirely. They still
1131+
// need a default catalog, though: without one the worker session stays in
1132+
// DuckDB's empty in-memory catalog, so current_database() reports "memory"
1133+
// and unqualified DDL/DML never reaches the warehouse (see the passthrough
1134+
// branch below).
11321135
var duckLakeAttached bool
11331136
if !passthroughUser {
11341137
initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout)
@@ -1170,6 +1173,31 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
11701173
slog.Warn("Failed to apply client connect-time search_path; using default.", "user", username, "org", orgID, "search_path", clientSearchPath, "error", err)
11711174
}
11721175
}
1176+
} else {
1177+
// Passthrough: no pg_catalog views and no logical-catalog rewriting, but
1178+
// the session must still land in the tenant's catalog instead of the
1179+
// empty in-memory one. Standalone passthrough does this via
1180+
// server.setDuckLakeDefault/setIcebergDefault; the remote-worker path
1181+
// has to issue the equivalent explicitly here.
1182+
initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout)
1183+
duckLakeAttached, err = sessionmeta.HasAttachedCatalog(initCtx, executor, "ducklake")
1184+
if err != nil {
1185+
initCancel()
1186+
slog.Error("Failed to detect ducklake catalog attachment.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod)
1187+
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect ducklake catalog attachment")
1188+
_ = writer.Flush()
1189+
return
1190+
}
1191+
if cmd := passthroughSessionDefaultCatalogCommand(defaultCatalog, duckLakeAttached); cmd != "" {
1192+
if _, err := executor.ExecContext(initCtx, cmd); err != nil {
1193+
initCancel()
1194+
slog.Error("Failed to apply passthrough session default catalog.", "user", username, "org", orgID, "command", cmd, "error", err, "worker", workerID, "worker_pod", workerPod)
1195+
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to apply default catalog")
1196+
_ = writer.Flush()
1197+
return
1198+
}
1199+
}
1200+
initCancel()
11731201
}
11741202

11751203
// Register the TCP connection so OnWorkerCrash can close it to unblock
@@ -1183,7 +1211,11 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
11831211
server.SetConnectionIcebergConfig(cc, icebergCfg)
11841212
}
11851213
}
1186-
server.SetLogicalCatalogMapping(cc, duckLakeAttached)
1214+
// Logical-catalog mapping (current_database masking + USE/qualified-name
1215+
// rewriting) is a non-passthrough feature; passthrough sessions talk raw
1216+
// DuckDB against the physical catalog name, so keep it disabled for them
1217+
// even though duckLakeAttached is now populated on the passthrough path.
1218+
server.SetLogicalCatalogMapping(cc, duckLakeAttached && !passthroughUser)
11871219
server.SetPassthrough(cc, passthroughUser)
11881220
if orgID != "" {
11891221
observeOrgPgSessionAccepted(orgID, passthroughUser)

controlplane/session_search_path.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@ func effectiveSessionDefaultCommand(clientSearchPath, defaultCatalog string) (st
2525
}
2626
}
2727

28+
// passthroughSessionDefaultCatalogCommand returns the connect-time command that
29+
// points a passthrough session at the tenant's catalog. Passthrough users skip
30+
// InitSessionDatabaseMetadata (whose defer issues `USE ducklake` for the
31+
// standard path), so without this the session stays in DuckDB's empty in-memory
32+
// catalog — current_database() reports "memory" and unqualified DDL/DML never
33+
// reaches the warehouse. Mirrors server.setIcebergDefault / setDuckLakeDefault
34+
// used by the standalone passthrough path.
35+
func passthroughSessionDefaultCatalogCommand(defaultCatalog string, duckLakeAttached bool) string {
36+
switch {
37+
case defaultCatalog == iceberg.CatalogName:
38+
return fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema)
39+
case duckLakeAttached:
40+
return "USE ducklake"
41+
default:
42+
return ""
43+
}
44+
}
45+
2846
func ensureMemoryMainInSearchPath(searchPath string) string {
2947
if strings.Contains(strings.ToLower(searchPath), "memory.main") {
3048
return searchPath

controlplane/session_search_path_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,24 @@ func TestEffectiveSessionDefaultCommandReturnsEmptyWhenUnset(t *testing.T) {
3131
t.Fatalf("source = %q, want empty", source)
3232
}
3333
}
34+
35+
func TestPassthroughSessionDefaultCatalogCommand(t *testing.T) {
36+
tests := []struct {
37+
name string
38+
defaultCatalog string
39+
duckLakeAttached bool
40+
want string
41+
}{
42+
{name: "ducklake attached defaults to ducklake", defaultCatalog: "", duckLakeAttached: true, want: "USE ducklake"},
43+
{name: "iceberg-default user prefers iceberg over ducklake", defaultCatalog: "iceberg", duckLakeAttached: true, want: "USE iceberg.public"},
44+
{name: "iceberg-default user with no ducklake", defaultCatalog: "iceberg", duckLakeAttached: false, want: "USE iceberg.public"},
45+
{name: "no ducklake and no configured catalog leaves session as-is", defaultCatalog: "", duckLakeAttached: false, want: ""},
46+
}
47+
for _, tt := range tests {
48+
t.Run(tt.name, func(t *testing.T) {
49+
if got := passthroughSessionDefaultCatalogCommand(tt.defaultCatalog, tt.duckLakeAttached); got != tt.want {
50+
t.Fatalf("command = %q, want %q", got, tt.want)
51+
}
52+
})
53+
}
54+
}

0 commit comments

Comments
 (0)