destination/iceberg/arrow.go (256 additions, 0 deletions)
@@ -0,0 +1,256 @@
package iceberg

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	arrow_writer "github.com/datazip-inc/olake/destination/iceberg/arrow"
Contributor: Why are we importing the same package when we are already in the package?
"github.com/datazip-inc/olake/types"
)

// ArrowWriter converts raw records into Arrow/Parquet files for an
// Iceberg table, using rolling writers for unpartitioned data and
// delete files, and a fanout writer for partitioned data.
type ArrowWriter struct {
	iceberg *Iceberg

	unpartitionedWriter *arrow_writer.RollingWriter
	partitionedWriter   *arrow_writer.Fanout
	deleteFileWriter    *arrow_writer.RollingWriter

	fields       []arrow.Field
	isNormalized bool
}

// NewArrowWriter builds an ArrowWriter bound to this Iceberg destination,
// selecting normalized or denormalized Arrow fields based on the stream's
// normalization setting.
func (i *Iceberg) NewArrowWriter() (*ArrowWriter, error) {
	arrowWriter := &ArrowWriter{
		iceberg:      i,
		isNormalized: i.stream.NormalizationEnabled(),
	}

	if arrowWriter.isNormalized {
		arrowWriter.fields = arrow_writer.CreateNormFields(i.schema)
	} else {
		arrowWriter.fields = arrow_writer.CreateDeNormFields()
	}

	return arrowWriter, nil
}

// ArrowWrites converts a batch of raw records into Arrow records, writes
// them out as Parquet files (via a fanout writer when the table is
// partitioned), and uploads the flushed files through Iceberg FileIO.
// Deletes and updates additionally produce equality-delete files keyed
// by _olake_id.
func (aw *ArrowWriter) ArrowWrites(ctx context.Context, records []types.RawRecord) error {
	if len(records) == 0 {
		return nil
	}

	now := time.Now().UTC()
	for i := range records {
		records[i].OlakeTimestamp = now
Contributor: Ideally this should already be set?
	}

	arrowFields := aw.fields

	if len(aw.iceberg.partitionInfo) != 0 {
		if aw.partitionedWriter == nil {
			aw.partitionedWriter = arrow_writer.NewFanoutWriter(context.Background(), aw.iceberg.partitionInfo, aw.iceberg.schema)
			aw.partitionedWriter.Normalization = aw.iceberg.stream.NormalizationEnabled()
			aw.partitionedWriter.FilenameGen = aw.iceberg.GenerateFilename
		}

		uploadDataList, deleteRecords, err := aw.partitionedWriter.Write(ctx, records, arrowFields)
		if err != nil {
			return fmt.Errorf("failed to write partitioned data using fanout writer: %w", err)
		}

		for _, uploadData := range uploadDataList {
			if uploadData != nil {
				storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
Contributor: Let's keep this outside, in Iceberg itself; Iceberg driving the upload makes more sense. Let's keep the Arrow side only for creating Parquet files. (A rough sketch of that split follows the ArrowWrites function below.)
					uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
				if err != nil {
					return fmt.Errorf("failed to upload file %s via Iceberg FileIO: %w", uploadData.Filename, err)
				}
				if aw.iceberg.createdFilePaths == nil {
					aw.iceberg.createdFilePaths = make([][]string, 0)
				}
				aw.iceberg.createdFilePaths = append(aw.iceberg.createdFilePaths, []string{uploadData.FileType, storagePath})
			}
		}

		if len(deleteRecords) > 0 {
			fieldId, err := aw.iceberg.GetFieldId(ctx, "_olake_id")
			if err != nil {
				return fmt.Errorf("failed to get field ID for _olake_id: %w", err)
			}

			if aw.deleteFileWriter == nil {
				aw.deleteFileWriter = arrow_writer.NewRollingWriter(context.Background(), "", "delete")
				aw.deleteFileWriter.FieldId = fieldId
				aw.deleteFileWriter.FilenameGen = aw.iceberg.GenerateFilename
			}

			deletes := make([]types.RawRecord, 0, len(deleteRecords))
			for _, rec := range deleteRecords {
				r := types.RawRecord{OlakeID: rec.OlakeID}
				deletes = append(deletes, r)
			}

			rec, err := arrow_writer.CreateDelArrRecord(deletes, fieldId)
			if err != nil {
Contributor: What if it's an update operation? How is the delete file being created in the partition?
return fmt.Errorf("failed to create delete record: %w", err)
}

defer rec.Release()

uploadData, err := aw.deleteFileWriter.Write(rec)
if err != nil {
return fmt.Errorf("failed to write delete arrow record to parquet: %w", err)
}

if uploadData != nil {
Contributor: Let's handle this in a more structured manner; we call the UploadParquetFile function in too many places. It could be extracted into a helper function, and we should also look at whether we can call it just once at the end. (A sketch of such a helper appears after the Close function at the end of the file.)
				storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
					uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
				if err != nil {
					return fmt.Errorf("failed to upload delete file via Iceberg FileIO: %w", err)
				}
				if aw.iceberg.createdFilePaths == nil {
					aw.iceberg.createdFilePaths = make([][]string, 0)
				}

				aw.iceberg.createdFilePaths = append(aw.iceberg.createdFilePaths, []string{uploadData.FileType, storagePath})
			}
		}
	} else {
		// No partition spec: route everything through a single rolling data writer.
		if aw.unpartitionedWriter == nil {
			aw.unpartitionedWriter = arrow_writer.NewRollingWriter(context.Background(), "", "data")
			aw.unpartitionedWriter.FilenameGen = aw.iceberg.GenerateFilename
		}

		// Deletes ("d") and updates ("u") each need an equality-delete entry
		// keyed by _olake_id.
		deletes := make([]types.RawRecord, 0)

		for _, rec := range records {
			if rec.OperationType == "d" || rec.OperationType == "u" {
				r := types.RawRecord{OlakeID: rec.OlakeID}
				deletes = append(deletes, r)
			}
		}

		if len(deletes) > 0 {
			fieldId, err := aw.iceberg.GetFieldId(ctx, "_olake_id")
			if err != nil {
				return fmt.Errorf("failed to get field ID for _olake_id: %w", err)
			}

			if aw.deleteFileWriter == nil {
				aw.deleteFileWriter = arrow_writer.NewRollingWriter(context.Background(), "", "delete")
				aw.deleteFileWriter.FieldId = fieldId
				aw.deleteFileWriter.FilenameGen = aw.iceberg.GenerateFilename
			}

			rec, err := arrow_writer.CreateDelArrRecord(deletes, fieldId)
			if err != nil {
				return fmt.Errorf("failed to create delete record: %w", err)
			}

			defer rec.Release()

			uploadData, err := aw.deleteFileWriter.Write(rec)
			if err != nil {
				return fmt.Errorf("failed to write arrow record to parquet: %w", err)
			}

			if uploadData != nil {
				storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
					uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
				if err != nil {
					return fmt.Errorf("failed to upload delete file via Iceberg FileIO: %w", err)
				}
				if aw.iceberg.createdFilePaths == nil {
					aw.iceberg.createdFilePaths = make([][]string, 0)
				}

				aw.iceberg.createdFilePaths = append(aw.iceberg.createdFilePaths, []string{uploadData.FileType, storagePath})
			}
		}

		rec, err := arrow_writer.CreateArrowRecordWithFields(records, arrowFields, aw.isNormalized)
		if err != nil {
			return fmt.Errorf("failed to create arrow record: %w", err)
		}

		defer rec.Release()

		uploadData, err := aw.unpartitionedWriter.Write(rec)
		if err != nil {
			return fmt.Errorf("failed to write arrow record to parquet: %w", err)
		}

		if uploadData != nil {
			storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
				uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
			if err != nil {
				return fmt.Errorf("failed to upload data file via Iceberg FileIO: %w", err)
			}
			if aw.iceberg.createdFilePaths == nil {
				aw.iceberg.createdFilePaths = make([][]string, 0)
			}
			aw.iceberg.createdFilePaths = append(aw.iceberg.createdFilePaths, []string{uploadData.FileType, storagePath})
		}
	}

	return nil
}
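A rough sketch of the split suggested in the review above, in which ArrowWrites would only build Parquet payloads and Iceberg would drive the FileIO uploads and path bookkeeping. This is illustrative only: the changed ArrowWrites signature and the arrowWriter field on Iceberg are assumptions, not the current API.

// writeAndUpload is a hypothetical Iceberg-side driver. It assumes
// ArrowWrites is changed to return the flushed payloads instead of
// uploading them itself.
func (i *Iceberg) writeAndUpload(ctx context.Context, records []types.RawRecord) error {
	payloads, err := i.arrowWriter.ArrowWrites(ctx, records) // assumed: returns ([]*arrow_writer.UploadData, error)
	if err != nil {
		return err
	}
	for _, u := range payloads {
		storagePath, err := i.UploadParquetFile(ctx, u.FileData, u.FileType,
			u.PartitionKey, u.Filename, u.EqualityFieldId)
		if err != nil {
			return fmt.Errorf("failed to upload %s via Iceberg FileIO: %w", u.Filename, err)
		}
		i.createdFilePaths = append(i.createdFilePaths, []string{u.FileType, storagePath})
	}
	return nil
}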

// Close flushes any still-open writers, uploads the resulting files via
// Iceberg FileIO, and returns the {fileType, storagePath} pairs for the
// files uploaded during close.
func (aw *ArrowWriter) Close() ([][]string, error) {
	ctx := context.Background()
	outputFilePaths := make([][]string, 0)

	if aw.deleteFileWriter != nil {
		uploadData, err := aw.deleteFileWriter.Close()
		if err != nil {
			return outputFilePaths, err
		}
		if uploadData != nil {
			storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
				uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
			if err != nil {
				return outputFilePaths, fmt.Errorf("failed to upload delete file on close: %w", err)
			}

			outputFilePaths = append(outputFilePaths, []string{uploadData.FileType, storagePath})
		}
	}

	if aw.partitionedWriter != nil {
		uploadDataList, err := aw.partitionedWriter.Close()
		if err != nil {
			return outputFilePaths, err
		}

		for _, uploadData := range uploadDataList {
			if uploadData != nil {
				storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
					uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
				if err != nil {
					return outputFilePaths, fmt.Errorf("failed to upload file on close: %w", err)
				}
				outputFilePaths = append(outputFilePaths, []string{uploadData.FileType, storagePath})
			}
		}
	}

	if aw.unpartitionedWriter != nil {
		uploadData, err := aw.unpartitionedWriter.Close()
		if err != nil {
			return outputFilePaths, err
		}

		if uploadData != nil {
			storagePath, err := aw.iceberg.UploadParquetFile(ctx, uploadData.FileData, uploadData.FileType,
				uploadData.PartitionKey, uploadData.Filename, uploadData.EqualityFieldId)
			if err != nil {
				return outputFilePaths, fmt.Errorf("failed to upload data file on close: %w", err)
			}
			outputFilePaths = append(outputFilePaths, []string{uploadData.FileType, storagePath})
		}
	}

	return outputFilePaths, nil
}
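A minimal sketch of the helper suggested in the review, consolidating the repeated upload-then-record pattern in ArrowWrites; uploadAndTrack is a hypothetical name. Close builds a local slice instead of appending to createdFilePaths, so it would need a small variant that returns the pair rather than storing it.

// uploadAndTrack is a hypothetical helper: it uploads one Parquet payload
// via Iceberg FileIO and records its {fileType, storagePath} pair on the
// destination. A nil payload (nothing flushed yet) is a no-op. Since
// append works on a nil slice, the explicit make([][]string, 0) guards
// in the current code become unnecessary.
func (aw *ArrowWriter) uploadAndTrack(ctx context.Context, u *arrow_writer.UploadData) error {
	if u == nil {
		return nil
	}
	storagePath, err := aw.iceberg.UploadParquetFile(ctx, u.FileData, u.FileType,
		u.PartitionKey, u.Filename, u.EqualityFieldId)
	if err != nil {
		return fmt.Errorf("failed to upload file %s via Iceberg FileIO: %w", u.Filename, err)
	}
	aw.iceberg.createdFilePaths = append(aw.iceberg.createdFilePaths, []string{u.FileType, storagePath})
	return nil
}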