15 changes: 15 additions & 0 deletions destination/parquet/config.go
@@ -4,6 +4,11 @@ import (
"github.com/datazip-inc/olake/utils"
)

const (
DefaultMaxFileSizeMB = 512
DefaultMaxRowsPerFile = 1000000
Collaborator:
we don't need to add row based size limit

)

type Config struct {
Path string `json:"local_path,omitempty"` // Local file path (for local file system usage)
Bucket string `json:"s3_bucket,omitempty"`
@@ -13,8 +18,18 @@ type Config struct {
Prefix string `json:"s3_path,omitempty"`
// S3 endpoint for custom S3-compatible services (like MinIO)
S3Endpoint string `json:"s3_endpoint,omitempty"`
// The maximum file size in MB before rotation (default: 512MB)
MaxFileSizeMB int `json:"max_file_size_mb,omitempty"`
// The maximum number of rows per file before rotation (default: 1,000,000)
MaxRowsPerFile int `json:"max_rows_per_file,omitempty"`
}

func (c *Config) Validate() error {
if c.MaxFileSizeMB <= 0 {
c.MaxFileSizeMB = DefaultMaxFileSizeMB
}
if c.MaxRowsPerFile <= 0 {
c.MaxRowsPerFile = DefaultMaxRowsPerFile
}
return utils.Validate(c)
}
38 changes: 38 additions & 0 deletions destination/parquet/config_test.go
@@ -0,0 +1,38 @@
package parquet

import "testing"

func TestConfigValidateDefaults(t *testing.T) {
config := &Config{}

if err := config.Validate(); err != nil {
t.Fatalf("Validate() failed: %v", err)
}

if config.MaxFileSizeMB != DefaultMaxFileSizeMB {
t.Errorf("Expected MaxFileSizeMB to be %d, got %d", DefaultMaxFileSizeMB, config.MaxFileSizeMB)
}

if config.MaxRowsPerFile != DefaultMaxRowsPerFile {
t.Errorf("Expected MaxRowsPerFile to be %d, got %d", DefaultMaxRowsPerFile, config.MaxRowsPerFile)
}
}

func TestConfigValidateCustomValues(t *testing.T) {
config := &Config{
MaxFileSizeMB: 256,
MaxRowsPerFile: 500000,
}

if err := config.Validate(); err != nil {
t.Fatalf("Validate() failed: %v", err)
}

if config.MaxFileSizeMB != 256 {
t.Errorf("Expected MaxFileSizeMB to be 256, got %d", config.MaxFileSizeMB)
}

if config.MaxRowsPerFile != 500000 {
t.Errorf("Expected MaxRowsPerFile to be 500000, got %d", config.MaxRowsPerFile)
}
}
168 changes: 103 additions & 65 deletions destination/parquet/parquet.go
@@ -28,9 +28,11 @@
)

type FileMetadata struct {
fileName string
writer any
file source.ParquetFile
fileName string
writer any
file source.ParquetFile
rowCount int64
estimatedSizeBytes int64
}

// Parquet destination writes Parquet files to a local path and optionally uploads them to S3.
@@ -105,15 +107,97 @@
}()

p.partitionedFiles[basePath] = &FileMetadata{
fileName: fileName,
file: pqFile,
writer: writer,
fileName: fileName,
file: pqFile,
writer: writer,
rowCount: 0,
estimatedSizeBytes: 0,
}

logger.Infof("Thread[%s]: created new partition file[%s]", p.options.ThreadID, filePath)
return nil
}

func estimateRecordSize(data map[string]any) int64 {
var size int64
for key, value := range data {
size += int64(len(key))
switch v := value.(type) {
case string:
size += int64(len(v))
case []byte:
size += int64(len(v))
case int, int32, int64, uint, uint32, uint64, float32, float64:
size += 8
case bool:
size += 1
Check warning on line 133 in destination/parquet/parquet.go (GitHub Actions / golangci-lint): increment-decrement: should replace size += 1 with size++ (revive)
case nil:
size += 0
default:
size += 100
}
}
return size
Comment on lines +121 to +140
Collaborator:
Calculating the file size from the variables used might cause problems, as there is no consideration of the compression ratio here.

Instead, can you research whether there is some function available in the parquet-go library that might be helpful to us?

Contributor Author:
Hi @vaibhav-datazip,

I initially tried to calculate the actual file size using os.Stat(), but realized it would be 0 until we called Flush() on the writer, which made it unreliable for rotation checks. I also researched but didn't find any library function that returns the compressed file size, since the buffered data is only written on Flush/Close.

New suggestion: we can periodically check the actual file size every 1,000 records (or some other interval) by calling Flush() and then using os.Stat() to get the real compressed file size on disk. This way we measure the actual bytes after compression, not estimates. If the file exceeds the limit, we rotate to a new file.

Tested with Docker integration tests across 1 MB, 5 MB, and 10 MB limits, and the resulting file sizes come out very close to those limits.

Collaborator:
(screenshot of a suggested built-in writer method) Maybe this inbuilt function can help you; it can be used to calculate the file size in the parquet writer.

Contributor Author:
Thanks for pointing out the File().Size() method! I tested it, but it tracks the offset of bytes already written to the file, not including buffered data. During active writing, it returns a size that lags behind the data we've actually ingested.

From what I understand, the method seems designed for use after the file is closed, not during active writing.
}

func (p *Parquet) shouldRotateFile(partitionFile *FileMetadata) bool {
if partitionFile.rowCount >= int64(p.config.MaxRowsPerFile) {
return true
}
estimatedSizeMB := partitionFile.estimatedSizeBytes / (1024 * 1024)
return estimatedSizeMB >= int64(p.config.MaxFileSizeMB)
}

func (p *Parquet) closeAndUploadFile(basePath string, partitionFile *FileMetadata) error {
filePath := filepath.Join(p.config.Path, basePath, partitionFile.fileName)

var err error
if p.stream.NormalizationEnabled() {
err = partitionFile.writer.(*pqgo.GenericWriter[any]).Close()
} else {
err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
}
if err != nil {
return fmt.Errorf("failed to close writer: %s", err)
}

if err := partitionFile.file.Close(); err != nil {
return fmt.Errorf("failed to close file: %s", err)
}

logger.Infof("Thread[%s]: Finished writing file [%s] with %d rows", p.options.ThreadID, filePath, partitionFile.rowCount)

if p.s3Client != nil {
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %s", err)
}
defer file.Close()

s3KeyPath := basePath
if p.config.Prefix != "" {
s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
}
s3KeyPath = filepath.Join(s3KeyPath, partitionFile.fileName)

_, err = p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.config.Bucket),
Key: aws.String(s3KeyPath),
Body: file,
})
if err != nil {
return fmt.Errorf("failed to put object into s3: %s", err)
}

if err := os.Remove(filePath); err != nil {
logger.Warnf("Thread[%s]: Failed to delete file [%s]: %s", p.options.ThreadID, filePath, err)
}
logger.Infof("Thread[%s]: uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
}

return nil
}

// Setup configures the parquet writer, including local paths, file names, and optional S3 setup.
func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema any, options *destination.Options) (any, error) {
p.options = options
@@ -170,6 +254,8 @@
return fmt.Errorf("failed to create partition file for path[%s]", partitionedPath)
}

partitionFile.estimatedSizeBytes += estimateRecordSize(record.Data)

var err error
if p.stream.NormalizationEnabled() {
_, err = partitionFile.writer.(*pqgo.GenericWriter[any]).Write([]any{record.Data})
@@ -179,6 +265,15 @@
if err != nil {
return fmt.Errorf("failed to write in parquet file: %s", err)
}

partitionFile.rowCount++

if p.shouldRotateFile(partitionFile) {
if err := p.closeAndUploadFile(partitionedPath, partitionFile); err != nil {
return fmt.Errorf("failed to rotate file: %s", err)
}
delete(p.partitionedFiles, partitionedPath)
}
}

return nil
@@ -233,68 +328,11 @@
}

func (p *Parquet) closePqFiles() error {
removeLocalFile := func(filePath, reason string) {
err := os.Remove(filePath)
if err != nil {
logger.Warnf("Thread[%s]: Failed to delete file [%s], reason (%s): %s", p.options.ThreadID, filePath, reason, err)
return
}
logger.Debugf("Thread[%s]: Deleted file [%s], reason (%s).", p.options.ThreadID, filePath, reason)
}

for basePath, parquetFile := range p.partitionedFiles {
// construct full file path
filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)

// Close writers
var err error
if p.stream.NormalizationEnabled() {
err = parquetFile.writer.(*pqgo.GenericWriter[any]).Close()
} else {
err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
}
if err != nil {
return fmt.Errorf("failed to close writer: %s", err)
}

// Close file
if err := parquetFile.file.Close(); err != nil {
return fmt.Errorf("failed to close file: %s", err)
}

logger.Infof("Thread[%s]: Finished writing file [%s].", p.options.ThreadID, filePath)

if p.s3Client != nil {
// Open file for S3 upload
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %s", err)
}
defer file.Close()

// Construct S3 key path
s3KeyPath := basePath
if p.config.Prefix != "" {
s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
}
s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)

// Upload to S3
_, err = p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.config.Bucket),
Key: aws.String(s3KeyPath),
Body: file,
})
if err != nil {
return fmt.Errorf("failed to put object into s3: %s", err)
}

// Remove local file after successful upload
removeLocalFile(filePath, "uploaded to S3")
logger.Infof("Thread[%s]: successfully uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
if err := p.closeAndUploadFile(basePath, parquetFile); err != nil {
return err
}
}
// make map empty
p.partitionedFiles = make(map[string]*FileMetadata)
return nil
}
26 changes: 17 additions & 9 deletions destination/parquet/resources/spec.json
@@ -42,16 +42,24 @@
"type": "string",
"title": "S3 Endpoint",
"description": "Specifies the endpoint URL for S3 compatible services (e.g., MinIO)"
},
"max_file_size_mb": {
"type": "integer",
"title": "Maximum File Size (MB)",
"description": "Maximum size of each Parquet file in megabytes before rotating to a new file. Default is 512 MB.",
"default": 512,
"minimum": 1
},
"max_rows_per_file": {
"type": "integer",
"title": "Maximum Rows Per File",
"description": "Maximum number of rows per Parquet file before rotating to a new file. Default is 1,000,000 rows.",
"default": 1000000,
"minimum": 1
}
},
"required": [
"s3_bucket",
"s3_region",
"s3_path"
]
"required": ["s3_bucket", "s3_region", "s3_path"]
}
},
"required": [
"writer"
]
}
"required": ["writer"]
}
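For illustration, a hypothetical writer configuration exercising the two new fields; the nesting under "writer" follows the spec's required keys above, and all values are placeholders rather than settings taken from this PR:

{
  "writer": {
    "s3_bucket": "my-olake-bucket",
    "s3_region": "us-east-1",
    "s3_path": "olake/output",
    "max_file_size_mb": 256,
    "max_rows_per_file": 500000
  }
}

Omitting either field (or setting it to zero or a negative value) falls back to the defaults applied in Config.Validate: 512 MB and 1,000,000 rows.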