33 changes: 33 additions & 0 deletions destination/parquet/config.go
@@ -1,6 +1,8 @@
package parquet

import (
"fmt"

"github.com/datazip-inc/olake/utils"
)

@@ -13,8 +15,39 @@ type Config struct {
Prefix string `json:"s3_path,omitempty"`
// S3 endpoint for custom S3-compatible services (like MinIO)
S3Endpoint string `json:"s3_endpoint,omitempty"`

// Parquet file optimization settings
Compression string `json:"compression,omitempty"` // Compression codec: snappy (default), gzip, zstd, lz4, none
MaxFileSize int64 `json:"max_file_size,omitempty"` // Maximum file size in bytes (default: 128MB)
RowGroupSize int `json:"row_group_size,omitempty"` // Row group buffer size in bytes, applied as parquet-go's page buffer size (0 = library default)
SortColumns string `json:"sort_columns,omitempty"` // Comma-separated list of columns to sort by
}

func (c *Config) Validate() error {
// Validate compression codec if specified
if c.Compression != "" {
validCompressions := []string{"snappy", "gzip", "zstd", "lz4", "none", "uncompressed"}
valid := false
for _, v := range validCompressions {
if c.Compression == v {
valid = true
break
}
}
if !valid {
return fmt.Errorf("invalid compression codec: %s. Valid options are: snappy, gzip, zstd, lz4, none, uncompressed", c.Compression)
}
}

// Validate max file size if specified
if c.MaxFileSize < 0 {
return fmt.Errorf("max_file_size must be a positive value")
}

// Validate row group size if specified
if c.RowGroupSize < 0 {
return fmt.Errorf("row_group_size must be a positive value")
}

return utils.Validate(c)
}
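
Review note: a minimal sketch of how the new validation branches behave, assuming the Config struct above. The test name is hypothetical, and the final utils.Validate call may still reject configs missing other required fields, so only the error paths are asserted here:

package parquet

import (
	"encoding/json"
	"testing"
)

// Hypothetical sketch: exercises only the new validation branches, which
// return before the final utils.Validate call.
func TestValidateOptimizationFields(t *testing.T) {
	var c Config
	raw := `{"compression": "brotli", "max_file_size": -1}`
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		t.Fatal(err)
	}
	// "brotli" is not in the accepted codec list, so Validate must fail.
	if err := c.Validate(); err == nil {
		t.Fatal("expected an error for an unsupported compression codec")
	}

	// A supported codec with non-negative sizes passes the new checks
	// (utils.Validate may still reject other missing required fields).
	c = Config{Compression: "zstd", MaxFileSize: 64 * 1024 * 1024}
	_ = c.Validate()
}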
188 changes: 137 additions & 51 deletions destination/parquet/parquet.go
@@ -28,9 +28,11 @@ import (
)

type FileMetadata struct {
fileName string
writer any
file source.ParquetFile
fileName string
writer any
file source.ParquetFile
recordCount int // Track number of records written
fileSize int64 // Track approximate file size
}

// Parquet destination writes Parquet files to a local path and optionally uploads them to S3.
@@ -55,6 +57,42 @@ func (p *Parquet) Spec() any {
return Config{}
}

// getCompressionCodec returns the compression codec based on config, defaults to Snappy
func (p *Parquet) getCompressionCodec() pqgo.Compress {
switch p.config.Compression {
case "", "snappy":
return &pqgo.Snappy
case "gzip":
return &pqgo.Gzip
case "zstd":
return &pqgo.Zstd
case "lz4":
return &pqgo.Lz4Raw
case "none", "uncompressed":
return &pqgo.Uncompressed
default:
logger.Warnf("Thread[%s]: unknown compression codec %s, using snappy", p.options.ThreadID, p.config.Compression)
return &pqgo.Snappy
}
}

// getMaxFileSize returns the max file size in bytes, defaults to 128MB
func (p *Parquet) getMaxFileSize() int64 {
if p.config.MaxFileSize > 0 {
return p.config.MaxFileSize
}
return 128 * 1024 * 1024 // 128MB default
}

// getRowGroupSize returns the configured row group buffer size in bytes; 0 defers to parquet-go's default
func (p *Parquet) getRowGroupSize() int {
if p.config.RowGroupSize > 0 {
return p.config.RowGroupSize
}
return 0 // Let parquet-go use its default
}
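
A quick check of the default fallbacks is sketched below; it assumes Parquet holds its unexported config field by value (so the zero value is usable), which this diff does not show:

package parquet

import "testing"

// Hypothetical sketch of the helper defaults; assumes Parquet holds its
// config by value so the zero value is usable here.
func TestOptimizationDefaults(t *testing.T) {
	p := &Parquet{}
	if got := p.getMaxFileSize(); got != 128*1024*1024 {
		t.Fatalf("expected the 128MB default, got %d", got)
	}
	if got := p.getRowGroupSize(); got != 0 {
		t.Fatalf("expected 0 (library default), got %d", got)
	}
	// The empty codec string must resolve to Snappy.
	_ = p.getCompressionCodec()
}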

// setup s3 client if credentials provided
func (p *Parquet) initS3Writer() error {
if p.config.Bucket == "" || p.config.Region == "" {
@@ -97,20 +135,35 @@ func (p *Parquet) createNewPartitionFile(basePath string) error {
return fmt.Errorf("failed to create parquet file writer: %s", err)
}

// Build writer options
writerOptions := []pqgo.WriterOption{
pqgo.Compression(p.getCompressionCodec()),
}

// Approximate row group sizing via parquet-go's page buffer size (the configured value is in bytes)
if p.getRowGroupSize() > 0 {
writerOptions = append(writerOptions, pqgo.PageBufferSize(p.getRowGroupSize()))
}

// TODO: Implement column sorting once we have the sort columns configuration
// This would require schema analysis and potentially buffering data for sorting

writer := func() any {
if p.stream.NormalizationEnabled() {
return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(), pqgo.Compression(&pqgo.Snappy))
return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(), writerOptions...)
}
return pqgo.NewGenericWriter[types.RawRecord](pqFile, types.GetParquetRawSchema(), pqgo.Compression(&pqgo.Snappy))
return pqgo.NewGenericWriter[types.RawRecord](pqFile, types.GetParquetRawSchema(), writerOptions...)
}()

p.partitionedFiles[basePath] = &FileMetadata{
fileName: fileName,
file: pqFile,
writer: writer,
fileName: fileName,
file: pqFile,
writer: writer,
recordCount: 0,
fileSize: 0,
}

logger.Infof("Thread[%s]: created new partition file[%s]", p.options.ThreadID, filePath)
logger.Infof("Thread[%s]: created new partition file[%s] with compression[%s]", p.options.ThreadID, filePath, p.config.Compression)
return nil
}
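
On the sorting TODO above: one possible shape is to buffer each partition's batch and sort it before handing it to the writer. A rough sketch under two assumptions this diff does not confirm — record.Data is a map[string]any, and string comparison of values is acceptable (real code would need typed comparisons):

package parquet

import (
	"fmt"
	"sort"
	"strings"

	"github.com/datazip-inc/olake/types"
)

// sortRecords is a hypothetical helper for the sort_columns TODO: it sorts a
// buffered batch by the configured comma-separated columns before writing.
func sortRecords(records []types.RawRecord, sortColumns string) {
	cols := strings.Split(sortColumns, ",")
	sort.SliceStable(records, func(i, j int) bool {
		for _, col := range cols {
			col = strings.TrimSpace(col)
			a := fmt.Sprint(records[i].Data[col]) // assumes Data is map[string]any
			b := fmt.Sprint(records[j].Data[col])
			if a != b {
				return a < b
			}
		}
		return false
	})
}

Note this only orders records within a batch; sorting a whole file would require buffering until rotation, which interacts with the max_file_size logic below.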

@@ -154,10 +207,28 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
// Write writes a record to the Parquet file.
func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
// TODO: use batch writing feature of pq writer
maxFileSize := p.getMaxFileSize()

for _, record := range records {
record.OlakeTimestamp = time.Now().UTC()
partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeTimestamp)
partitionFile, exists := p.partitionedFiles[partitionedPath]

// Rotate when the open partition file has reached the max file size
if exists && maxFileSize > 0 && partitionFile.fileSize >= maxFileSize {
logger.Infof("Thread[%s]: File size limit reached (%d >= %d), rotating file for partition[%s]",
p.options.ThreadID, partitionFile.fileSize, maxFileSize, partitionedPath)

// Close and upload the current file
if err := p.closeAndUploadPartitionFile(partitionedPath, partitionFile); err != nil {
return fmt.Errorf("failed to rotate partition file: %s", err)
}

// Remove from map so a new file will be created
delete(p.partitionedFiles, partitionedPath)
exists = false
}

if !exists {
err := p.createNewPartitionFile(partitionedPath)
if err != nil {
@@ -179,6 +250,12 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
if err != nil {
return fmt.Errorf("failed to write in parquet file: %s", err)
}

// Update record count and approximate file size
partitionFile.recordCount++
// Rough estimate: assume each record is ~1KB on average
// This is a simplification; actual size would need file stat checks
partitionFile.fileSize += 1024
}

return nil
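
On the ~1KB-per-record estimate above: a more accurate option is to count the bytes the encoder actually emits. A minimal sketch, assuming the generic writer accepts any io.Writer (so the wrapper can sit between pqFile and pqgo.NewGenericWriter):

package parquet

import "io"

// countingWriter counts compressed bytes as the parquet encoder flushes
// them; storing c.n in FileMetadata.fileSize would replace the fixed
// 1KB-per-record heuristic. The count only advances when the writer
// flushes buffered row groups, so it still trails in-memory data.
type countingWriter struct {
	w io.Writer
	n int64
}

func (c *countingWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.n += int64(n)
	return n, err
}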
@@ -232,7 +309,8 @@ func (p *Parquet) Check(_ context.Context) error {
return nil
}

func (p *Parquet) closePqFiles() error {
// closeAndUploadPartitionFile closes a single partition file and uploads to S3 if configured
func (p *Parquet) closeAndUploadPartitionFile(basePath string, parquetFile *FileMetadata) error {
removeLocalFile := func(filePath, reason string) {
err := os.Remove(filePath)
if err != nil {
@@ -242,56 +320,64 @@ func (p *Parquet) closePqFiles() error {
logger.Debugf("Thread[%s]: Deleted file [%s], reason (%s).", p.options.ThreadID, filePath, reason)
}

for basePath, parquetFile := range p.partitionedFiles {
// construct full file path
filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)
// construct full file path
filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)

// Close writers
var err error
if p.stream.NormalizationEnabled() {
err = parquetFile.writer.(*pqgo.GenericWriter[any]).Close()
} else {
err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
}
// Close writers
var err error
if p.stream.NormalizationEnabled() {
err = parquetFile.writer.(*pqgo.GenericWriter[any]).Close()
} else {
err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
}
if err != nil {
return fmt.Errorf("failed to close writer: %s", err)
}

// Close file
if err := parquetFile.file.Close(); err != nil {
return fmt.Errorf("failed to close file: %s", err)
}

logger.Infof("Thread[%s]: Finished writing file [%s] with %d records.", p.options.ThreadID, filePath, parquetFile.recordCount)

if p.s3Client != nil {
// Open file for S3 upload
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to close writer: %s", err)
return fmt.Errorf("failed to open file: %s", err)
}
defer file.Close()

// Close file
if err := parquetFile.file.Close(); err != nil {
return fmt.Errorf("failed to close file: %s", err)
// Construct S3 key path
s3KeyPath := basePath
if p.config.Prefix != "" {
s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
}
s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)

logger.Infof("Thread[%s]: Finished writing file [%s].", p.options.ThreadID, filePath)
// Upload to S3
_, err = p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.config.Bucket),
Key: aws.String(s3KeyPath),
Body: file,
})
if err != nil {
return fmt.Errorf("failed to put object into s3: %s", err)
}

if p.s3Client != nil {
// Open file for S3 upload
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %s", err)
}
defer file.Close()
// Remove local file after successful upload
removeLocalFile(filePath, "uploaded to S3")
logger.Infof("Thread[%s]: successfully uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
}

// Construct S3 key path
s3KeyPath := basePath
if p.config.Prefix != "" {
s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
}
s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)

// Upload to S3
_, err = p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.config.Bucket),
Key: aws.String(s3KeyPath),
Body: file,
})
if err != nil {
return fmt.Errorf("failed to put object into s3: %s", err)
}
return nil
}
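
Since rotated files default to 128MB, the single PutObject call above works but a multipart upload would be more resilient for large files. A sketch using aws-sdk-go's s3manager, assuming p.s3Client is the *s3.S3 client this diff appears to use (the helper name is hypothetical):

package parquet

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// uploadMultipart is a hypothetical alternative to the PutObject call above:
// s3manager splits the file into concurrently uploaded parts and retries
// individual parts on failure.
func (p *Parquet) uploadMultipart(filePath, s3KeyPath string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %s", err)
	}
	defer file.Close()

	uploader := s3manager.NewUploaderWithClient(p.s3Client)
	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(p.config.Bucket),
		Key:    aws.String(s3KeyPath),
		Body:   file,
	})
	return err
}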

// Remove local file after successful upload
removeLocalFile(filePath, "uploaded to S3")
logger.Infof("Thread[%s]: successfully uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
func (p *Parquet) closePqFiles() error {
for basePath, parquetFile := range p.partitionedFiles {
if err := p.closeAndUploadPartitionFile(basePath, parquetFile); err != nil {
return err
}
}
// make map empty
26 changes: 26 additions & 0 deletions destination/parquet/resources/spec.json
@@ -42,6 +42,32 @@
"type": "string",
"title": "S3 Endpoint",
"description": "Specifies the endpoint URL for S3 compatible services (e.g., MinIO)"
},
"compression": {
"type": "string",
"title": "Compression Codec",
"description": "Compression algorithm to use for Parquet files. Options: snappy (default), gzip, zstd, lz4, none. Snappy offers good balance of speed and compression ratio.",
"enum": ["snappy", "gzip", "zstd", "lz4", "none", "uncompressed"],
"default": "snappy"
},
"max_file_size": {
"type": "integer",
"title": "Maximum File Size (bytes)",
"description": "Maximum size for each Parquet file in bytes. When a file reaches this size, a new file will be created. Default: 134217728 (128MB). Set to 0 to disable file rotation.",
"default": 134217728,
"minimum": 0
},
"row_group_size": {
"type": "integer",
"title": "Row Group Size",
"description": "Number of rows to include in each row group. Larger row groups can improve compression but require more memory. Leave empty to use parquet-go's default behavior.",
"minimum": 0
},
"sort_columns": {
"type": "string",
"title": "Sort Columns",
"description": "Comma-separated list of column names to sort data by before writing to Parquet. Sorting can improve query performance for filtered queries. Example: 'timestamp,user_id'. Note: This feature is planned for future implementation.",
"pattern": "^[a-zA-Z0-9_,\\s]*$"
}
},
"required": [