
Conversation

zhangjun0x01 commented Sep 17, 2025

When we run a query that does not select the equality delete fields, it throws the exception "Equality deletes need the relevant columns to be selected". This PR fixes that issue.

Steps:

  1. Add the equality delete fields to the projection columns when the query SQL does not contain the relevant columns.
  2. Execute ApplyEqualityDeletes.
  3. Remove the equality delete fields from the output data chunk before returning the query result (see the sketch after this list).
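
A minimal, hypothetical sketch of the three steps above, written in plain Java for illustration only. The real fix lives in the DuckDB Iceberg extension's scan code; the class, method names, and in-memory row maps below are invented for this sketch and are not part of the extension:

import java.util.*;
import java.util.stream.Collectors;

public class EqualityDeleteProjectionSketch {

    // Step 1: extend the user's projection with any equality delete columns it is missing.
    static List<String> extendProjection(List<String> selected, Set<String> equalityDeleteColumns) {
        List<String> extended = new ArrayList<>(selected);
        for (String column : equalityDeleteColumns) {
            if (!extended.contains(column)) {
                extended.add(column);
            }
        }
        return extended;
    }

    // Step 2: drop every row that matches an equality delete on all of the delete's columns.
    static List<Map<String, Object>> applyEqualityDeletes(List<Map<String, Object>> rows,
                                                          List<Map<String, Object>> deletes) {
        return rows.stream()
                   .filter(row -> deletes.stream().noneMatch(
                           delete -> delete.entrySet().stream().allMatch(
                                   e -> Objects.equals(row.get(e.getKey()), e.getValue()))))
                   .collect(Collectors.toList());
    }

    // Step 3 (also used here to simulate the projected read): keep only the requested columns.
    static List<Map<String, Object>> trimToProjection(List<Map<String, Object>> rows, List<String> columns) {
        return rows.stream()
                   .map(row -> {
                       Map<String, Object> trimmed = new LinkedHashMap<>();
                       columns.forEach(column -> trimmed.put(column, row.get(column)));
                       return trimmed;
                   })
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> selected = List.of("id");          // the query only selects id ...
        Set<String> deleteColumns = Set.of("name");     // ... but the equality delete is on name

        List<Map<String, Object>> allRows = List.of(
                Map.of("id", 1, "name", "a"),
                Map.of("id", 2, "name", "b"));
        List<Map<String, Object>> deletes = List.of(Map.of("name", "b"));

        List<String> extended = extendProjection(selected, deleteColumns);        // [id, name]
        List<Map<String, Object>> scanned = trimToProjection(allRows, extended);  // read the extended projection
        List<Map<String, Object>> kept = applyEqualityDeletes(scanned, deletes);  // the row with name = "b" is removed
        System.out.println(trimToProjection(kept, selected));                     // prints [{id=1}]
    }
}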

zhangjun0x01 (Author)

@Tishj, by the way, how do I configure Iceberg to use relative paths for the test cases in the data directory? I checked the Iceberg configuration and it seems there is no such option. Thanks.

Tishj (Collaborator) commented Sep 18, 2025

> @Tishj, by the way, how do I configure Iceberg to use relative paths for the test cases in the data directory? I checked the Iceberg configuration and it seems there is no such option. Thanks.

I'm not sure I understand what you're asking. Iceberg works with absolute paths; that's a limitation of the format.
We try to hack around that somewhat with allow_moved_paths, which I believe only works on iceberg_scan(...), not on an attached Iceberg catalog.

zhangjun0x01 (Author)

> allow_moved_paths

In the data/persistent directory, all test cases use relative paths in v1.metadata.json and xxx.avro; I do not know how to generate the data with relative paths.

For example, the location field uses a relative path, not an absolute one like /data/persistent/equality_deletes/warehouse/mydb/mytable:

{
  "format-version" : 2,
  "table-uuid" : "7c269e29-15d2-48a6-bc83-4919d38e3041",
  "location" : "data/persistent/equality_deletes/warehouse/mydb/mytable",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1746122109432,
  "last-column-id" : 2,
  "current-schema-id" : 0,
xxxxxx

Tishj (Collaborator) commented Sep 18, 2025

@zhangjun0x01 check out some of the scripts, like scripts/persistent/partition_bool.py; those are the scripts used to generate the persistent data.

I imagine it's because DATA_GENERATION_DIR is a relative path.

zhangjun0x01 (Author)

> @zhangjun0x01 check out some of the scripts, like scripts/persistent/partition_bool.py; those are the scripts used to generate the persistent data.
>
> I imagine it's because DATA_GENERATION_DIR is a relative path.

I tested it: when we create an Iceberg Hadoop catalog with a relative warehouse path, the table will use relative paths.
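
For reference, a minimal sketch of that observation, reusing the same Iceberg Java API as the generator code later in this thread. The warehouse path mirrors the test data layout, and the mydb.path_demo table name is made up for this example:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class RelativePathCatalogSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // A relative warehouse location makes the Hadoop catalog emit relative table locations.
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "data/persistent/equality_deletes/warehouse");
        props.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);

        Catalog catalog = CatalogUtil.buildIcebergCatalog("hadoop_catalog", props, new Configuration());

        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
        Table table = catalog.createTable(TableIdentifier.of("mydb", "path_demo"),
                                          schema, PartitionSpec.unpartitioned());

        // Should print a relative location such as
        // data/persistent/equality_deletes/warehouse/mydb/path_demo
        System.out.println(table.location());
    }
}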

zhangjun0x01 marked this pull request as draft September 25, 2025 09:02
zhangjun0x01 force-pushed the equality_delete branch 2 times, most recently from 94a498d to 3919d0c (September 26, 2025 10:09)
zhangjun0x01 marked this pull request as ready for review September 26, 2025 10:35
zhangjun0x01 force-pushed the equality_delete branch 2 times, most recently from a5417a7 to 199f9c0 (September 29, 2025 08:22)
zhangjun0x01 (Author)

Hi @Tishj, could you review this again? Thanks.

Tishj (Collaborator) commented Oct 14, 2025

When CI is green I'll have a look, thanks 👍

zhangjun0x01 force-pushed the equality_delete branch 3 times, most recently from 05b95dd to 65fcba1 (October 15, 2025 05:33)
zhangjun0x01 (Author)

> When CI is green I'll have a look, thanks 👍

This is indeed quite strange. The test program works correctly on my computer, and it is fine on Linux and Windows.
Let me check it carefully.


zhangjun@zhangjundeMacBook-Air duckdb-iceberg % build/release/test/unittest test/sql/local/equality_deletes.test
Filters: test/sql/local/equality_deletes.test
[1/1] (100%): test/sql/local/equality_deletes.test                                                                                                                     
===============================================================================
All tests passed (51 assertions in 1 test case)

Tishj (Collaborator) commented Oct 15, 2025

The mytable_partitioned table is the problem; you'll need to regenerate it.

For reference, this is what was used to generate mytable:

// src/main/java/com/example/IcebergEqualDeleteExample.java
// ------------------------------------------------------
// Place this file under src/main/java/com/example/IcebergEqualDeleteExample.java
package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileAppender;

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.PartitionSpec;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

import java.util.*;
import java.util.stream.Collectors;

import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

public class IcebergEqualDeleteExample {

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "data/persistent/equality_deletes/warehouse");
        props.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);

        Catalog catalog = CatalogUtil.buildIcebergCatalog("hadoop_catalog", props, new Configuration());
        TableIdentifier tableId = TableIdentifier.of("mydb", "mytable");

        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()),
                                   Types.NestedField.optional(2, "name", Types.StringType.get()));

        if (!catalog.tableExists(tableId)) {
            catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), ImmutableMap.of(FORMAT_VERSION, "2"));
            System.out.println("Created table mydb.mytable");
        }

        Table table = catalog.loadTable(tableId);

        // Insert some records
        List<Record> records_1 =
            Arrays.asList(createRecord(table, 1, "a"), createRecord(table, 2, "b"), createRecord(table, 3, "b"));
        insertRecords(table, records_1);

        // Delete records where name="b"
        Map<String, Object> deleteConditions_1 = new HashMap<>();
        deleteConditions_1.put("name", "b");
        deleteRecordsByEquality(table, deleteConditions_1);

        // Insert some records
        List<Record> records_2 =
            Arrays.asList(createRecord(table, 1, "a"), createRecord(table, 2, "b"), createRecord(table, 1, "b"));
        insertRecords(table, records_2);

        // Delete records where id=1 AND name="a"
        Map<String, Object> deleteConditions_2 = new HashMap<>();
        deleteConditions_2.put("id", 1);
        deleteConditions_2.put("name", "a");
        deleteRecordsByEquality(table, deleteConditions_2);
    }

    private static Record createRecord(Table table, int id, String name) {
        Record record = GenericRecord.create(table.schema());
        record.setField("id", id);
        record.setField("name", name);
        return record;
    }

    private static void insertRecords(Table table, List<Record> records) {
        Transaction insertTxn = table.newTransaction();
        try {
            String dataPath = String.format("%s/data/data-%s.parquet", table.location(), UUID.randomUUID());
            OutputFile dataOut = table.io().newOutputFile(dataPath);

            FileAppender<Record> writer = Parquet.write(dataOut)
                                              .schema(table.schema())
                                              .createWriterFunc(GenericParquetWriter::buildWriter)
                                              .build();

            for (Record record : records) {
                writer.add(record);
            }

            writer.close();
            DataFile dataFile = DataFiles.builder(table.spec())
                                    .withPath(dataPath)
                                    .withFormat(FileFormat.PARQUET)
                                    .withFileSizeInBytes(dataOut.toInputFile().getLength())
                                    .withMetrics(writer.metrics())
                                    .withSplitOffsets(writer.splitOffsets())
                                    .build();

            insertTxn.newAppend().appendFile(dataFile).commit();
            insertTxn.commitTransaction();
            System.out.println("Data inserted successfully.");
        } catch (Exception e) {
            throw new RuntimeException("Failed to insert data", e);
        }
    }

    private static void deleteRecordsByEquality(Table table, Map<String, Object> fieldValues) {
        Transaction txn = table.newTransaction();
        try {
            List<String> eqFields = new ArrayList<>(fieldValues.keySet());
            Record deleteRec = GenericRecord.create(table.schema().select(eqFields));

            // Set all field values in the delete record
            fieldValues.forEach(deleteRec::setField);

            String deletePath = String.format("%s/data/delete-%s.parquet", table.location(), UUID.randomUUID());

            OutputFile out = table.io().newOutputFile(deletePath);

            EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
                                                      .forTable(table)
                                                      .withSpec(table.spec())
                                                      .rowSchema(table.schema().select(eqFields))
                                                      .createWriterFunc(GenericParquetWriter::buildWriter)
                                                      .equalityFieldIds(getFieldIds(table, eqFields))
                                                      .buildEqualityWriter();

            writer.write(deleteRec);
            writer.close();
            DeleteFile df = writer.toDeleteFile();

            txn.newRowDelta().addDeletes(df).commit();
            txn.commitTransaction();
            System.out.println("Equality delete file written successfully.");
        } catch (Exception e) {
            throw new RuntimeException("Failure writing equality deletes", e);
        }
    }

    private static List<Integer> getFieldIds(Table table, List<String> names) {
        return names.stream().map(n -> table.schema().findField(n).fieldId()).collect(Collectors.toList());
    }
}

zhangjun0x01 (Author)

> The mytable_partitioned table is the problem; you'll need to regenerate it.
>
> For reference, this is what was used to generate mytable.


I created the partitioned table and inserted the data with Spark, and generated the equality delete files with the Java API using this example. When I query the mytable_partitioned table with Spark, the result is correct. I will carefully check again what differences exist between the macOS test environment and my computer.

zhangjun0x01 (Author)

I regenerated the mytable_partitioned table entirely with the Java API and tested on macOS 15.7.1 (the CI test environment uses macOS 15.6.1), but I still cannot reproduce the error on my computer.

zhangjun@zhangjundeMacBook-Air duckdb-iceberg % build/release/test/unittest test/sql/local/equality_deletes.test
Filters: test/sql/local/equality_deletes.test
[1/1] (100%): test/sql/local/equality_deletes.test                                                                                                                     
===============================================================================
All tests passed (51 assertions in 1 test case)

zhangjun@zhangjundeMacBook-Air duckdb-iceberg % sw_vers
ProductName:            macOS
ProductVersion:         15.7.1
BuildVersion:           24G231

zhangjun0x01 (Author)

Hi @Tishj, could you help me test whether test/sql/local/equality_deletes.test runs correctly on your computer? Thanks.

zhangjun0x01 (Author)

Hi @Tishj, could you help me test whether test/sql/local/equality_deletes.test runs correctly on your computer? I have run it repeatedly on my computer, but I still cannot reproduce this issue. Thanks.

This is the full code to generate mytable_partitioned:


import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.io.*;

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.PartitionSpec;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.PropertyUtil;

import java.io.IOException;
import java.time.LocalDate;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

public class IcebergEqualDeleteExample {

    public static void main(String[] args) throws IOException {
        Map<String, String> props = new HashMap<>();
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "data/persistent/equality_deletes/warehouse");
        props.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);

        Catalog catalog = CatalogUtil.buildIcebergCatalog("hadoop_catalog", props, new Configuration());
        TableIdentifier tableId = TableIdentifier.of("mydb", "mytable_partitioned");

        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "bir", Types.DateType.get()));

        if (!catalog.tableExists(tableId)) {
            PartitionSpec spec = PartitionSpec.builderFor(schema)
                    .identity("name")
                    .build();
            catalog.createTable(tableId, schema, spec, ImmutableMap.of(FORMAT_VERSION, "2"));
            System.out.println("Created table mydb.mytable_partitioned");
        }

        Table table = catalog.loadTable(tableId);
        TaskWriter<Record> taskWriter = createNewWriter(table);

        taskWriter.write(createRecord(table, 1, "a", LocalDate.parse("2025-01-01")));
        taskWriter.write(createRecord(table, 2, "b", LocalDate.parse("2025-01-02")));
        taskWriter.write(createRecord(table, 3, "c", LocalDate.parse("2025-01-03")));
        taskWriter.write(createRecord(table, 4, "d", LocalDate.parse("2025-01-04")));
        commit(table, taskWriter.complete().dataFiles());


        Map<String, Object> deleteConditions_1 = new HashMap<>();
        deleteConditions_1.put("name", "b");
        deleteRecordsByEquality(table, deleteConditions_1);

        Map<String, Object> deleteConditions_2 = new HashMap<>();
        deleteConditions_2.put("id", 3);
        deleteConditions_2.put("name", "c");
        deleteRecordsByEquality(table, deleteConditions_2);

        taskWriter = createNewWriter(table);
        taskWriter.write(createRecord(table, 5, "e", LocalDate.parse("2025-01-05")));
        taskWriter.write(createRecord(table, 6, "f", LocalDate.parse("2025-01-06")));
        commit(table, taskWriter.complete().dataFiles());

        Map<String, Object> deleteConditions_3 = new HashMap<>();
        deleteConditions_3.put("name", "f");
        deleteRecordsByEquality(table, deleteConditions_3);
    }

    private static void commit(Table table, DataFile[] dataFiles) {
        AppendFiles appendFiles = table.newAppend();
        Arrays.stream(dataFiles).forEach(appendFiles::appendFile);
        appendFiles.commit();
    }

    private static Record createRecord(Table table, int id, String name, LocalDate bir) {
        Record record = GenericRecord.create(table.schema());
        record.setField("id", id);
        record.setField("name", name);
        record.setField("bir", bir);
        return record;
    }


    private static TaskWriter<Record> createNewWriter(Table table) {
        Map<String, String> tableProps = Maps.newHashMap(table.properties());
        FileFormat format = FileFormat.fromString(TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);

        long targetFileSize =
                PropertyUtil.propertyAsLong(
                        tableProps,
                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);

        Set<Integer> identifierFieldIds = table.schema().identifierFieldIds();

        FileAppenderFactory<Record> appenderFactory;
        if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
            appenderFactory =
                    new GenericAppenderFactory(table.schema(), table.spec(), null, null, null)
                            .setAll(tableProps);
        } else {
            appenderFactory =
                    new GenericAppenderFactory(
                            table.schema(),
                            table.spec(),
                            Ints.toArray(identifierFieldIds),
                            TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)),
                            null)
                            .setAll(tableProps);
        }

        OutputFileFactory fileFactory =
                OutputFileFactory.builderFor(table, 1, System.currentTimeMillis())
                        .defaultSpec(table.spec())
                        .operationId(UUID.randomUUID().toString())
                        .format(format)
                        .build();
        TaskWriter taskWriter;

        try {
            if (table.spec().isUnpartitioned()) {
                taskWriter =
                        new UnpartitionedWriter<>(
                                table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize);
            } else {
                taskWriter =
                        new PartitionedAppendWriter(
                                table.spec(),
                                format,
                                appenderFactory,
                                fileFactory,
                                table.io(),
                                targetFileSize,
                                table.schema());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return taskWriter;
    }


    private static void deleteRecordsByEquality(Table table, Map<String, Object> fieldValues) {
        Transaction txn = table.newTransaction();
        try {
            List<String> eqFields = new ArrayList<>(fieldValues.keySet());
            Record deleteRec = GenericRecord.create(table.schema().select(eqFields));

            // Set all field values in the delete record
            fieldValues.forEach(deleteRec::setField);

            PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema().select(eqFields));
            partitionKey.partition(deleteRec);

            String partitionPath = table.spec().partitionToPath(partitionKey);
            String deletePath = String.format("%s/data/%s/delete-%s.parquet", table.location(), partitionPath, UUID.randomUUID());

            OutputFile out = table.io().newOutputFile(deletePath);

            EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
                    .forTable(table)
                    .withSpec(table.spec())
                    .rowSchema(table.schema().select(eqFields))
                    .withPartition(partitionKey)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .equalityFieldIds(getFieldIds(table, eqFields))
                    .buildEqualityWriter();

            writer.write(deleteRec);
            writer.close();
            DeleteFile df = writer.toDeleteFile();

            txn.newRowDelta().addDeletes(df).commit();
            txn.commitTransaction();
            System.out.println("Equality delete file written successfully.");
        } catch (Exception e) {
            throw new RuntimeException("Failure writing equality deletes", e);
        }
    }

    private static List<Integer> getFieldIds(Table table, List<String> names) {
        return names.stream().map(n -> table.schema().findField(n).fieldId()).collect(Collectors.toList());
    }

    private static class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {

        private final PartitionKey partitionKey;
        private final InternalRecordWrapper wrapper;

        PartitionedAppendWriter(
                PartitionSpec spec,
                FileFormat format,
                FileAppenderFactory<Record> appenderFactory,
                OutputFileFactory fileFactory,
                FileIO io,
                long targetFileSize,
                Schema schema) {
            super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
            this.partitionKey = new PartitionKey(spec, schema);
            this.wrapper = new InternalRecordWrapper(schema.asStruct());
        }

        @Override
        protected PartitionKey partition(Record row) {
            partitionKey.partition(wrapper.wrap(row));
            return partitionKey;
        }
    }
}
