Skip to content

Commit 184c31b

Browse files
committed
filesink: support castToString field config for parquet
Adds support for the `castToString` field configuration option in `filesink`, allowing fields to be cast to string format when writing to parquet files. The current motivation is that users sometimes want UUID values written as plain strings rather than the UUID logical type, due to downstream compatibility issues. Adding this option to the CSV writer was deliberately skipped, since CSV output already represents every value as a string.
1 parent d3c6dee commit 184c31b

File tree

9 files changed

+47
-36
lines changed

9 files changed

+47
-36
lines changed

filesink/filesink.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,21 @@ func (r resource) Validate() error {
5555
return nil
5656
}
5757

58+
type fieldConfig struct {
59+
CastToString bool `json:"castToString"`
60+
}
61+
62+
func (fc fieldConfig) Validate() error {
63+
return nil
64+
}
65+
5866
var _ boilerplate.Connector = &FileDriver{}
5967

6068
// FileDriver contains the behaviors particular to a destination system and file format.
6169
type FileDriver struct {
6270
NewConfig func(raw json.RawMessage) (Config, error)
6371
NewStore func(ctx context.Context, config Config) (Store, error)
64-
NewEncoder func(config Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) StreamEncoder
72+
NewEncoder func(config Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (StreamEncoder, error)
6573
NewConstraints func(p *pf.Projection) *pm.Response_Validated_Constraint
6674
DocumentationURL func() string
6775
ConfigSchema func() ([]byte, error)
@@ -131,8 +139,6 @@ func (d FileDriver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *
131139
bindings := make([]binding, 0, len(open.Materialization.Bindings))
132140

133141
for _, b := range open.Materialization.Bindings {
134-
b := b // for the newEncoder closure
135-
136142
var res resource
137143
if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil {
138144
return nil, nil, nil, err
@@ -143,7 +149,7 @@ func (d FileDriver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *
143149
backfill: b.Backfill,
144150
path: res.Path,
145151
includeDoc: b.FieldSelection.Document != "",
146-
newEncoder: func(w io.WriteCloser) StreamEncoder {
152+
newEncoder: func(w io.WriteCloser) (StreamEncoder, error) {
147153
// Partial application of the driverCfg and b arguments to the FileDriver's NewEncoder
148154
// function, for convenience.
149155
return d.NewEncoder(driverCfg, b, w)
@@ -184,7 +190,7 @@ type binding struct {
184190
backfill uint32
185191
path string
186192
includeDoc bool
187-
newEncoder func(w io.WriteCloser) StreamEncoder
193+
newEncoder func(w io.WriteCloser) (StreamEncoder, error)
188194
}
189195

190196
// File keys are the full "path" to a file, usually applied as a key for an object in an object
@@ -244,7 +250,7 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
244250
var encoder StreamEncoder
245251
var group errgroup.Group
246252

247-
startFile := func(b binding) {
253+
startFile := func(b binding) error {
248254
// Start a new file upload. This may be called multiple times in a single transaction if
249255
// there is more data than can fit in a single file.
250256
r, w := io.Pipe()
@@ -261,7 +267,9 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
261267
return nil
262268
})
263269

264-
encoder = b.newEncoder(w)
270+
var err error
271+
encoder, err = b.newEncoder(w)
272+
return err
265273
}
266274

267275
finishFile := func() error {
@@ -290,7 +298,9 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
290298
lastBinding = it.Binding
291299

292300
if encoder == nil {
293-
startFile(b)
301+
if err := startFile(b); err != nil {
302+
return nil, fmt.Errorf("startFile for binding %d: %w", it.Binding, err)
303+
}
294304
}
295305

296306
row := make([]any, 0, len(it.Key)+len(it.Values)+1)

filesink/format.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,20 @@ func (c ParquetConfig) Validate() error {
2525
return nil
2626
}
2727

28-
func NewParquetStreamEncoder(cfg ParquetConfig, b *pf.MaterializationSpec_Binding, w io.WriteCloser) StreamEncoder {
29-
sch := enc.FieldsToParquetSchema(b.FieldSelection.AllFields(), b.Collection)
28+
func NewParquetStreamEncoder(cfg ParquetConfig, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (StreamEncoder, error) {
29+
sch := make(enc.ParquetSchema, 0, len(b.FieldSelection.AllFields()))
30+
for _, f := range b.FieldSelection.AllFields() {
31+
p := b.Collection.GetProjection(f)
32+
33+
var fc fieldConfig
34+
if raw, ok := b.FieldSelection.FieldConfigJsonMap[f]; ok {
35+
if err := pf.UnmarshalStrict(raw, &fc); err != nil {
36+
return nil, fmt.Errorf("unmarshaling field config for field %q: %w", f, err)
37+
}
38+
}
39+
40+
sch = append(sch, enc.ProjectionToParquetSchemaElement(*p, fc.CastToString))
41+
}
3042

3143
var opts []enc.ParquetOption
3244

@@ -41,7 +53,7 @@ func NewParquetStreamEncoder(cfg ParquetConfig, b *pf.MaterializationSpec_Bindin
4153
// compression.
4254
opts = append(opts, enc.WithParquetCompression(enc.Snappy))
4355

44-
return enc.NewParquetEncoder(w, sch, opts...)
56+
return enc.NewParquetEncoder(w, sch, opts...), nil
4557
}
4658

4759
type CsvConfig struct {

materialize-azure-blob-parquet/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var driver = filesink.FileDriver{
4747
NewStore: func(ctx context.Context, c filesink.Config) (filesink.Store, error) {
4848
return filesink.NewAzureBlob(ctx, c.(config).AzureBlobConfig)
4949
},
50-
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) filesink.StreamEncoder {
50+
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (filesink.StreamEncoder, error) {
5151
return filesink.NewParquetStreamEncoder(c.(config).ParquetConfig, b, w)
5252
},
5353
NewConstraints: func(p *pf.Projection) *materialize.Response_Validated_Constraint {

materialize-boilerplate/stream-encode/parquet_schema.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -217,18 +217,7 @@ func WithParquetUUIDAsString() ParquetSchemaOption {
217217
}
218218
}
219219

220-
func FieldsToParquetSchema(fields []string, collection pf.CollectionSpec, opts ...ParquetSchemaOption) ParquetSchema {
221-
out := make(ParquetSchema, 0, len(fields))
222-
223-
for _, f := range fields {
224-
p := collection.GetProjection(f)
225-
out = append(out, ProjectionToParquetSchemaElement(*p, opts...))
226-
}
227-
228-
return out
229-
}
230-
231-
func ProjectionToParquetSchemaElement(p pf.Projection, opts ...ParquetSchemaOption) ParquetSchemaElement {
220+
func ProjectionToParquetSchemaElement(p pf.Projection, castToString bool, opts ...ParquetSchemaOption) ParquetSchemaElement {
232221
cfg := parquetSchemaConfig{}
233222
for _, o := range opts {
234223
o(&cfg)
@@ -239,24 +228,24 @@ func ProjectionToParquetSchemaElement(p pf.Projection, opts ...ParquetSchemaOpti
239228
Required: !slices.Contains(p.Inference.Types, "null") && (p.Inference.Exists == pf.Inference_MUST || p.Inference.DefaultJson != nil),
240229
}
241230

231+
if castToString {
232+
out.DataType = LogicalTypeString
233+
return out
234+
}
235+
242236
if numFormat, ok := boilerplate.AsFormattedNumeric(&p); ok {
243237
if numFormat == boilerplate.StringFormatInteger {
244238
out.DataType = PrimitiveTypeInteger
245239
} else {
246240
out.DataType = PrimitiveTypeNumber
247241
}
248242

249-
if slices.Contains(p.Inference.Types, "null") {
250-
out.Required = false
251-
}
252-
253243
return out
254244
}
255245

256246
hadType := false
257247
for _, t := range p.Inference.Types {
258248
if t == "null" {
259-
out.Required = false
260249
continue
261250
}
262251

materialize-gcs-csv/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ var driver = filesink.FileDriver{
4747
NewStore: func(ctx context.Context, c filesink.Config) (filesink.Store, error) {
4848
return filesink.NewGCSStore(ctx, c.(config).GCSStoreConfig)
4949
},
50-
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) filesink.StreamEncoder {
51-
return filesink.NewCsvStreamEncoder(c.(config).CsvConfig, b, w)
50+
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (filesink.StreamEncoder, error) {
51+
return filesink.NewCsvStreamEncoder(c.(config).CsvConfig, b, w), nil
5252
},
5353
NewConstraints: func(p *pf.Projection) *materialize.Response_Validated_Constraint {
5454
return filesink.StdConstraints(p)

materialize-gcs-parquet/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var driver = filesink.FileDriver{
4747
NewStore: func(ctx context.Context, c filesink.Config) (filesink.Store, error) {
4848
return filesink.NewGCSStore(ctx, c.(config).GCSStoreConfig)
4949
},
50-
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) filesink.StreamEncoder {
50+
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (filesink.StreamEncoder, error) {
5151
return filesink.NewParquetStreamEncoder(c.(config).ParquetConfig, b, w)
5252
},
5353
NewConstraints: func(p *pf.Projection) *materialize.Response_Validated_Constraint {

materialize-s3-csv/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ var driver = filesink.FileDriver{
4747
NewStore: func(ctx context.Context, c filesink.Config) (filesink.Store, error) {
4848
return filesink.NewS3Store(ctx, c.(config).S3StoreConfig)
4949
},
50-
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) filesink.StreamEncoder {
51-
return filesink.NewCsvStreamEncoder(c.(config).CsvConfig, b, w)
50+
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (filesink.StreamEncoder, error) {
51+
return filesink.NewCsvStreamEncoder(c.(config).CsvConfig, b, w), nil
5252
},
5353
NewConstraints: func(p *pf.Projection) *materialize.Response_Validated_Constraint {
5454
return filesink.StdConstraints(p)

materialize-s3-iceberg/type_mapping.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func projectionToParquetSchemaElement(p pf.Projection, fc fieldConfig) (enc.Parq
8282
p.Inference.String_.Format = ""
8383
}
8484

85-
return enc.ProjectionToParquetSchemaElement(p, schemaOptions...), nil
85+
return enc.ProjectionToParquetSchemaElement(p, false, schemaOptions...), nil
8686
}
8787

8888
func parquetTypeToIcebergType(pqt enc.ParquetDataType) icebergType {

materialize-s3-parquet/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var driver = filesink.FileDriver{
4747
NewStore: func(ctx context.Context, c filesink.Config) (filesink.Store, error) {
4848
return filesink.NewS3Store(ctx, c.(config).S3StoreConfig)
4949
},
50-
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) filesink.StreamEncoder {
50+
NewEncoder: func(c filesink.Config, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (filesink.StreamEncoder, error) {
5151
return filesink.NewParquetStreamEncoder(c.(config).ParquetConfig, b, w)
5252
},
5353
NewConstraints: func(p *pf.Projection) *materialize.Response_Validated_Constraint {

0 commit comments

Comments (0)