Skip to content

Commit 0f02f8b

Browse files
committed
materialize-motherduck: use materialize-sql-v2
Updates the materialize-motherduck connector to use materialize-sql-v2. This is the first of many such conversions, which are relatively simple. Starting with this single connector makes sense right now since it gets enough usage to hopefully flush out any bugs before converting some of the more extensively used materializations.
1 parent 9a16d67 commit 0f02f8b

File tree

8 files changed

+83
-102
lines changed

8 files changed

+83
-102
lines changed

materialize-motherduck/.snapshots/TestSQLGeneration

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
--- Begin ""."a-schema".key_value createTargetTable ---
1+
--- Begin db."a-schema".key_value createTargetTable ---
22

3-
CREATE TABLE IF NOT EXISTS ""."a-schema".key_value (
3+
CREATE TABLE IF NOT EXISTS db."a-schema".key_value (
44
key1 BIGINT NOT NULL,
55
key2 BOOLEAN NOT NULL,
66
"key!binary" VARCHAR NOT NULL,
@@ -22,16 +22,16 @@ CREATE TABLE IF NOT EXISTS ""."a-schema".key_value (
2222
"stringNumber" DOUBLE,
2323
flow_document JSON NOT NULL
2424
);
25-
--- End ""."a-schema".key_value createTargetTable ---
25+
--- End db."a-schema".key_value createTargetTable ---
2626

27-
--- Begin ""."a-schema".delta_updates createTargetTable ---
27+
--- Begin db."a-schema".delta_updates createTargetTable ---
2828

29-
CREATE TABLE IF NOT EXISTS ""."a-schema".delta_updates (
29+
CREATE TABLE IF NOT EXISTS db."a-schema".delta_updates (
3030
"theKey" VARCHAR NOT NULL,
3131
"aValue" BIGINT,
3232
flow_published_at TIMESTAMP WITH TIME ZONE NOT NULL
3333
);
34-
--- End ""."a-schema".delta_updates createTargetTable ---
34+
--- End db."a-schema".delta_updates createTargetTable ---
3535

3636
--- Begin Fence Update ---
3737
UPDATE path."to".checkpoints
@@ -42,9 +42,9 @@ UPDATE path."to".checkpoints
4242
AND fence = 123;
4343
--- End Fence Update ---
4444

45-
--- Begin ""."a-schema".key_value loadQuery ---
45+
--- Begin db."a-schema".key_value loadQuery ---
4646
SELECT 0 AS binding, l.flow_document AS doc
47-
FROM ""."a-schema".key_value AS l
47+
FROM db."a-schema".key_value AS l
4848
JOIN read_json(
4949
['s3://bucket/file1', 's3://bucket/file2'],
5050
format='newline_delimited',
@@ -58,10 +58,10 @@ JOIN read_json(
5858
ON l.key1 = r.key1 AND l.key1 >= 10 AND l.key1 <= 100
5959
AND l.key2 = r.key2
6060
AND l."key!binary" = r."key!binary" AND l."key!binary" >= 'aGVsbG8K' AND l."key!binary" <= 'Z29vZGJ5ZQo='
61-
--- End ""."a-schema".key_value loadQuery ---
61+
--- End db."a-schema".key_value loadQuery ---
6262

63-
--- Begin ""."a-schema".key_value storeDeleteQuery ---
64-
DELETE FROM ""."a-schema".key_value AS l
63+
--- Begin db."a-schema".key_value storeDeleteQuery ---
64+
DELETE FROM db."a-schema".key_value AS l
6565
USING read_json(
6666
['s3://bucket/file1', 's3://bucket/file2'],
6767
format='newline_delimited',
@@ -92,10 +92,10 @@ USING read_json(
9292
WHERE l.key1 = r.key1 AND l.key1 >= 10 AND l.key1 <= 100
9393
AND l.key2 = r.key2
9494
AND l."key!binary" = r."key!binary" AND l."key!binary" >= 'aGVsbG8K' AND l."key!binary" <= 'Z29vZGJ5ZQo=';
95-
--- End ""."a-schema".key_value storeDeleteQuery ---
95+
--- End db."a-schema".key_value storeDeleteQuery ---
9696

97-
--- Begin ""."a-schema".key_value storeQuery ---
98-
INSERT INTO ""."a-schema".key_value BY NAME
97+
--- Begin db."a-schema".key_value storeQuery ---
98+
INSERT INTO db."a-schema".key_value BY NAME
9999
SELECT * FROM read_json(
100100
['s3://bucket/file1', 's3://bucket/file2'],
101101
format='newline_delimited',
@@ -123,10 +123,10 @@ SELECT * FROM read_json(
123123
flow_document: 'JSON NOT NULL'
124124
}
125125
) WHERE flow_document != '"delete"';
126-
--- End ""."a-schema".key_value storeQuery ---
126+
--- End db."a-schema".key_value storeQuery ---
127127

128-
--- Begin ""."a-schema".delta_updates storeQuery ---
129-
INSERT INTO ""."a-schema".delta_updates BY NAME
128+
--- Begin db."a-schema".delta_updates storeQuery ---
129+
INSERT INTO db."a-schema".delta_updates BY NAME
130130
SELECT * FROM read_json(
131131
['s3://bucket/file1', 's3://bucket/file2'],
132132
format='newline_delimited',
@@ -137,6 +137,6 @@ SELECT * FROM read_json(
137137
flow_published_at: 'TIMESTAMP WITH TIME ZONE NOT NULL'
138138
}
139139
);
140-
--- End ""."a-schema".delta_updates storeQuery ---
140+
--- End db."a-schema".delta_updates storeQuery ---
141141

142142

materialize-motherduck/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ COPY flow-bin/flowctl-go /usr/local/bin/flowctl-go
1717
COPY go ./go
1818
COPY materialize-boilerplate ./materialize-boilerplate
1919
COPY materialize-motherduck ./materialize-motherduck
20-
COPY materialize-sql ./materialize-sql
20+
COPY materialize-sql-v2 ./materialize-sql-v2
2121

2222
# Test and build the connector.
23-
RUN go test -tags nozstd -v ./materialize-sql/...
23+
RUN go test -tags nozstd -v ./materialize-sql-v2/...
2424
RUN go test -tags nozstd -v ./materialize-motherduck/...
2525
RUN go build -tags nozstd -v -o ./connector ./materialize-motherduck/...
2626

materialize-motherduck/client.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,32 @@ import (
1010
"github.com/estuary/connectors/go/blob"
1111
cerrors "github.com/estuary/connectors/go/connector-errors"
1212
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
13-
sql "github.com/estuary/connectors/materialize-sql"
13+
sql "github.com/estuary/connectors/materialize-sql-v2"
1414

1515
_ "github.com/marcboeker/go-duckdb/v2"
1616
)
1717

1818
var _ sql.SchemaManager = (*client)(nil)
1919

2020
type client struct {
21-
db *stdsql.DB
22-
cfg *config
23-
ep *sql.Endpoint
21+
db *stdsql.DB
22+
ep *sql.Endpoint[config]
2423
}
2524

26-
func newClient(ctx context.Context, ep *sql.Endpoint) (sql.Client, error) {
27-
cfg := ep.Config.(*config)
28-
29-
db, err := cfg.db(ctx)
25+
func newClient(ctx context.Context, ep *sql.Endpoint[config]) (sql.Client, error) {
26+
db, err := ep.Config.db(ctx)
3027
if err != nil {
3128
return nil, err
3229
}
3330

3431
return &client{
35-
db: db,
36-
cfg: cfg,
37-
ep: ep,
32+
db: db,
33+
ep: ep,
3834
}, nil
3935
}
4036

41-
func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boilerplate.InfoSchema, error) {
42-
return sql.StdFetchInfoSchema(ctx, c.db, c.ep.Dialect, c.cfg.Database, resourcePaths)
37+
func (c *client) PopulateInfoSchema(ctx context.Context, is *boilerplate.InfoSchema, resourcePaths [][]string) error {
38+
return sql.StdPopulateInfoSchema(ctx, is, c.db, c.ep.Dialect, c.ep.Config.Database, resourcePaths)
4339
}
4440

4541
func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error {
@@ -99,7 +95,7 @@ func (c *client) ListSchemas(ctx context.Context) ([]string, error) {
9995
// StdListSchemasFn won't work if there are schemas with the same name in other databases.
10096
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(
10197
"select schema_name from information_schema.schemata where catalog_name = %s",
102-
duckDialect.Literal(c.cfg.Database),
98+
duckDialect.Literal(c.ep.Config.Database),
10399
))
104100
if err != nil {
105101
return nil, fmt.Errorf("querying schemata: %w", err)
@@ -119,15 +115,13 @@ func (c *client) ListSchemas(ctx context.Context) ([]string, error) {
119115
return out, nil
120116
}
121117

122-
func (c *client) CreateSchema(ctx context.Context, schemaName string) error {
118+
func (c *client) CreateSchema(ctx context.Context, schemaName string) (string, error) {
123119
return sql.StdCreateSchema(ctx, c.db, duckDialect, schemaName)
124120
}
125121

126-
func preReqs(ctx context.Context, conf any, tenant string) *cerrors.PrereqErr {
122+
func preReqs(ctx context.Context, cfg config, tenant string) *cerrors.PrereqErr {
127123
errs := &cerrors.PrereqErr{}
128124

129-
cfg := conf.(*config)
130-
131125
db, err := cfg.db(ctx)
132126
if err != nil {
133127
errs.Err(err)

materialize-motherduck/config.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/estuary/connectors/go/blob"
1313
schemagen "github.com/estuary/connectors/go/schema-gen"
1414
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
15-
sql "github.com/estuary/connectors/materialize-sql"
1615
"github.com/invopop/jsonschema"
1716
"google.golang.org/api/option"
1817
)
@@ -34,7 +33,7 @@ type advancedConfig struct {
3433
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
3534
}
3635

37-
func (c *config) Validate() error {
36+
func (c config) Validate() error {
3837
var requiredProperties = [][]string{
3938
{"token", c.Token},
4039
{"database", c.Database},
@@ -107,6 +106,14 @@ func (c *config) Validate() error {
107106
return nil
108107
}
109108

109+
func (c config) DefaultNamespace() string {
110+
return c.Schema
111+
}
112+
113+
func (c config) FeatureFlags() (string, map[string]bool) {
114+
return c.Advanced.FeatureFlags, featureFlagDefaults
115+
}
116+
110117
type stagingBucketType string
111118

112119
const (
@@ -225,16 +232,6 @@ type tableConfig struct {
225232
database string
226233
}
227234

228-
func newTableConfig(ep *sql.Endpoint) sql.Resource {
229-
return &tableConfig{
230-
// Default to the endpoint schema. This will be over-written by a present `schema` property
231-
// within `raw`.
232-
Schema: ep.Config.(*config).Schema,
233-
database: ep.Config.(*config).Database,
234-
}
235-
}
236-
237-
// Validate the resource configuration.
238235
func (r tableConfig) Validate() error {
239236
if r.Table == "" {
240237
return fmt.Errorf("missing table")
@@ -243,10 +240,15 @@ func (r tableConfig) Validate() error {
243240
return nil
244241
}
245242

246-
func (c tableConfig) Path() sql.TablePath {
247-
return []string{c.database, c.Schema, c.Table}
243+
func (r tableConfig) WithDefaults(cfg config) tableConfig {
244+
if r.Schema == "" {
245+
r.Schema = cfg.Schema
246+
}
247+
r.database = cfg.Database
248+
249+
return r
248250
}
249251

250-
func (c tableConfig) DeltaUpdates() bool {
251-
return c.Delta
252+
func (r tableConfig) Parameters() ([]string, bool, error) {
253+
return []string{r.database, r.Schema, r.Table}, r.Delta, nil
252254
}

materialize-motherduck/driver.go

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ import (
99
"strings"
1010

1111
"github.com/estuary/connectors/go/blob"
12-
"github.com/estuary/connectors/go/common"
1312
m "github.com/estuary/connectors/go/protocols/materialize"
1413
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
1514
enc "github.com/estuary/connectors/materialize-boilerplate/stream-encode"
16-
sql "github.com/estuary/connectors/materialize-sql"
15+
sql "github.com/estuary/connectors/materialize-sql-v2"
1716
pf "github.com/estuary/flow/go/protocols/flow"
1817
pm "github.com/estuary/flow/go/protocols/materialize"
1918
"github.com/google/uuid"
@@ -25,46 +24,39 @@ import (
2524
_ "github.com/marcboeker/go-duckdb/v2"
2625
)
2726

28-
func newDuckDriver() *sql.Driver {
29-
return &sql.Driver{
27+
func newDuckDriver() *sql.Driver[config, tableConfig] {
28+
return &sql.Driver[config, tableConfig]{
3029
DocumentationURL: "https://go.estuary.dev/materialize-motherduck",
31-
EndpointSpecType: new(config),
32-
ResourceSpecType: new(tableConfig),
33-
StartTunnel: func(ctx context.Context, conf any) error { return nil },
34-
NewEndpoint: func(ctx context.Context, raw json.RawMessage, tenant string) (*sql.Endpoint, error) {
35-
var cfg = new(config)
36-
if err := pf.UnmarshalStrict(raw, cfg); err != nil {
37-
return nil, fmt.Errorf("could not parse endpoint configuration: %w", err)
38-
}
39-
30+
StartTunnel: func(ctx context.Context, cfg config) error { return nil },
31+
NewEndpoint: func(ctx context.Context, cfg config, tenant string, featureFlags map[string]bool) (*sql.Endpoint[config], error) {
4032
log.WithFields(log.Fields{
4133
"database": cfg.Database,
4234
}).Info("opening database")
4335

44-
var featureFlags = common.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
45-
if cfg.Advanced.FeatureFlags != "" {
46-
log.WithField("flags", featureFlags).Info("parsed feature flags")
47-
}
48-
49-
return &sql.Endpoint{
36+
return &sql.Endpoint[config]{
5037
Config: cfg,
5138
Dialect: duckDialect,
5239
MetaCheckpoints: sql.FlowCheckpointsTable([]string{cfg.Database, cfg.Schema}),
5340
NewClient: newClient,
5441
CreateTableTemplate: tplCreateTargetTable,
55-
NewResource: newTableConfig,
5642
NewTransactor: newTransactor,
5743
Tenant: tenant,
5844
ConcurrentApply: false,
59-
FeatureFlags: featureFlags,
45+
Options: boilerplate.MaterializeOptions{
46+
ExtendedLogging: true,
47+
AckSchedule: &boilerplate.AckScheduleOption{
48+
Config: cfg.Schedule,
49+
Jitter: []byte(cfg.Token),
50+
},
51+
},
6052
}, nil
6153
},
6254
PreReqs: preReqs,
6355
}
6456
}
6557

6658
type transactor struct {
67-
cfg *config
59+
cfg config
6860

6961
fence sql.Fence
7062
conn *stdsql.Conn
@@ -79,28 +71,28 @@ type transactor struct {
7971

8072
func newTransactor(
8173
ctx context.Context,
82-
ep *sql.Endpoint,
74+
ep *sql.Endpoint[config],
8375
fence sql.Fence,
8476
bindings []sql.Table,
8577
open pm.Request_Open,
8678
is *boilerplate.InfoSchema,
8779
be *boilerplate.BindingEvents,
88-
) (_ m.Transactor, _ *boilerplate.MaterializeOptions, err error) {
89-
cfg := ep.Config.(*config)
80+
) (_ m.Transactor, err error) {
81+
var cfg = ep.Config
9082

9183
db, err := cfg.db(ctx)
9284
if err != nil {
93-
return nil, nil, err
85+
return nil, err
9486
}
9587

9688
conn, err := db.Conn(ctx)
9789
if err != nil {
98-
return nil, nil, fmt.Errorf("creating connection: %w", err)
90+
return nil, fmt.Errorf("creating connection: %w", err)
9991
}
10092

10193
bucket, bucketPath, err := cfg.toBucketAndPath(ctx)
10294
if err != nil {
103-
return nil, nil, err
95+
return nil, err
10496
}
10597

10698
t := &transactor{
@@ -124,15 +116,7 @@ func newTransactor(
124116
})
125117
}
126118

127-
opts := &boilerplate.MaterializeOptions{
128-
ExtendedLogging: true,
129-
AckSchedule: &boilerplate.AckScheduleOption{
130-
Config: cfg.Schedule,
131-
Jitter: []byte(cfg.Token),
132-
},
133-
}
134-
135-
return t, opts, nil
119+
return t, nil
136120
}
137121

138122
type binding struct {

0 commit comments

Comments (0)