Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,25 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
return fields, nil
}

// Write writes a record to the Parquet file.
// Write writes records to the Parquet file using batch writing for optimal performance.
func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
// TODO: use batch writing feature of pq writer
for _, record := range records {
record.OlakeTimestamp = time.Now().UTC()
partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeTimestamp)
if len(records) == 0 {
return nil
}

// Group records by partition for efficient batch writing
partitionBatches := make(map[string][]types.RawRecord)
batchTimestamp := time.Now().UTC()

// Assign timestamp and group by partition
for i := range records {
records[i].OlakeTimestamp = batchTimestamp
partitionedPath := p.getPartitionedFilePath(records[i].Data, batchTimestamp)
partitionBatches[partitionedPath] = append(partitionBatches[partitionedPath], records[i])
}

// Write each partition's batch in a single operation
for partitionedPath, batch := range partitionBatches {
partitionFile, exists := p.partitionedFiles[partitionedPath]
if !exists {
err := p.createNewPartitionFile(partitionedPath)
Expand All @@ -170,14 +183,22 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
return fmt.Errorf("failed to create partition file for path[%s]", partitionedPath)
}

// Batch write all records for this partition
var err error
if p.stream.NormalizationEnabled() {
_, err = partitionFile.writer.(*pqgo.GenericWriter[any]).Write([]any{record.Data})
// Prepare batch data with all records
batchData := make([]any, len(batch))
for i, record := range batch {
batchData[i] = record.Data
}
_, err = partitionFile.writer.(*pqgo.GenericWriter[any]).Write(batchData)
} else {
_, err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Write([]types.RawRecord{record})
// Write entire batch in one operation
_, err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Write(batch)
}

if err != nil {
return fmt.Errorf("failed to write in parquet file: %s", err)
return fmt.Errorf("failed to write batch to parquet file: %s", err)
}
}

Expand Down
Loading