Skip to content

Commit 18e32c8

Browse files
authored
Support Iceberg DROP SCHEMA CASCADE fallback (#638)
* Support Iceberg drop schema cascade fallback * Address Iceberg drop schema fallback review * Fix Iceberg drop schema lint errors
1 parent 5b8f8a3 commit 18e32c8

8 files changed

Lines changed: 669 additions & 11 deletions

File tree

server/conn.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,17 +1448,9 @@ func (c *clientConn) handleQuery(body []byte) error {
14481448
runExec := func() (ExecResult, error) {
14491449
execResult, err := c.executor.ExecContext(ctx, query)
14501450
if err != nil {
1451-
// Retry ALTER TABLE as ALTER VIEW if target is a view
1452-
if isAlterTableNotTableError(err) {
1453-
if alteredQuery, ok := transpiler.ConvertAlterTableToAlterView(query); ok {
1454-
return c.executor.ExecContext(ctx, alteredQuery)
1455-
}
1456-
}
1457-
// Retry DROP TABLE as DROP VIEW if target is a view
1458-
if isDropTableOnViewError(err) {
1459-
if alteredQuery, ok := transpiler.ConvertDropTableToDropView(query); ok {
1460-
return c.executor.ExecContext(ctx, alteredQuery)
1461-
}
1451+
fallbackResult, handled, fallbackErr := c.execCompatibilityFallback(ctx, query, err)
1452+
if handled {
1453+
return fallbackResult, fallbackErr
14621454
}
14631455
}
14641456
return execResult, err

server/exec_fallback.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/posthog/duckgres/transpiler"
8+
)
9+
10+
func (c *clientConn) execCompatibilityFallback(ctx context.Context, query string, execErr error) (ExecResult, bool, error) {
11+
if isAlterTableNotTableError(execErr) {
12+
if alteredQuery, ok := transpiler.ConvertAlterTableToAlterView(query); ok {
13+
result, err := c.executor.ExecContext(ctx, alteredQuery)
14+
return result, true, err
15+
}
16+
}
17+
18+
if isDropTableOnViewError(execErr) {
19+
if alteredQuery, ok := transpiler.ConvertDropTableToDropView(query); ok {
20+
result, err := c.executor.ExecContext(ctx, alteredQuery)
21+
return result, true, err
22+
}
23+
}
24+
25+
if isIcebergDropSchemaCascadeUnsupported(execErr) {
26+
result, err := c.dropIcebergSchemaCascade(ctx, query)
27+
if err != nil {
28+
return nil, true, fmt.Errorf("iceberg DROP SCHEMA CASCADE fallback failed: %w", err)
29+
}
30+
return result, true, nil
31+
}
32+
33+
return nil, false, nil
34+
}

server/exec_fallback_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func TestExecCompatibilityFallbackReturnsIcebergFallbackError(t *testing.T) {
11+
execErr := errors.New("Not implemented Error: DROP SCHEMA <schema_name> CASCADE is not supported for Iceberg schemas currently")
12+
c := &clientConn{executor: &failingFallbackExecutor{}}
13+
14+
_, handled, err := c.execCompatibilityFallback(context.Background(), "DROP SCHEMA IF EXISTS stripe CASCADE", execErr)
15+
if !handled {
16+
t.Fatal("expected Iceberg fallback to handle unsupported DROP SCHEMA CASCADE error")
17+
}
18+
if err == nil || !strings.Contains(err.Error(), "iceberg DROP SCHEMA CASCADE fallback failed") {
19+
t.Fatalf("fallback error = %v, want contextual fallback failure", err)
20+
}
21+
}
22+
23+
type failingFallbackExecutor struct {
24+
noopProfiling
25+
}
26+
27+
func (e *failingFallbackExecutor) QueryContext(context.Context, string, ...any) (RowSet, error) {
28+
return nil, errors.New("settings query failed")
29+
}
30+
31+
func (e *failingFallbackExecutor) ExecContext(context.Context, string, ...any) (ExecResult, error) {
32+
return nil, errors.New("unexpected exec")
33+
}
34+
35+
func (e *failingFallbackExecutor) Query(string, ...any) (RowSet, error) {
36+
return nil, errors.New("not implemented")
37+
}
38+
39+
func (e *failingFallbackExecutor) Exec(string, ...any) (ExecResult, error) {
40+
return nil, errors.New("not implemented")
41+
}
42+
43+
func (e *failingFallbackExecutor) ConnContext(context.Context) (RawConn, error) {
44+
return nil, errors.New("not implemented")
45+
}
46+
47+
func (e *failingFallbackExecutor) PingContext(context.Context) error {
48+
return errors.New("not implemented")
49+
}
50+
51+
func (e *failingFallbackExecutor) Close() error {
52+
return nil
53+
}

server/iceberg_drop_schema.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/posthog/duckgres/server/iceberg"
9+
"github.com/posthog/duckgres/server/sqlcore"
10+
)
11+
12+
type dropSchemaCascadeTarget struct {
13+
Catalog string
14+
Schema string
15+
}
16+
17+
func parseDropSchemaCascadeTarget(query string) (dropSchemaCascadeTarget, bool) {
18+
s := strings.TrimSpace(sqlcore.StripLeadingComments(query))
19+
s = strings.TrimSpace(strings.TrimSuffix(s, ";"))
20+
21+
pos, ok := consumeSQLKeyword(s, 0, "DROP")
22+
if !ok {
23+
return dropSchemaCascadeTarget{}, false
24+
}
25+
pos, ok = consumeSQLKeyword(s, pos, "SCHEMA")
26+
if !ok {
27+
return dropSchemaCascadeTarget{}, false
28+
}
29+
30+
target := dropSchemaCascadeTarget{}
31+
if next, ok := consumeSQLKeyword(s, pos, "IF"); ok {
32+
next, ok = consumeSQLKeyword(s, next, "EXISTS")
33+
if !ok {
34+
return dropSchemaCascadeTarget{}, false
35+
}
36+
pos = next
37+
}
38+
39+
targetSQL, ok := trimTrailingSQLKeyword(s[pos:], "CASCADE")
40+
if !ok {
41+
return dropSchemaCascadeTarget{}, false
42+
}
43+
parts, ok := sqlcore.ParseQualifiedIdentifier(targetSQL)
44+
if !ok || len(parts) == 0 || len(parts) > 2 {
45+
return dropSchemaCascadeTarget{}, false
46+
}
47+
if len(parts) == 1 {
48+
target.Schema = parts[0]
49+
} else {
50+
target.Catalog = parts[0]
51+
target.Schema = parts[1]
52+
}
53+
if target.Schema == "" || target.Catalog == "" && len(parts) == 2 {
54+
return dropSchemaCascadeTarget{}, false
55+
}
56+
return target, true
57+
}
58+
59+
func consumeSQLKeyword(s string, pos int, keyword string) (int, bool) {
60+
pos = skipSQLSpace(s, pos)
61+
end := pos + len(keyword)
62+
if end > len(s) || !strings.EqualFold(s[pos:end], keyword) {
63+
return pos, false
64+
}
65+
if end < len(s) && isSQLIdentChar(s[end]) {
66+
return pos, false
67+
}
68+
return end, true
69+
}
70+
71+
func trimTrailingSQLKeyword(s string, keyword string) (string, bool) {
72+
s = strings.TrimSpace(s)
73+
end := len(s) - len(keyword)
74+
if end < 0 || !strings.EqualFold(s[end:], keyword) {
75+
return "", false
76+
}
77+
if end > 0 && !isSQLSpace(s[end-1]) {
78+
return "", false
79+
}
80+
target := strings.TrimSpace(s[:end])
81+
return target, target != ""
82+
}
83+
84+
func skipSQLSpace(s string, pos int) int {
85+
for pos < len(s) && isSQLSpace(s[pos]) {
86+
pos++
87+
}
88+
return pos
89+
}
90+
91+
func isSQLSpace(ch byte) bool {
92+
switch ch {
93+
case ' ', '\t', '\n', '\r', '\f':
94+
return true
95+
default:
96+
return false
97+
}
98+
}
99+
100+
func isSQLIdentChar(ch byte) bool {
101+
return ch == '_' || ch >= '0' && ch <= '9' || ch >= 'A' && ch <= 'Z' || ch >= 'a' && ch <= 'z'
102+
}
103+
104+
func isIcebergDropSchemaCascadeUnsupported(err error) bool {
105+
if err == nil {
106+
return false
107+
}
108+
return strings.Contains(err.Error(), "DROP SCHEMA <schema_name> CASCADE is not supported for Iceberg schemas currently")
109+
}
110+
111+
func (c *clientConn) dropIcebergSchemaCascade(ctx context.Context, query string) (ExecResult, error) {
112+
target, ok := parseDropSchemaCascadeTarget(query)
113+
if !ok {
114+
return nil, fmt.Errorf("not a DROP SCHEMA CASCADE statement")
115+
}
116+
117+
catalog := target.Catalog
118+
if catalog == "" {
119+
defaultCatalog, err := c.currentSearchPathCatalog(ctx)
120+
if err != nil {
121+
return nil, err
122+
}
123+
catalog = defaultCatalog
124+
}
125+
if !strings.EqualFold(catalog, iceberg.CatalogName) {
126+
return nil, fmt.Errorf("DROP SCHEMA CASCADE fallback only supports iceberg catalog, got %q", catalog)
127+
}
128+
129+
rows, err := c.executor.QueryContext(ctx, `
130+
SELECT table_name
131+
FROM information_schema.tables
132+
WHERE table_catalog = 'iceberg'
133+
AND table_schema = ?
134+
ORDER BY table_name
135+
`, target.Schema)
136+
if err != nil {
137+
return nil, fmt.Errorf("list iceberg schema tables: %w", err)
138+
}
139+
defer func() { _ = rows.Close() }()
140+
141+
var tables []string
142+
for rows.Next() {
143+
var table string
144+
if err := rows.Scan(&table); err != nil {
145+
return nil, fmt.Errorf("scan iceberg schema table: %w", err)
146+
}
147+
tables = append(tables, table)
148+
}
149+
if err := rows.Err(); err != nil {
150+
return nil, fmt.Errorf("list iceberg schema tables: %w", err)
151+
}
152+
153+
for _, table := range tables {
154+
dropTable := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s.%s",
155+
sqlcore.QuoteIdentifier(iceberg.CatalogName),
156+
sqlcore.QuoteIdentifier(target.Schema),
157+
sqlcore.QuoteIdentifier(table),
158+
)
159+
if _, err := c.executor.ExecContext(ctx, dropTable); err != nil {
160+
return nil, fmt.Errorf("drop iceberg table %s.%s: %w", target.Schema, table, err)
161+
}
162+
}
163+
164+
dropSchema := fmt.Sprintf("DROP SCHEMA IF EXISTS %s.%s",
165+
sqlcore.QuoteIdentifier(iceberg.CatalogName),
166+
sqlcore.QuoteIdentifier(target.Schema),
167+
)
168+
return c.executor.ExecContext(ctx, dropSchema)
169+
}
170+
171+
func (c *clientConn) currentSearchPathCatalog(ctx context.Context) (string, error) {
172+
rows, err := c.executor.QueryContext(ctx, `
173+
SELECT value
174+
FROM duckdb_settings()
175+
WHERE name = 'search_path'
176+
`)
177+
if err != nil {
178+
return "", fmt.Errorf("read search_path: %w", err)
179+
}
180+
defer func() { _ = rows.Close() }()
181+
if !rows.Next() {
182+
return "", rows.Err()
183+
}
184+
var searchPath string
185+
if err := rows.Scan(&searchPath); err != nil {
186+
return "", fmt.Errorf("scan search_path: %w", err)
187+
}
188+
if err := rows.Err(); err != nil {
189+
return "", err
190+
}
191+
return sqlcore.CatalogFromSearchPath(searchPath), nil
192+
}

0 commit comments

Comments
 (0)