Changes to AppendFilesToTables.java:
@@ -135,7 +135,8 @@ public void processElement(
     }

     // vast majority of the time, we will simply append data files.
-    // in the rare case we get a batch that contains multiple partition specs, we will group
+    // in the rare case we get a batch that contains multiple partition specs, we
+    // will group
     // data into manifest files and append.
     // note: either way, we must use a single commit operation for atomicity.
     if (containsMultiplePartitionSpecs(fileWriteResults)) {
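
The common path the first comment describes is Iceberg's `AppendFiles` pending update: stage every file, then make one atomic commit. A minimal sketch of that pattern (not the PR's exact code; building `DataFile` instances from `FileWriteResult` is elided, and the method name is illustrative):

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Sketch: append a batch of data files as one atomic snapshot.
static void appendBatch(Table table, Iterable<DataFile> dataFiles) {
  AppendFiles update = table.newAppend();
  for (DataFile file : dataFiles) {
    update.appendFile(file); // staged only; nothing is visible yet
  }
  update.commit(); // one snapshot commits the whole batch, or nothing
}
```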
@@ -163,11 +164,14 @@ private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
     update.commit();
   }

-  // When a user updates their table partition spec during runtime, we can end up with
-  // a batch of files where some are written with the old spec and some are written with the new
+  // When a user updates their table partition spec during runtime, we can end up
+  // with
+  // a batch of files where some are written with the old spec and some are
+  // written with the new
   // spec.
   // A table commit is limited to a single partition spec.
-  // To handle this, we create a manifest file for each partition spec, and group data files
+  // To handle this, we create a manifest file for each partition spec, and group
+  // data files
   // accordingly.
   // Afterward, we append all manifests using a single commit operation.
   private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
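
The rewrapped comment above describes the rare path: bucket the batch's files by partition spec, write one manifest per spec with `ManifestFiles.write`, and attach every manifest to a single `AppendFiles` commit. A hedged sketch of that grouping (manifest paths and cleanup are elided; `newManifestLocation` is a hypothetical helper, not Beam's):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;

// Sketch: one manifest per partition spec, all appended in one atomic commit.
static void appendAcrossSpecs(Table table, Iterable<DataFile> dataFiles) throws IOException {
  // Group files by the spec they were written with.
  Map<Integer, List<DataFile>> bySpec = new HashMap<>();
  for (DataFile file : dataFiles) {
    bySpec.computeIfAbsent(file.specId(), id -> new ArrayList<>()).add(file);
  }

  AppendFiles update = table.newAppend();
  for (Map.Entry<Integer, List<DataFile>> entry : bySpec.entrySet()) {
    ManifestWriter<DataFile> writer =
        ManifestFiles.write(
            table.specs().get(entry.getKey()),
            table.io().newOutputFile(newManifestLocation(table))); // hypothetical location helper
    for (DataFile file : entry.getValue()) {
      writer.add(file);
    }
    writer.close(); // must close before reading toManifestFile()
    update.appendManifest(writer.toManifestFile());
  }
  update.commit(); // still a single commit, preserving atomicity
}
```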
@@ -211,14 +215,18 @@ private ManifestWriter<DataFile> createManifestWriter(
     return ManifestFiles.write(spec, io.newOutputFile(location));
   }

-  // If the process call fails immediately after a successful commit, it gets retried with
+  // If the process call fails immediately after a successful commit, it gets
+  // retried with
   // the same data, possibly leading to data duplication.
-  // To mitigate, we skip the current batch of files if it matches the most recently committed
+  // To mitigate, we skip the current batch of files if it matches the most
+  // recently committed
   // batch.
   //
-  // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
-  // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
-  // this stateful, but that is update incompatible.
+  // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+  // where the
+  // "last successful snapshot" might reflect commits from other sources. Ideally,
+  // we would make
+  // this stateful, but that is update incompatible.
   // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
   private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
     if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
     // Check if the current batch is identical to the most recently committed batch.
     // Upstream GBK means we always get the same batch of files on retry,
     // so a single overlapping file means the whole batch is identical.
-    String sampleCommittedDataFilePath =
-        table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+    Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+    if (!addedDataFiles.iterator().hasNext()) {
+      return false;
+    }
+    String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
     for (FileWriteResult result : fileWriteResults) {
       if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
         return true;
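
The `hasNext()` guard added in this hunk is the heart of the fix: `Snapshot#addedDataFiles` is empty when the latest commit only removed files, and the old unconditional `iterator().next()` then throws `NoSuchElementException`. A minimal illustration of the failure mode, assuming a loaded `table` whose most recent snapshot was a pure delete:

```java
// Old behavior, sketched (assumes `table` is loaded and its latest
// snapshot deleted files without adding any):
Iterable<DataFile> added = table.currentSnapshot().addedDataFiles(table.io());
// added.iterator().hasNext() is false after a delete-only commit,
// so this line throws NoSuchElementException:
String sample = added.iterator().next().location().toString();
```

A delete-only snapshot can never match the incoming batch of written files, so returning `false` (do not skip) is the safe answer.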
Changes to SerializableDataFile.java:
@@ -141,7 +141,7 @@ abstract static class Builder {
   static SerializableDataFile from(DataFile f, String partitionPath) {

     return SerializableDataFile.builder()
-        .setPath(f.path().toString())
+        .setPath(f.location().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
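
For context on this one-line change: recent Iceberg releases deprecate `ContentFile#path()`, which returns a `CharSequence`, in favor of `ContentFile#location()`, which returns a `String`. The trailing `.toString()` is kept in the diff, though on an actual `String` it is a no-op:

```java
// Both yield the file's path; the first accessor is deprecated in
// recent Iceberg releases (`f` is a DataFile, as in the method above).
String viaPath = f.path().toString(); // CharSequence -> String
String viaLocation = f.location();    // already a String
```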
New file: AppendFilesToTablesTest.java
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import static org.hamcrest.MatcherAssert.assertThat;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AppendFilesToTablesTest implements Serializable {

  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

  @Rule
  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");

  @Rule public transient TestPipeline testPipeline = TestPipeline.create();

  @Test
  public void testAppendAfterDelete() throws Exception {
    TableIdentifier tableId =
        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));

    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
            .put("warehouse", warehouse.location)
            .build();

    IcebergCatalogConfig catalog =
        IcebergCatalogConfig.builder()
            .setCatalogName("name")
            .setCatalogProperties(catalogProps)
            .build();

    // 1. Create table and write some data
    testPipeline
        .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));

    testPipeline.run().waitUntilFinish();

    // 2. Delete the data
    Table table = warehouse.loadTable(tableId);
    DeleteFiles delete = table.newDelete();
    // Delete all data files in the current snapshot
    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
    delete.commit();

    // 3. Write more data
    testPipeline
        .apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
        .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));

    testPipeline.run().waitUntilFinish();

    // Verify data.
    // We mainly want to verify that no exception is thrown during the write.
    // The exact content might depend on how delete operations interact with
    // subsequent appends, which is not the focus of this test.
    table.refresh();
    List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
    assertThat(writtenRecords, Matchers.hasItem(Matchers.anything()));
  }
}

ahmedabu98 (Contributor) commented on lines +94 to +101, Jan 6, 2026:

> writtenRecords should match the contents of TestFixtures.FILE1SNAPSHOT2
>
> If we're not making that check here, then I think we should remove these lines as they don't add anything to the test
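
If the test were tightened along the lines of this comment, the final check might compare the surviving rows to the second fixture batch. A sketch only, assuming `TestFixtures.FILE1SNAPSHOT2` is a `List<Record>` whose elements compare by value:

```java
// Hypothetical stricter assertion per the review comment: after the delete
// and the second append, exactly the second batch's rows should remain.
table.refresh();
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
```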