
Conversation

@badalprasadsingh (Collaborator) commented Sep 23, 2025

feat: Arrow-writer for Iceberg

This PR introduces an Apache Arrow-based Iceberg writer that writes both data and delete files to the object store and registers them in the Iceberg table through the Java API, passing their file paths.

It supports both:

  • full-refresh
  • CDC (equality deletes)

The current implementation converts Go-typed data into arrow.Record batches and uses the pqarrow library to write them to Parquet files, flushing each file once it reaches the target file size.
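
As a rough illustration of that flow, here is a minimal, self-contained sketch using the Arrow Go libraries (the v17 import paths, schema, field names, and values are assumptions for the example, not OLake's actual code):

package main

import (
    "bytes"

    "github.com/apache/arrow/go/v17/arrow"
    "github.com/apache/arrow/go/v17/arrow/array"
    "github.com/apache/arrow/go/v17/arrow/memory"
    "github.com/apache/arrow/go/v17/parquet"
    "github.com/apache/arrow/go/v17/parquet/compress"
    "github.com/apache/arrow/go/v17/parquet/pqarrow"
)

func writeOneRecord() error {
    // Arrow schema mirroring the Go values being synced (hypothetical fields).
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "_olake_id", Type: arrow.BinaryTypes.String},
        {Name: "value", Type: arrow.PrimitiveTypes.Int64},
    }, nil)

    // Convert Go values into an arrow.Record.
    b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
    defer b.Release()
    b.Field(0).(*array.StringBuilder).AppendValues([]string{"id-1", "id-2"}, nil)
    b.Field(1).(*array.Int64Builder).AppendValues([]int64{10, 20}, nil)
    rec := b.NewRecord()
    defer rec.Release()

    // Write the record to a Parquet buffer with zstd compression, level 1.
    props := parquet.NewWriterProperties(
        parquet.WithCompression(compress.Codecs.Zstd),
        parquet.WithCompressionLevel(1),
    )
    var buf bytes.Buffer
    w, err := pqarrow.NewFileWriter(schema, &buf, props, pqarrow.DefaultWriterProps())
    if err != nil {
        return err
    }
    if err := w.Write(rec); err != nil {
        return err
    }
    return w.Close() // buf now holds a Parquet file ready for upload
}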

It introduces:

Rolling Writer Support

  • rolling data file writers
  • rolling delete file writers

for both partitioned and unpartitioned data, with:

  • zstd compression (level 1)
  • configurable target file sizes
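
A minimal sketch of the rolling mechanic, with illustrative names rather than OLake's actual API: accumulate bytes per file and roll to a fresh file once the target size is hit.

// rollingWriter is an illustrative sketch: it tracks bytes written and rolls
// to a new file once the configured target file size is reached.
type rollingWriter struct {
    targetFileSize int64
    bytesWritten   int64
    fileIndex      int
}

func (r *rollingWriter) write(recordBytes int64) {
    r.bytesWritten += recordBytes
    if r.bytesWritten >= r.targetFileSize {
        r.flush() // close the current file and start a new one
        r.fileIndex++
        r.bytesWritten = 0
    }
}

func (r *rollingWriter) flush() {
    // In the real writer this would close the pqarrow writer, upload the file
    // to the object store, and record its path for registration in the table.
}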

Fanout Partitioning Strategy

  • keeping multiple files open at the same time (no clustering or sorting required)
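
Conceptually, fanout routing keeps a map from partition key to an open writer and creates one lazily the first time a partition appears. A minimal sketch, reusing the illustrative rollingWriter above:

// fanoutWriter routes each record to the open writer for its partition,
// so records can arrive in any order. Names are illustrative.
type fanoutWriter struct {
    writers map[string]*rollingWriter // partition key -> open rolling writer
}

func (f *fanoutWriter) route(partitionKey string, recordBytes int64) {
    w, ok := f.writers[partitionKey]
    if !ok {
        // open a new rolling writer the first time this partition is seen
        w = &rollingWriter{targetFileSize: 128 << 20} // e.g. 128 MiB
        f.writers[partitionKey] = w
    }
    w.write(recordBytes)
}

Since records for any partition can arrive at any time, no clustering or sorting pass is needed; the trade-off is holding several files open simultaneously.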

Transforms Logic

  • identity, year, month, week, day, hour, bucket, truncate, and void; all Iceberg transforms are supported
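
For reference, hedged sketches of three of these transforms as the Iceberg spec defines them (the murmur3 import is an assumed third-party library; bucket is shown for strings only):

import (
    "time"

    "github.com/spaolacci/murmur3" // assumed library; Iceberg mandates 32-bit x86 Murmur3, seed 0
)

// bucketStr: Iceberg's bucket[N] for strings: hash the UTF-8 bytes with
// Murmur3, clear the sign bit, then take the result modulo N.
func bucketStr(v string, n int32) int32 {
    h := int32(murmur3.Sum32([]byte(v)))
    return (h & 0x7fffffff) % n
}

// yearTransform: the year transform yields years since the Unix epoch (1970).
func yearTransform(t time.Time) int32 {
    return int32(t.UTC().Year() - 1970)
}

// truncateStr: truncate[W] for strings keeps the first W code points.
func truncateStr(v string, w int) string {
    r := []rune(v)
    if len(r) <= w {
        return v
    }
    return string(r[:w])
}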

How to run it?

In your destination.json (when using the CLI), enable this toggle:

"arrow_writes": true

As your sync starts, you should see something like this in your logs:

INFO >>>> Arrow Writer Enabled >>>> >>>> >>>>

This indicates that OLake is using the Arrow writer.

Currently supports:

  • schema-evolution
  • all Iceberg catalogs (Glue, REST, Hadoop, JDBC, etc.)
  • all object stores (S3, ADLS, GCS, S3A, etc.)

@badalprasadsingh marked this pull request as ready for review September 29, 2025 03:15

// PartitionInfo represents a Iceberg partition column with its transform, preserving order
type PartitionInfo struct {
Contributor:
This cannot be in writers.go, since it's specific to Iceberg.


// setup java client
- func newIcebergClient(config *Config, partitionInfo []PartitionInfo, threadID string, check, upsert bool, destinationDatabase string) (*serverInstance, error) {
+ func newIcebergClient(config *Config, partitionInfo []destination.PartitionInfo, threadID string, check, upsert bool, destinationDatabase string) (*serverInstance, error) {
Contributor:
Why are there changes in this function? It should remain intact.

go.mongodb.org/mongo-driver v1.17.3
golang.org/x/tools v0.30.0
- google.golang.org/grpc v1.71.3
+ google.golang.org/grpc v1.72.0
Contributor:
Do not upgrade the gRPC version; it breaks protobuf handling in some cases.

}

func (i *Iceberg) Check(ctx context.Context) error {
if i.UseArrowWrites() {
Contributor:
remove


// note: the Java server parses time from a long value, which will be in milliseconds
func (i *Iceberg) Write(ctx context.Context, records []types.RawRecord) error {
if i.UseArrowWrites() {
Contributor:
Why is this not happening in setup?

}

if r.fileType == "delete" {
icebergSchemaJSON := fmt.Sprintf(`{"type":"struct","schema-id":0,"fields":[{"id":%d,"name":"_olake_id","required":true,"type":"string"}]}`, r.FieldId)
Contributor:
why "schema-id":0?

record.Release()

sizeSoFar := int64(0)
if r.currentBuffer != nil {
Contributor:
How can this be nil? You are always setting it.

sizeSoFar += int64(r.currentBuffer.Len())
}
if r.currentWriter != nil {
sizeSoFar += r.currentWriter.RowGroupTotalBytesWritten()
Contributor:
Let's discuss why we are adding it twice.

return fmt.Errorf("failed to write delete arrow record to parquet: %w", err)
}

if uploadData != nil {
Contributor:
Let's handle this in a more proper manner; we are calling the uploadparquetfile function too many times. It could be extracted into a helper, and we should also check whether we can call it just once at the end.

}

rec, err := arrow_writer.CreateDelArrRecord(deletes, fieldId)
if err != nil {
Contributor:
What about update operations? How is the delete file created in the partition?
