diff --git a/destination/parquet/parquet.go b/destination/parquet/parquet.go index 9a4a470d..7b5c41ae 100644 --- a/destination/parquet/parquet.go +++ b/destination/parquet/parquet.go @@ -151,12 +151,25 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema return fields, nil } -// Write writes a record to the Parquet file. +// Write writes records to the Parquet file using batch writing for optimal performance. func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error { - // TODO: use batch writing feature of pq writer - for _, record := range records { - record.OlakeTimestamp = time.Now().UTC() - partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeTimestamp) + if len(records) == 0 { + return nil + } + + // Group records by partition for efficient batch writing + partitionBatches := make(map[string][]types.RawRecord) + batchTimestamp := time.Now().UTC() + + // Assign timestamp and group by partition + for i := range records { + records[i].OlakeTimestamp = batchTimestamp + partitionedPath := p.getPartitionedFilePath(records[i].Data, batchTimestamp) + partitionBatches[partitionedPath] = append(partitionBatches[partitionedPath], records[i]) + } + + // Write each partition's batch in a single operation + for partitionedPath, batch := range partitionBatches { partitionFile, exists := p.partitionedFiles[partitionedPath] if !exists { err := p.createNewPartitionFile(partitionedPath) @@ -170,14 +183,22 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error { return fmt.Errorf("failed to create partition file for path[%s]", partitionedPath) } + // Batch write all records for this partition var err error if p.stream.NormalizationEnabled() { - _, err = partitionFile.writer.(*pqgo.GenericWriter[any]).Write([]any{record.Data}) + // Prepare batch data with all records + batchData := make([]any, len(batch)) + for i, record := range batch { + batchData[i] = record.Data + } + _, err = partitionFile.writer.(*pqgo.GenericWriter[any]).Write(batchData) } else { - _, err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Write([]types.RawRecord{record}) + // Write entire batch in one operation + _, err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Write(batch) } + if err != nil { - return fmt.Errorf("failed to write in parquet file: %s", err) + return fmt.Errorf("failed to write batch to parquet file: %s", err) } }