[Managed Iceberg] Make manifest file writes and commits more efficient #32666

Open · wants to merge 2 commits into base: master
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -107,6 +107,7 @@
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* Optimized Spark Runner parDo transform evaluator (Java) ([#32537](https://github.com/apache/beam/issues/32537))
* [Managed Iceberg] More efficient manifest file writes/commits ([#32666](https://github.com/apache/beam/issues/32666))

## Breaking Changes

AppendFilesToTables.java
@@ -20,6 +20,7 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -31,6 +32,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
@@ -73,6 +75,12 @@ private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
private final Counter dataFilesCommitted =
Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted");
private final Distribution committedDataFileByteSize =
Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
private final Distribution committedDataFileRecordCount =
Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");

private final IcebergCatalogConfig catalogConfig;

@@ -94,18 +102,28 @@ public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
if (!element.getValue().iterator().hasNext()) {
String tableStringIdentifier = element.getKey();
Iterable<FileWriteResult> fileWriteResults = element.getValue();
if (!fileWriteResults.iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendManifest(writtenFile.getManifestFile());
long numFiles = 0;
for (FileWriteResult result : fileWriteResults) {
DataFile dataFile = result.getDataFile(table.spec());
update.appendFile(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
numFiles++;
}
// This commit will create a ManifestFile; we don't need to create one manually.
update.commit();
dataFilesCommitted.inc(numFiles);

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
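The pattern above is the heart of the change: each FileWriteResult now carries a serialized DataFile rather than an encoded ManifestFile, and the committer appends those files directly, letting Iceberg write the manifest as part of the commit. A minimal sketch of that commit pattern against the Iceberg API (the catalog, table name, and collected files below are hypothetical placeholders, not part of this PR):

```java
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class AppendCommitSketch {
  /** Appends already-written data files to a table in a single commit. */
  static Snapshot appendAndCommit(Catalog catalog, String tableName, List<DataFile> dataFiles) {
    Table table = catalog.loadTable(TableIdentifier.parse(tableName));
    AppendFiles update = table.newAppend();
    for (DataFile dataFile : dataFiles) {
      update.appendFile(dataFile); // no manual ManifestFile creation needed
    }
    update.commit(); // Iceberg writes the manifest and produces one new snapshot here
    return table.currentSnapshot();
  }
}
```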
FileWriteResult.java
@@ -18,12 +18,11 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@@ -32,12 +31,11 @@
abstract class FileWriteResult {

private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
private transient @MonotonicNonNull ManifestFile cachedManifestFile;
private transient @MonotonicNonNull DataFile cachedDataFile;

abstract String getTableIdentifierString();

@SuppressWarnings("mutable")
abstract byte[] getManifestFileBytes();
abstract SerializableDataFile getSerializableDataFile();

@SchemaIgnore
public TableIdentifier getTableIdentifier() {
@@ -48,15 +46,11 @@ public TableIdentifier getTableIdentifier() {
}

@SchemaIgnore
public ManifestFile getManifestFile() {
if (cachedManifestFile == null) {
try {
cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
} catch (IOException exc) {
throw new RuntimeException("Error decoding manifest file bytes");
}
public DataFile getDataFile(PartitionSpec spec) {
if (cachedDataFile == null) {
cachedDataFile = getSerializableDataFile().createDataFile(spec);
}
return cachedManifestFile;
return cachedDataFile;
}

public static Builder builder() {
@@ -68,18 +62,13 @@ abstract static class Builder {

abstract Builder setTableIdentifierString(String tableIdString);

abstract Builder setManifestFileBytes(byte[] manifestFileBytes);
abstract Builder setSerializableDataFile(SerializableDataFile dataFile);

@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
return setTableIdentifierString(tableId.toString());
}

@SchemaIgnore
public Builder setManifestFile(ManifestFile manifestFile) throws IOException {
return setManifestFileBytes(ManifestFiles.encode(manifestFile));
}

public abstract FileWriteResult build();
}
}
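SerializableDataFile itself is defined outside this diff; conceptually it is a Beam-schema-friendly snapshot of a DataFile's metadata that can be shuffled between workers and rebuilt into a DataFile once the table's PartitionSpec is known at commit time. A rough sketch of that round trip, using a simplified, hypothetical field set (the real class carries more metadata; this only illustrates the idea) and Iceberg's DataFiles.Builder:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;

/** Hypothetical, simplified stand-in for SerializableDataFile: plain fields only. */
class SimpleSerializableDataFile {
  String path;
  String fileFormat; // e.g. "PARQUET"
  long recordCount;
  long fileSizeInBytes;
  String partitionPath; // e.g. "event_date=2024-10-04"

  /** Captures the metadata of a freshly written DataFile. */
  static SimpleSerializableDataFile from(DataFile dataFile, String partitionPath) {
    SimpleSerializableDataFile s = new SimpleSerializableDataFile();
    s.path = dataFile.path().toString();
    s.fileFormat = dataFile.format().toString();
    s.recordCount = dataFile.recordCount();
    s.fileSizeInBytes = dataFile.fileSizeInBytes();
    s.partitionPath = partitionPath;
    return s;
  }

  /** Rebuilds an Iceberg DataFile; the spec comes from the table loaded at commit time. */
  DataFile createDataFile(PartitionSpec spec) {
    return DataFiles.builder(spec)
        .withPath(path)
        .withFormat(fileFormat)
        .withRecordCount(recordCount)
        .withFileSizeInBytes(fileSizeInBytes)
        .withPartitionPath(partitionPath)
        .build();
  }
}
```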
IcebergIO.java
@@ -318,7 +318,7 @@ public WriteRows to(DynamicDestinations destinations) {
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
* to the respective table. Each append operation created a new table snapshot.
* to the respective table. Each append operation creates a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
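For readers unfamiliar with the write path this Javadoc documents, a hedged usage sketch of a streaming write with a triggering frequency follows. The table name and the surrounding pipeline are hypothetical, and the sketch assumes the public IcebergIO.writeRows and withTriggeringFrequency entry points from this module:

```java
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class TriggeringFrequencySketch {
  /**
   * Appends rows to a (hypothetical) table roughly every five minutes. Each append commit
   * creates one snapshot, so a larger frequency means fewer, larger files and fewer snapshots.
   */
  static void writeEveryFiveMinutes(PCollection<Row> rows, IcebergCatalogConfig catalogConfig) {
    rows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.parse("db.events"))
            .withTriggeringFrequency(Duration.standardMinutes(5)));
  }
}
```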
RecordWriter.java
@@ -19,7 +19,6 @@

import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -38,9 +37,8 @@
class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
private final Distribution dataFileByteSize =
Metrics.distribution(RecordWriter.class, "dataFileByteSize");
Metrics.counter(RecordWriter.class, "activeIcebergWriters");
private final Counter dataFilesWritten = Metrics.counter(RecordWriter.class, "dataFilesWritten");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -128,7 +126,7 @@ public void close() throws IOException {
dataFile.recordCount(),
dataFile.fileSizeInBytes(),
absoluteFilename);
dataFileByteSize.update(dataFile.fileSizeInBytes());
dataFilesWritten.inc();
}

public long bytesWritten() {
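The metric rework above replaces the write-time byte-size distribution with a plain files-written counter, deferring the size and record-count distributions to commit time in AppendFilesToTables. As a reference point, a minimal sketch of the two Beam metric types involved (the class and metric names here are illustrative only):

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;

/** Illustrative only: counters count events, distributions track per-event statistics. */
class MetricsSketch {
  private static final Counter DATA_FILES_WRITTEN =
      Metrics.counter(MetricsSketch.class, "dataFilesWritten");
  private static final Distribution COMMITTED_FILE_BYTES =
      Metrics.distribution(MetricsSketch.class, "committedDataFileByteSize");

  static void recordFile(long fileSizeInBytes) {
    DATA_FILES_WRITTEN.inc(); // one more data file written
    COMMITTED_FILE_BYTES.update(fileSizeInBytes); // feeds min/max/mean/count of file sizes
  }
}
```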
RecordWriterManager.java
@@ -21,13 +21,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
@@ -38,17 +36,12 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;

/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
@@ -66,19 +59,13 @@
*
* <ol>
* <li>Close all underlying {@link RecordWriter}s
* <li>Collect all {@link DataFile}s
* <li>Create a new {@link ManifestFile} referencing these {@link DataFile}s
* <li>Collect all {@link DataFile}s as {@link SerializableDataFile}s (a more Beam-friendly type)
* </ol>
*
* <p>After closing, the resulting {@link ManifestFile}s can be retrieved using {@link
* #getManifestFiles()}.
* <p>After closing, the resulting {@link SerializableDataFile}s can be retrieved using {@link
* #getSerializableDataFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
private final Counter dataFilesWritten =
Metrics.counter(RecordWriterManager.class, "dataFilesWritten");
private final Counter manifestFilesWritten =
Metrics.counter(RecordWriterManager.class, "manifestFilesWritten");

/**
* Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per
* partition and manages them in a {@link Cache}.
@@ -90,11 +77,9 @@ class DestinationState {
private final PartitionSpec spec;
private final org.apache.iceberg.Schema schema;
private final PartitionKey partitionKey;
private final String tableLocation;
private final FileIO fileIO;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
private final List<DataFile> dataFiles = Lists.newArrayList();
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();

@@ -103,8 +88,6 @@ class DestinationState {
this.schema = table.schema();
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.tableLocation = table.location();
this.fileIO = table.io();
this.table = table;

// build a cache of RecordWriters.
@@ -128,8 +111,7 @@ class DestinationState {
e);
}
openWriters--;
dataFiles.add(recordWriter.getDataFile());
dataFilesWritten.inc();
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
})
.build();
}
@@ -191,13 +173,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
e);
}
}

private String getManifestFileLocation(PaneInfo paneInfo) {
return FileFormat.AVRO.addExtension(
String.format(
"%s/metadata/%s-%s-%s.manifest",
tableLocation, filePrefix, stateToken, paneInfo.getIndex()));
}
}

private final Catalog catalog;
@@ -209,8 +184,8 @@ private String getManifestFileLocation(PaneInfo paneInfo) {
@VisibleForTesting
final Map<WindowedValue<IcebergDestination>, DestinationState> destinations = Maps.newHashMap();

private final Map<WindowedValue<IcebergDestination>, List<ManifestFile>> totalManifestFiles =
Maps.newHashMap();
private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();

private boolean isClosed = false;

@@ -249,7 +224,6 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
public void close() throws IOException {
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
windowedDestinationAndState : destinations.entrySet()) {
WindowedValue<IcebergDestination> windowedDestination = windowedDestinationAndState.getKey();
DestinationState state = windowedDestinationAndState.getValue();

// removing writers from the state's cache will trigger the logic to collect each writer's
@@ -259,21 +233,8 @@ public void close() throws IOException {
continue;
}

OutputFile outputFile =
state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane()));

ManifestWriter<DataFile> manifestWriter;
try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(state.spec, outputFile)) {
openWriter.addAll(state.dataFiles);
manifestWriter = openWriter;
}
ManifestFile manifestFile = manifestWriter.toManifestFile();
manifestFilesWritten.inc();

totalManifestFiles
.computeIfAbsent(windowedDestination, dest -> Lists.newArrayList())
.add(manifestFile);

totalSerializableDataFiles.put(
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
state.dataFiles.clear();
}
destinations.clear();
Expand All @@ -285,15 +246,16 @@ public void close() throws IOException {
}

/**
* Returns a list of accumulated windowed {@link ManifestFile}s for each windowed {@link
* Returns a list of accumulated serializable {@link DataFile}s for each windowed {@link
* IcebergDestination}. The {@link RecordWriterManager} must first be closed before this is
* called.
*/
public Map<WindowedValue<IcebergDestination>, List<ManifestFile>> getManifestFiles() {
public Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
getSerializableDataFiles() {
checkState(
isClosed,
"Please close this %s before retrieving its manifest files.",
"Please close this %s before retrieving its data files.",
getClass().getSimpleName());
return totalManifestFiles;
return totalSerializableDataFiles;
}
}
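Putting the reworked lifecycle together: a RecordWriterManager is written to, closed, and only then asked for its SerializableDataFiles, which are what the write step hands to AppendFilesToTables for the actual Iceberg commit. A rough sketch of that flow (hypothetical wiring, assumed to live in the same org.apache.beam.sdk.io.iceberg package since these classes are package-private; constructor arguments and the real DoFn are omitted):

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;

class RecordWriterManagerLifecycleSketch {
  static Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> writeAndCollect(
      RecordWriterManager manager,
      WindowedValue<IcebergDestination> destination,
      Iterable<Row> rows)
      throws IOException {
    for (Row row : rows) {
      manager.write(destination, row); // routed to a per-table, per-partition RecordWriter
    }
    manager.close(); // closes all writers and collects their DataFiles as SerializableDataFiles
    // Retrieval is only legal after close(); the checkState above enforces this.
    return manager.getSerializableDataFiles();
  }
}
```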