diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index eb73c1d86f..fdbf6fd51c 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -84,6 +84,9 @@ dependencies { api(project(":polaris-core")) + api(project(":polaris-storage-files-api")) + api(project(":polaris-storage-files-impl")) + api(project(":polaris-relational-jdbc")) api(project(":polaris-extensions-auth-opa")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index eb0a3477e6..fc4d861368 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -91,6 +91,7 @@ mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5 opentelemetry-bom = { module = "io.opentelemetry:opentelemetry-bom", version = "1.57.0" } opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version= "2.20.1-alpha" } opentelemetry-semconv = { module = "io.opentelemetry.semconv:opentelemetry-semconv", version = "1.37.0" } +nessie-bom = { module = "org.projectnessie.nessie:nessie-bom", version = "0.106.0" } picocli = { module = "info.picocli:picocli-codegen", version.ref = "picocli" } picocli-codegen = { module = "info.picocli:picocli-codegen", version.ref = "picocli" } postgresql = { module = "org.postgresql:postgresql", version = "42.7.8" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 3706c6f132..9e3b375430 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -45,6 +45,9 @@ polaris-extensions-federation-hive=extensions/federation/hive polaris-extensions-auth-opa=extensions/auth/opa/impl polaris-extensions-auth-opa-tests=extensions/auth/opa/tests +polaris-storage-files-api=storage/files/api +polaris-storage-files-impl=storage/files/impl + polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator polaris-config-docs-site=tools/config-docs/site diff --git 
a/storage/files/README.md b/storage/files/README.md new file mode 100644 index 0000000000..fe67fedc16 --- /dev/null +++ b/storage/files/README.md @@ -0,0 +1,70 @@ + + +# Polaris object store operations + +API and implementations to perform long-running operations against object stores, mostly to purge files. + +Functionalities to scan an object store and to purge files are separated. Filter mechanisms are used to +select the files to be deleted (purged). + +There are implementations to identify the files referenced by a particular Iceberg table or view metadata, including +statistics files, manifest lists of all snapshots, the manifest files and the data/delete files. + +The file operations perform no effort to identify duplicates during the identification of files referenced by +a table or view metadata. +This means that, for example, a data file referenced in multiple manifest files will be returned twice. + +Purge operations are performed in one or multiple bulk delete operations. +The implementation takes care of not including the same file more than once within a single bulk delete operation. + +One alternative implementation purges all files within the base location of a table or view metadata. + +All implemented operations are designed to be resilient against failures as those are expected to be run as +maintenance operations or as part of such. +The operations are implemented to continue in case of errors and eventually succeed instead of failing eagerly. +Maintenance operations are usually not actively observed, and manually fixing consistency issues in object +stores is not a straightforward task for users. + +# Potential future enhancements + +The operations provided by `FileOperations` are meant for maintenance operations, which are not +time- or performance-critical. +It is more important that the operations are resilient against failures, do not add unnecessary CPU or heap pressure +and eventually succeed. 
+Further, maintenance operations should not eat up too much I/O bandwidth to not interfere with other user-facing +operations. + +Depending on the overall load of the system, it might be worth running some operations in parallel. + +# Code architecture + +The code is split in two modules. One for the (Polaris internal) API interfaces and one for the implementations. + +Tests against various object store implementations are included as unit tests using an on-heap object-store-mock +and as integration tests against test containers for S3, GCS and ADLS. +The object-store-mock used in unit tests is also used to validate the low heap-pressure required by the +implementations. + +The actual object store interaction of the current implementation is delegated to Iceberg `FileIO` implementations. +Only `FileIO` implementations that support prefix-operations (`SupportsPrefixOperations` interface) and +bulk-operations (`SupportsBulkOperations` interface) are currently supported. +The `FileIO` implementations `S3FileIO`, `GCSFileIO` and `ADLSFileIO` support both. +Beside the necessary `FileIO` usage in `FileOperationsFactory`, none of the API functions refer to `FileIO` +to allow the API to be implemented against other, more tailored object store backend implementations. diff --git a/storage/files/api/build.gradle.kts b/storage/files/api/build.gradle.kts new file mode 100644 index 0000000000..0124f88cf2 --- /dev/null +++ b/storage/files/api/build.gradle.kts @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("polaris-server") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + + implementation(libs.guava) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + compileOnly(libs.jakarta.annotation.api) +} + +tasks.named("javadoc") { dependsOn("jandex") } diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileFilter.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileFilter.java new file mode 100644 index 0000000000..2f8c954a71 --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileFilter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import java.util.function.Predicate; + +@FunctionalInterface +public interface FileFilter extends Predicate { + + static FileFilter alwaysTrue() { + return fileSpec -> true; + } + + static FileFilter alwaysFalse() { + return fileSpec -> false; + } +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java new file mode 100644 index 0000000000..f500b1db9e --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.storage.files.api; + +import jakarta.annotation.Nonnull; +import java.util.stream.Stream; + +/** + * Object storage file operations, used to find files below a given prefix, to purge files, to + * identify referenced files, etc. + * + *

All functions of this interface prefer to yield incomplete results and continue rather than + * throwing exceptions. + */ +public interface FileOperations { + /** + * Find files that match the given prefix and filter. + * + *

Whether existing but inaccessible files are included in the result depends on the object + * store. + * + *

Call sites should consider rate-limiting the scan operations, for example, by using Guava's + * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; })} step on + * the returned stream. + * + * @param prefix full object storage URI prefix, including scheme and bucket. + * @param filter file filter + * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link + * FileSpec#size()} attributes populated with the information provided by the object store. + * The {@link FileSpec#fileType() file type} attribute is not populated; it may be {@link + * FileSpec#guessTypeFromName() guessed}. + */ + Stream findFiles(@Nonnull String prefix, @Nonnull FileFilter filter); + + /** + * Identifies all files referenced by the given table-metadata. + * + *

In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, the returned stream will just not include those. + * + *

Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param tableMetadataLocation Iceberg table-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate); + + /** + * Identifies all files referenced by the given view-metadata. + * + *

In case "container" files like the metadata are not readable, the returned stream will just + * not include those. + * + *

Rate-limiting the returned stream is recommended when identifying multiple tables and/or + * views. Rate-limiting on a single invocation may not be effective as expected. + * + * @param viewMetadataLocation Iceberg view-metadata location + * @param deduplicate if true, attempt to deduplicate files by their location, adding additional + * heap pressure to the operation. Implementations may ignore this parameter or may not + * deduplicate all identified files. + * @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()} + * attribute is usually not populated, as it would have to be derived from user-provided + * information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated + * based on where a file appears during identification. + */ + Stream identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate); + + /** + * Purges all files that are referenced by the given table-metadata, respecting the given filter. + * + *

In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + *

This is effectively a convenience for {@code + * purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergTableFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given table-metadata, purging only + * files that match the given filter. + * + *

In case "container" files, like the metadata, manifest-list or manifest files, are not + * readable, those files are just ignored. + * + *

This is effectively a convenience for {@code + * purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are referenced by the given view-metadata, respecting the given filter. + + *

In case "container" files like the metadata are not readable, those files are just ignored. + * + *

This is effectively a convenience for {@code + * purge(identifyIcebergViewFiles(viewMetadataLocation).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #identifyIcebergViewFiles(String, boolean) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files that are within the base location of the given view-metadata, purging only files + * that match the given filter. + + *

In case "container" files like the metadata are not readable, those files are just ignored. + * + *

This is effectively a convenience for {@code + * purge(findFiles(viewMetadata.baseLocation()).filter(purgeSpec.fileFilter()))} + * + * @see #purge(Stream, PurgeSpec) + * @see #findFiles(String, FileFilter) + */ + PurgeStats purgeIcebergViewBaseLocation( + @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec); + + /** + * Purges all files in the given stream of file specs. The {@link Stream} will be fully + * consumed. + * + *

This is a convenience for {@link #purgeFiles(Stream, PurgeSpec) + * purgeFiles(locationStream.map(FileSpec::location))} + */ + PurgeStats purge(@Nonnull Stream locationStream, PurgeSpec purgeSpec); + + /** + * Purges all files from the given stream of locations. The {@link Stream} will be fully consumed. + * + *

Non-existing files and other deletion errors will not let the call fail, which makes it + * resilient against transient or irrelevant errors. + */ + PurgeStats purgeFiles(@Nonnull Stream locationStream, PurgeSpec purgeSpec); +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperationsFactory.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperationsFactory.java new file mode 100644 index 0000000000..168459ba1e --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperationsFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import org.apache.iceberg.io.FileIO; + +/** + * Factory to create {@link FileOperations} instances to perform object storage related maintenance + * operations. + */ +public interface FileOperationsFactory { + /** + * Create a {@link FileOperations} instance for the given {@link FileIO} instance. + * + * @param fileIO the {@link FileIO} instance to use. 
The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ + FileOperations createFileOperations(FileIO fileIO); +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java new file mode 100644 index 0000000000..8a71d42c58 --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.polaris.immutables.PolarisImmutable; + +/** + * Describes a single file/object in an object storage. + * + *

Not all attributes are populated by every {@link FileOperations} function. + */ +@PolarisImmutable +public interface FileSpec { + + /** The full object storage URI. */ + String location(); + + /** + * The type of the file, if known. + * + * @see #guessTypeFromName() + */ + Optional fileType(); + + /** The size of the file in bytes, if available. */ + OptionalLong size(); + + /** The creation timestamp in milliseconds since the epoch, if available. */ + OptionalLong createdAtMillis(); + + static Builder builder() { + return ImmutableFileSpec.builder(); + } + + static Builder fromLocation(String location) { + return builder().location(location); + } + + static Builder fromLocationAndSize(String location, long size) { + var b = fromLocation(location); + if (size > 0L) { + b.size(size); + } + return b; + } + + default FileType guessTypeFromName() { + var location = location(); + var lastSlash = location.lastIndexOf('/'); + var fileName = lastSlash > 0 ? location.substring(lastSlash + 1) : location; + + if (fileName.contains(".metadata.json")) { + return FileType.ICEBERG_METADATA; + } else if (fileName.startsWith("snap-")) { + return FileType.ICEBERG_MANIFEST_LIST; + } else if (fileName.contains("-m")) { + return FileType.ICEBERG_MANIFEST_FILE; + } + return FileType.UNKNOWN; + } + + interface Builder { + @CanIgnoreReturnValue + Builder fileType(FileType fileType); + + @CanIgnoreReturnValue + Builder location(String location); + + @CanIgnoreReturnValue + Builder size(long size); + + @CanIgnoreReturnValue + Builder createdAtMillis(long createdAtMillis); + + FileSpec build(); + } +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java new file mode 100644 index 0000000000..0df61534be --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import org.apache.iceberg.ContentFile; + +public enum FileType { + UNKNOWN(false, false), + ICEBERG_METADATA(true, false), + ICEBERG_STATISTICS(false, false), + ICEBERG_MANIFEST_LIST(false, false), + ICEBERG_MANIFEST_FILE(false, false), + ICEBERG_DATA_FILE(false, true), + ICEBERG_DELETE_FILE(false, true), + ; + + private final boolean metadata; + private final boolean dataOrDelete; + + FileType(boolean metadata, boolean dataOrDelete) { + this.metadata = metadata; + this.dataOrDelete = dataOrDelete; + } + + @SuppressWarnings("UnnecessaryDefault") + public static FileType fromContentFile(ContentFile> contentFile) { + return switch (contentFile.content()) { + case DATA -> ICEBERG_DATA_FILE; + case EQUALITY_DELETES, POSITION_DELETES -> ICEBERG_DELETE_FILE; + default -> UNKNOWN; + }; + } + + public boolean metadata() { + return metadata; + } + + public boolean dataOrDelete() { + return dataOrDelete; + } +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java new file mode 100644 index 0000000000..d2f8d86a34 --- /dev/null +++ 
b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.OptionalDouble; +import java.util.function.Consumer; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@SuppressWarnings("unused") +@PolarisImmutable +public interface PurgeSpec { + PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build(); + + @Value.Default + default FileFilter fileFilter() { + return FileFilter.alwaysTrue(); + } + + PurgeSpec withFileFilter(FileFilter fileFilter); + + /** + * Delete batch size for purge/batch-deletion operations. Implementations may opt to ignore this + * parameter and enforce a reasonable or required different limit. + */ + @Value.Default + default int deleteBatchSize() { + return 250; + } + + PurgeSpec withDeleteBatchSize(int deleteBatchSize); + + /** + * Callback being invoked right before a file location is being submitted to be purged. + * + *

Due to API constraints of {@link + * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable)} it's barely possible to + * identify files that failed a deletion. + */ + @Value.Default + default Consumer purgeIssuedCallback() { + return location -> {}; + } + + PurgeSpec withPurgeIssuedCallback(Consumer purgeIssuedCallback); + + /** + * Optional rate-limit on the number of individual file-deletions per second. + * + *

This setting is usually similar to using {@link #batchDeletesPerSecond()} times {@link + * #deleteBatchSize()}, unless the implementation opted to choose a different batch size. + * + * @see #batchDeletesPerSecond() + */ + OptionalDouble fileDeletesPerSecond(); + + PurgeSpec withFileDeletesPerSecond(OptionalDouble fileDeletesPerSecond); + + PurgeSpec withFileDeletesPerSecond(double fileDeletesPerSecond); + + /** + * Optional rate-limit on batch-delete operations per second. + * + * @see #fileDeletesPerSecond() + */ + OptionalDouble batchDeletesPerSecond(); + + PurgeSpec withBatchDeletesPerSecond(OptionalDouble batchDeletesPerSecond); + + PurgeSpec withBatchDeletesPerSecond(double batchDeletesPerSecond); + + static ImmutablePurgeSpec.Builder builder() { + return ImmutablePurgeSpec.builder(); + } + + @Value.Check + default void check() { + checkState(deleteBatchSize() > 0, "deleteBatchSize must be positive"); + } +} diff --git a/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java new file mode 100644 index 0000000000..d8af4d9f88 --- /dev/null +++ b/storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.api; + +import java.time.Duration; +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +public interface PurgeStats { + Duration duration(); + + /** + * The number of purged files. + * + *

The returned value may be wrong and include non-existing files. + */ + long purgedFiles(); + + /** + * Number of files that were not purged. + * + *

The returned value may be wrong and not include non-existing files. + */ + long failedPurges(); +} diff --git a/storage/files/impl/build.gradle.kts b/storage/files/impl/build.gradle.kts new file mode 100644 index 0000000000..c7082e633c --- /dev/null +++ b/storage/files/impl/build.gradle.kts @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +plugins { + id("polaris-server") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(project(":polaris-storage-files-api")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") + + implementation(libs.guava) + implementation(libs.slf4j.api) + + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.validation.api) + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.enterprise.cdi.api) + + implementation(platform(libs.nessie.bom)) + implementation("org.projectnessie.nessie:nessie-storage-uri") + + runtimeOnly("org.apache.iceberg:iceberg-aws") + runtimeOnly(platform(libs.awssdk.bom)) + runtimeOnly("software.amazon.awssdk:sts") + runtimeOnly("software.amazon.awssdk:iam-policy-builder") + runtimeOnly("software.amazon.awssdk:s3") + runtimeOnly("software.amazon.awssdk:kms") + + runtimeOnly("org.apache.iceberg:iceberg-azure") + runtimeOnly(platform(libs.azuresdk.bom)) + runtimeOnly("com.azure:azure-storage-blob") + runtimeOnly("com.azure:azure-storage-common") + runtimeOnly("com.azure:azure-identity") + runtimeOnly("com.azure:azure-storage-file-datalake") + + runtimeOnly("org.apache.iceberg:iceberg-gcp") + runtimeOnly(platform(libs.google.cloud.storage.bom)) + runtimeOnly("com.google.cloud:google-cloud-storage") + + testFixturesApi(project(":polaris-storage-files-api")) + + 
testFixturesApi(platform(libs.nessie.bom)) + testImplementation("org.projectnessie.nessie:nessie-object-storage-mock") + testFixturesApi("org.projectnessie.nessie:nessie-catalog-format-iceberg") + + testFixturesApi("com.fasterxml.jackson.core:jackson-core") + testFixturesApi("com.fasterxml.jackson.core:jackson-databind") + testFixturesApi(platform(libs.jackson.bom)) + testRuntimeOnly("org.junit.platform:junit-platform-launcher") + + testFixturesApi(platform(libs.iceberg.bom)) + testFixturesApi("org.apache.iceberg:iceberg-api") + testFixturesApi("org.apache.iceberg:iceberg-core") + testFixturesApi("org.apache.iceberg:iceberg-aws") + testFixturesApi("org.apache.iceberg:iceberg-azure") + testFixturesApi("org.apache.iceberg:iceberg-gcp") + + testFixturesRuntimeOnly("software.amazon.awssdk:url-connection-client") + + compileOnly(libs.jakarta.annotation.api) +} + +tasks.named("javadoc") { dependsOn("jandex") } + +tasks.withType { + isFailOnError = false + options.memberLevel = JavadocMemberLevel.PACKAGE +} + +testing { + suites { + val intTest by + registering(JvmTestSuite::class) { + dependencies { + implementation(project(":polaris-storage-files-api")) + + implementation("org.projectnessie.nessie:nessie-azurite-testcontainer") + implementation("org.projectnessie.nessie:nessie-gcs-testcontainer") + implementation(project(":polaris-minio-testcontainer")) + + implementation("org.apache.iceberg:iceberg-aws") + implementation("org.apache.iceberg:iceberg-gcp") + implementation("org.apache.iceberg:iceberg-azure") + } + } + } +} diff --git a/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/BaseITFileOperationsImpl.java b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/BaseITFileOperationsImpl.java new file mode 100644 index 0000000000..0ae332b935 --- /dev/null +++ b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/BaseITFileOperationsImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import java.util.stream.IntStream; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.junit.jupiter.api.Test; + +public abstract class BaseITFileOperationsImpl extends BaseFileOperationsImpl { + + /** Verify that batch-deletions do not fail in case some files do not exist. 
*/ + @Test + public void batchDeleteNonExistentFiles() throws Exception { + try (var fileIO = initializedFileIO()) { + var prefix = prefix() + "batchDeleteNonExistentFiles/"; + + write(fileIO, prefix + "1", new byte[1]); + write(fileIO, prefix + "3", new byte[1]); + write(fileIO, prefix + "5", new byte[1]); + + var fileOps = new FileOperationsImpl(fileIO); + var result = + fileOps.purgeFiles( + IntStream.range(0, 10).mapToObj(i -> prefix + i), PurgeSpec.DEFAULT_INSTANCE); + soft.assertThat(result) + .extracting(PurgeStats::purgedFiles, PurgeStats::failedPurges) + // Iceberg does not yield the correct number of purged files, 3/7 in this test (via + // `BulkDeletionFailureException`) in case those do not exist. + .containsExactly(10L, 0L); + + result = + fileOps.purgeFiles( + IntStream.range(20, 40).mapToObj(i -> prefix + i), PurgeSpec.DEFAULT_INSTANCE); + soft.assertThat(result) + .extracting(PurgeStats::purgedFiles, PurgeStats::failedPurges) + // Iceberg does not yield the correct number of purged files, 0/20 in this test (via + // `BulkDeletionFailureException`) in case those do not exist. + .containsExactly(20L, 0L); + + result = + fileOps.purgeFiles( + IntStream.range(40, 60).mapToObj(i -> prefix + "40"), PurgeSpec.DEFAULT_INSTANCE); + soft.assertThat(result) + .extracting(PurgeStats::purgedFiles, PurgeStats::failedPurges) + // Iceberg does not yield the correct number of purged files, 0/1 in this test (via + // `BulkDeletionFailureException`) in case those do not exist. 
+ .containsExactly(1L, 0L); + } + } + + @Test + public void purgeIcebergTable() throws Exception { + try (var fileIO = initializedFileIO()) { + var prefix = prefix() + "purgeIcebergTable/"; + var fixtures = new IcebergFixtures(prefix, 3, 3, 3); + var tableMetadataPath = fixtures.prefix + "foo.metadata.json"; + + write(fileIO, tableMetadataPath, fixtures.tableMetadataBytes); + for (var snapshotId = 1; snapshotId <= fixtures.numSnapshots; snapshotId++) { + write( + fileIO, + fixtures.manifestListPath(snapshotId), + fixtures.serializedManifestList(snapshotId)); + + for (int mf = 0; mf < fixtures.numManifestFiles; mf++) { + var manifestFilePath = fixtures.manifestFilePath(snapshotId, mf); + write( + fileIO, + manifestFilePath, + fixtures.serializedManifestFile(snapshotId, mf, manifestFilePath)); + } + } + + var fileOps = new FileOperationsImpl(fileIO); + + var purgeStats = fileOps.purgeIcebergTable(tableMetadataPath, PurgeSpec.DEFAULT_INSTANCE); + + assertThat(purgeStats.purgedFiles()) + // 1st "1" --> metadata-json + // 2nd "1" --> manifest-list + // 3rd "1" --> manifest-file + .isEqualTo( + 1 + + fixtures.numSnapshots + * (1 + (long) fixtures.numManifestFiles * (1 + fixtures.numDataFiles))); + } + } + + @Test + public void icebergIntegration() throws Exception { + try (var fileIO = initializedFileIO()) { + icebergIntegration(fileIO, icebergProperties()); + } + } + + public FileIO initializedFileIO() { + var fileIO = createFileIO(); + fileIO.initialize(icebergProperties()); + return fileIO; + } + + protected abstract FileIO createFileIO(); + + protected abstract Map icebergProperties(); +} diff --git a/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithADLS.java b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithADLS.java new file mode 100644 index 0000000000..1da107d058 --- /dev/null +++ 
b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithADLS.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import java.util.Map; +import org.apache.iceberg.azure.adlsv2.ADLSFileIO; +import org.apache.iceberg.io.FileIO; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.testing.azurite.Azurite; +import org.projectnessie.testing.azurite.AzuriteAccess; +import org.projectnessie.testing.azurite.AzuriteExtension; + +@ExtendWith(AzuriteExtension.class) +public class ITFileOperationsImplWithADLS extends BaseITFileOperationsImpl { + + @Azurite static AzuriteAccess azuriteAccess; + + @Override + protected Map icebergProperties() { + return azuriteAccess.icebergProperties(); + } + + @Override + protected String prefix() { + return azuriteAccess.location(""); + } + + @Override + protected FileIO createFileIO() { + return new ADLSFileIO(); + } + + @Override + @Disabled("Azurite is incompatible with ADLS v2 list-prefix REST endpoint") + public void icebergIntegration() throws Exception { + super.icebergIntegration(); + } +} diff --git 
a/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithGCS.java b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithGCS.java new file mode 100644 index 0000000000..90807784e8 --- /dev/null +++ b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithGCS.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.storage.files.impl; + +import java.util.Map; +import org.apache.iceberg.gcp.gcs.GCSFileIO; +import org.apache.iceberg.io.FileIO; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.testing.gcs.Gcs; +import org.projectnessie.testing.gcs.GcsAccess; +import org.projectnessie.testing.gcs.GcsExtension; + +@ExtendWith(GcsExtension.class) +public class ITFileOperationsImplWithGCS extends BaseITFileOperationsImpl { + + @Gcs static GcsAccess gcsAccess; + + @Override + protected Map icebergProperties() { + return gcsAccess.icebergProperties(); + } + + @Override + protected String prefix() { + return gcsAccess.bucketUri().toString(); + } + + @Override + protected FileIO createFileIO() { + return new GCSFileIO(); + } +} diff --git a/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithS3.java b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithS3.java new file mode 100644 index 0000000000..008580d289 --- /dev/null +++ b/storage/files/impl/src/intTest/java/org/apache/polaris/storage/files/impl/ITFileOperationsImplWithS3.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.test.minio.Minio; +import org.apache.polaris.test.minio.MinioAccess; +import org.apache.polaris.test.minio.MinioExtension; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(MinioExtension.class) +public class ITFileOperationsImplWithS3 extends BaseITFileOperationsImpl { + + @Minio(region = "eu-central-1") + private static MinioAccess minioAccess; + + @Override + protected Map icebergProperties() { + var properties = new HashMap<>(minioAccess.icebergProperties()); + properties.put("client.region", "eu-central-1"); + return properties; + } + + @Override + protected FileIO createFileIO() { + return new S3FileIO(); + } + + @Override + protected String prefix() { + return format("s3://%s/", minioAccess.bucket()); + } +} diff --git a/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsFactoryImpl.java b/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsFactoryImpl.java new file mode 100644 index 0000000000..f509d7d7f4 --- /dev/null +++ b/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsFactoryImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileOperationsFactory; + +/** CDI application-scoped implementation of {@link FileOperationsFactory}. */ +@ApplicationScoped +class FileOperationsFactoryImpl implements FileOperationsFactory { + + private final Clock clock; + + @Inject + @SuppressWarnings("CdiInjectionPointsInspection") + FileOperationsFactoryImpl(Clock clock) { + this.clock = clock; + } + + @Override + public FileOperations createFileOperations(FileIO fileIO) { + return new FileOperationsImpl(fileIO); + } +} diff --git a/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java b/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java new file mode 100644 index 0000000000..eed1697330 --- /dev/null +++ b/storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import com.google.common.collect.Streams; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.polaris.storage.files.api.FileFilter; +import org.apache.polaris.storage.files.api.FileOperations; +import org.apache.polaris.storage.files.api.FileSpec; +import org.apache.polaris.storage.files.api.FileType; +import org.apache.polaris.storage.files.api.ImmutablePurgeStats; +import 
org.apache.polaris.storage.files.api.PurgeSpec; +import org.apache.polaris.storage.files.api.PurgeStats; +import org.projectnessie.storage.uri.StorageUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link + * org.apache.iceberg.io.SupportsBulkOperations} and {@link + * org.apache.iceberg.io.SupportsPrefixOperations}. + */ +record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class); + + @Override + public Stream findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) { + var prefixUri = StorageUri.of(prefix).resolve("/"); + if (fileIO instanceof SupportsPrefixOperations prefixOps) { + return Streams.stream(prefixOps.listPrefix(prefix).iterator()) + .filter(Objects::nonNull) + .map( + fileInfo -> { + var location = StorageUri.of(fileInfo.location()); + if (!location.isAbsolute()) { + // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do. 
+ location = prefixUri.resolve(location); + } + return FileSpec.builder() + .location(location.toString()) + .size(fileInfo.size()) + .createdAtMillis(fileInfo.createdAtMillis()) + .build(); + }) + .filter(filter); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting prefix operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @Override + public Stream identifyIcebergTableFiles( + @Nonnull String tableMetadataLocation, boolean deduplicate) { + var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + var metadata = metadataOpt.get(); + + var metadataFileSpec = + FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + var metadataFiles = Stream.of(metadataFileSpec); + + var statisticsFiles = metadata.statisticsFiles(); + if (statisticsFiles != null) { + var statisticsFileSpecs = + statisticsFiles.stream() + .map( + statisticsFile -> + FileSpec.fromLocationAndSize( + statisticsFile.path(), statisticsFile.fileSizeInBytes()) + .fileType(FileType.ICEBERG_STATISTICS) + .build()); + metadataFiles = Stream.concat(statisticsFileSpecs, metadataFiles); + } + + var previousFiles = metadata.previousFiles(); + if (previousFiles != null) { + metadataFiles = + Stream.concat( + metadataFiles, + previousFiles.stream() + .filter( + metadataLogEntry -> + metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty()) + .map( + metadataLogEntry -> + FileSpec.fromLocation(metadataLogEntry.file()) + .fileType(FileType.ICEBERG_METADATA) + .build())); + } + + var specsById = metadata.specsById(); + + var addPredicate = deduplicator(deduplicate); + + var manifestsAndDataFiles = + metadata.snapshots().stream() + // Newest snapshots first + .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis())) + .flatMap( + snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, 
addPredicate)); + + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. + return Stream.concat(manifestsAndDataFiles, metadataFiles); + } + + static Predicate deduplicator(boolean deduplicate) { + if (!deduplicate) { + return x -> true; + } + var set = new LinkedHashSet(); + return location -> { + synchronized (set) { + if (set.size() > 100_000) { + // limit the heap pressure of the deduplication set to 100,000 elements + set.removeFirst(); + } + return set.add(location); + } + }; + } + + Stream identifyIcebergTableSnapshotFiles( + @Nonnull Snapshot snapshot, + Map specsById, + Predicate addPredicate) { + var manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) { + return Stream.empty(); + } + + return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate); + } + + Stream identifyIcebergManifests( + String manifestListLocation, + Snapshot snapshot, + Map specsById, + Predicate addPredicate) { + + var manifestListFileSpecStream = Stream.empty(); + + if (manifestListLocation != null && !manifestListLocation.isEmpty()) { + var manifestListFileSpec = + FileSpec.fromLocation(manifestListLocation) + .fileType(FileType.ICEBERG_MANIFEST_LIST) + .build(); + manifestListFileSpecStream = Stream.of(manifestListFileSpec); + } + + try { + var allManifestsFiles = + snapshot.allManifests(fileIO).stream() + .filter(manifestFile -> addPredicate.test(manifestFile.path())) + .flatMap( + manifestFile -> + identifyIcebergManifestDataFiles(manifestFile, specsById, addPredicate)); + + // Return "dependencies" before the "metadata" itself, so a failed/aborted purge can be + // resumed. 
+ return Stream.concat(allManifestsFiles, manifestListFileSpecStream); + } catch (Exception e) { + LOGGER.warn("Failure reading manifest list file {}: {}", manifestListLocation, e.toString()); + LOGGER.debug("Failure reading manifest list file {}", manifestListLocation); + return manifestListFileSpecStream; + } + } + + @SuppressWarnings("UnnecessaryDefault") + private Stream identifyIcebergManifestDataFiles( + ManifestFile manifestFile, + Map specsById, + Predicate addPredicate) { + + var manifestFileSpec = + FileSpec.fromLocationAndSize(manifestFile.path(), manifestFile.length()) + .fileType(FileType.ICEBERG_MANIFEST_FILE) + .build(); + + try (var contentFilesIter = + switch (manifestFile.content()) { + case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator(); + case DELETES -> + ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator(); + default -> { + LOGGER.warn( + "Unsupported content type {} in manifest {}", + manifestFile.content(), + manifestFile.path()); + yield CloseableIterator.>>empty(); + } + }) { + + // Cannot leverage streaming here and eagerly build a list, as the manifest-file reader needs + // to be closed. + var files = new ArrayList(); + while (contentFilesIter.hasNext()) { + var contentFile = contentFilesIter.next(); + if (addPredicate.test(contentFile.location())) { + files.add( + FileSpec.fromLocationAndSize(contentFile.location(), contentFile.fileSizeInBytes()) + .fileType(FileType.fromContentFile(contentFile)) + .build()); + } + } + // Return "dependencies" before the "metadata" itself, so the probability of being able to + // resume a failed/aborted purge is higher. 
+ files.add(manifestFileSpec); + + return files.stream(); + } catch (IOException e) { + LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), e.toString()); + LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e); + return Stream.of(manifestFileSpec); + } + } + + @Override + public Stream identifyIcebergViewFiles( + @Nonnull String viewMetadataLocation, boolean deduplicate) { + var metadataOpt = readViewMetadataFailsafe(viewMetadataLocation); + if (metadataOpt.isEmpty()) { + return Stream.empty(); + } + + var metadataFileSpec = + FileSpec.fromLocation(viewMetadataLocation).fileType(FileType.ICEBERG_METADATA).build(); + + return Stream.of(metadataFileSpec); + } + + @Override + public PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergTableFiles(tableMetadataLocation, true).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergTableBaseLocation( + @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readTableMetadataFailsafe(tableMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + .purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var files = + identifyIcebergViewFiles(viewMetadataLocation, false).filter(purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purgeIcebergViewBaseLocation( + @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) { + var metadata = readViewMetadataFailsafe(viewMetadataLocation); + if (metadata.isEmpty()) { + return ImmutablePurgeStats.builder() + .duration(Duration.ZERO) + 
.purgedFiles(0L) + .failedPurges(1) + .build(); + } + + var baseLocation = metadata.get().location(); + var files = findFiles(baseLocation, purgeSpec.fileFilter()); + return purge(files, purgeSpec); + } + + @Override + public PurgeStats purge(@Nonnull Stream locationStream, PurgeSpec purgeSpec) { + return purgeFiles(locationStream.map(FileSpec::location), purgeSpec); + } + + @Override + public PurgeStats purgeFiles(@Nonnull Stream locationStream, PurgeSpec purgeSpec) { + if (fileIO instanceof SupportsBulkOperations bulkOps) { + var startedNanos = System.nanoTime(); + + var iter = locationStream.iterator(); + + var batcher = new PurgeBatcher(purgeSpec, bulkOps); + while (iter.hasNext()) { + batcher.add(iter.next()); + } + batcher.flush(); + + return ImmutablePurgeStats.builder() + .purgedFiles(batcher.purged) + .failedPurges(batcher.failed) + .duration(Duration.ofNanos(System.nanoTime() - startedNanos)) + .build(); + } + + throw new IllegalStateException( + format( + "An Iceberg FileIO supporting bulk operations is required, but the given %s does not", + fileIO.getClass().getName())); + } + + @SuppressWarnings("UnstableApiUsage") + static final class PurgeBatcher { + private final PurgeSpec purgeSpec; + private final SupportsBulkOperations bulkOps; + + private final int deleteBatchSize; + // Using a `Set` prevents duplicate paths in a single bulk-deletion. 
+ + private final Set batch = new HashSet<>(); + + private final Runnable fileDeleteRateLimiter; + private final Runnable batchDeleteRateLimiter; + + long purged = 0L; + + long failed = 0L; + + PurgeBatcher(PurgeSpec purgeSpec, SupportsBulkOperations bulkOps) { + var implSpecificLimit = implSpecificDeleteBatchLimit(bulkOps); + + this.deleteBatchSize = Math.min(implSpecificLimit, Math.max(purgeSpec.deleteBatchSize(), 1)); + + this.purgeSpec = purgeSpec; + this.bulkOps = bulkOps; + + fileDeleteRateLimiter = createLimiter(purgeSpec.fileDeletesPerSecond()); + batchDeleteRateLimiter = createLimiter(purgeSpec.batchDeletesPerSecond()); + } + + private static Runnable createLimiter(OptionalDouble optionalDouble) { + if (optionalDouble.isEmpty()) { + // unlimited + return () -> {}; + } + var limiter = RateLimiter.create(optionalDouble.getAsDouble()); + return limiter::acquire; + } + + void add(String location) { + fileDeleteRateLimiter.run(); + batch.add(location); + + if (batch.size() >= deleteBatchSize) { + flush(); + } + } + + void flush() { + int size = batch.size(); + if (size > 0) { + batch.forEach(purgeSpec.purgeIssuedCallback()); + try { + batchDeleteRateLimiter.run(); + bulkOps.deleteFiles(batch); + purged += size; + } catch (BulkDeletionFailureException e) { + // Object stores do delete the files that exist, but a BulkDeletionFailureException is + // still being thrown. + // However, not all FileIO implementations behave the same way as some don't throw in the + // non-existent-case. + var batchFailed = e.numberFailedObjects(); + purged += size - batchFailed; + failed += batchFailed; + } finally { + batch.clear(); + } + } + } + } + + /** Figure out the hard coded max batch size limit for a particular FileIO implementation. 
*/ + static int implSpecificDeleteBatchLimit(SupportsBulkOperations bulkOps) { + var className = bulkOps.getClass().getName(); + return switch (className) { + // See https://aws.amazon.com/blogs/aws/amazon-s3-multi-object-deletion/ + case "S3FileIO" -> 1000; + // See https://cloud.google.com/storage/docs/batch + case "GCSFileIO" -> 100; + // ADLS limited to 50, because the implementation, as of Iceberg 1.10, uses one thread per + // file to be deleted (no specialized bulk deletion). + case "ADLSFileIO" -> 50; + // Use a reasonable(?) default for all other FileIO implementations. + default -> 50; + }; + } + + private Optional readTableMetadataFailsafe(String tableMetadataLocation) { + try { + var inputFile = fileIO.newInputFile(tableMetadataLocation); + return Optional.of(TableMetadataParser.read(inputFile)); + } catch (Exception e) { + LOGGER.warn( + "Failure reading table metadata file {}: {}", tableMetadataLocation, e.toString()); + LOGGER.debug("Failure reading table metadata file {}", tableMetadataLocation, e); + return Optional.empty(); + } + } + + private Optional readViewMetadataFailsafe(String viewMetadataLocation) { + try { + var inputFile = fileIO.newInputFile(viewMetadataLocation); + return Optional.of(ViewMetadataParser.read(inputFile)); + } catch (Exception e) { + LOGGER.warn("Failure reading view metadata file {}: {}", viewMetadataLocation, e.toString()); + LOGGER.debug("Failure reading view metadata file {}", viewMetadataLocation, e); + return Optional.empty(); + } + } +} diff --git a/storage/files/impl/src/main/resources/META-INF/beans.xml b/storage/files/impl/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/storage/files/impl/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/storage/files/impl/src/test/java/org/apache/polaris/storage/files/impl/BaseTestFileOperationsImpl.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.projectnessie.objectstoragemock.HeapStorageBucket.newHeapStorageBucket;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.storage.files.api.FileFilter;
import org.apache.polaris.storage.files.api.FileSpec;
import org.apache.polaris.storage.files.api.PurgeSpec;
import org.apache.polaris.storage.files.api.PurgeStats;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.objectstoragemock.Bucket;
import org.projectnessie.objectstoragemock.InterceptingBucket;
import org.projectnessie.objectstoragemock.MockObject;
import org.projectnessie.objectstoragemock.ObjectStorageMock;

/**
 * Object-store agnostic tests for {@link FileOperationsImpl} against the Nessie object-storage
 * mock.
 *
 * <p>Concrete subclasses supply the scheme-specific pieces ({@link #createFileIO()},
 * {@link #bucket()}, {@link #prefix()}, {@link #icebergProperties(ObjectStorageMock.MockServer)})
 * for S3, GCS and ADLS.
 */
public abstract class BaseTestFileOperationsImpl extends BaseFileOperationsImpl {

  /**
   * Purges a synthetic Iceberg table and verifies that every generated file (metadata, manifest
   * lists, manifest files, data files) is deleted exactly once.
   */
  @ParameterizedTest
  @MethodSource
  public void purgeIcebergTable(
      int numSnapshots, int numManifestFiles, int numDataFiles, int deleteBatchSize)
      throws Exception {

    var fixtures = new IcebergFixtures(prefix(), numSnapshots, numManifestFiles, numDataFiles);

    var baseBucket = Bucket.builder().build();
    var interceptingBucket = new InterceptingBucket(baseBucket);
    // Serve metadata, manifest lists, manifest files and fake data files on the fly, generated
    // from the fixtures - nothing is physically stored in the mock bucket.
    interceptingBucket.setRetriever(
        path -> {
          if ("foo.metadata.json".equals(path)) {
            return mockObjectForBytes(fixtures.tableMetadataBytes);
          }
          // manifest list example: 00000/snap-0.avro
          if (path.endsWith(".avro")) {
            var firstSlash = path.indexOf('/');
            var snapshot = Integer.parseInt(path.substring(0, firstSlash));
            if (path.contains("/snap-")) {
              var manifestListBytes = fixtures.serializedManifestList(snapshot);
              return mockObjectForBytes(manifestListBytes);
            } else {
              var nextSlash = path.indexOf('/', firstSlash + 1);
              var manifest = Integer.parseInt(path.substring(firstSlash + 1, nextSlash));
              var manifestBytes = fixtures.serializedManifestFile(snapshot, manifest, path);
              return mockObjectForBytes(manifestBytes);
            }
          }
          if (path.endsWith(".parquet")) {
            // Need some data file here
            return Optional.of(
                MockObject.builder()
                    .contentType("application/parquet")
                    .contentLength(1)
                    .lastModified(0L)
                    .build());
          }
          return Optional.empty();
        });
    // Record every deleted path; the Set also proves that no path is deleted more than once.
    var deletes = ConcurrentHashMap.<String>newKeySet();
    interceptingBucket.setDeleter(path -> Optional.of(deletes.add(path)));

    var objectStorageMock =
        ObjectStorageMock.builder().putBuckets(bucket(), interceptingBucket).build();

    try (var server = objectStorageMock.start();
        var fileIO = initializedFileIO(server)) {
      // Note: initializedFileIO() already initialized the FileIO - no second
      // FileIO.initialize() call is needed (it was redundant).

      var prefix = prefix();

      var fileOps = new FileOperationsImpl(fileIO);

      var purgeStats =
          fileOps.purgeIcebergTable(
              prefix + "foo.metadata.json",
              PurgeSpec.DEFAULT_INSTANCE.withDeleteBatchSize(deleteBatchSize));

      assertThat(purgeStats.purgedFiles())
          // 1st "1" --> metadata-json
          // 2nd "1" --> manifest-list
          // 3rd "1" --> manifest-file
          .isEqualTo(
              1
                  + fixtures.numSnapshots
                      * (1 + (long) fixtures.numManifestFiles * (1 + fixtures.numDataFiles)))
          .isEqualTo(deletes.size());
    }
  }

  /** Argument source for {@link #purgeIcebergTable(int, int, int, int)}. */
  static Stream<Arguments> purgeIcebergTable() {
    return Stream.of(
        // one snapshot, three manifest files, 5 data files per manifest file
        Arguments.of(1, 3, 5, 10),
        // five snapshot, 7 manifest files per snapshot, 13 data files per manifest file
        Arguments.of(5, 7, 13, 10),
        // five snapshot, 7 manifest files per snapshot, 1000 data files per manifest file,
        // batch delete size 500
        Arguments.of(5, 7, 1_000, 500));
  }

  /** Wraps the given bytes in a range-aware {@link MockObject} for the object-storage mock. */
  static Optional<MockObject> mockObjectForBytes(byte[] bytes) {
    return Optional.of(
        MockObject.builder()
            .contentType("application/json")
            .lastModified(System.currentTimeMillis())
            .contentLength(bytes.length)
            .writer(
                (range, output) -> {
                  // Honor (partial) range requests; a null range or an open end means "to EOF".
                  var off = range != null ? (int) range.start() : 0;
                  var end =
                      range == null || range.end() == Long.MAX_VALUE
                          ? bytes.length
                          : (int) range.end() + 1;
                  var len = Math.min(end - off, bytes.length - off);
                  output.write(bytes, off, len);
                })
            .build());
  }

  /** Lists and then partially purges a flat set of files, verifying find/purge counts. */
  @ParameterizedTest
  @ValueSource(ints = {500})
  public void someFiles(int numFiles) throws Exception {
    Set<String> keys =
        IntStream.range(0, numFiles)
            .mapToObj(i -> format("path/%d/%d", i % 100, i))
            .collect(Collectors.toCollection(ConcurrentHashMap::newKeySet));

    var mockObject = MockObject.builder().build();
    var objectStorageMock =
        ObjectStorageMock.builder()
            .putBuckets(
                bucket(),
                Bucket.builder()
                    .lister(
                        (String prefix, String offset) ->
                            keys.stream()
                                .map(
                                    key ->
                                        new Bucket.ListElement() {
                                          @Override
                                          public String key() {
                                            return key;
                                          }

                                          @Override
                                          public MockObject object() {
                                            return mockObject;
                                          }
                                        }))
                    .object(key -> keys.contains(key) ? mockObject : null)
                    .deleter(keys::remove)
                    .build())
            .build();

    try (var server = objectStorageMock.start();
        var fileIO = initializedFileIO(server)) {

      var prefix = prefix();

      var fileOps = new FileOperationsImpl(fileIO);

      try (Stream<FileSpec> files = fileOps.findFiles(prefix, FileFilter.alwaysTrue())) {
        assertThat(files).hasSize(numFiles);
      }

      // Purge the first 10% of the files and verify the reported + remaining counts.
      int deletes = numFiles / 10;
      assertThat(
              fileOps.purgeFiles(
                  IntStream.range(0, deletes)
                      .mapToObj(i -> format(prefix + "path/%d/%d", i % 100, i)),
                  PurgeSpec.DEFAULT_INSTANCE))
          .extracting(PurgeStats::purgedFiles)
          .isEqualTo((long) deletes);

      try (Stream<FileSpec> files = fileOps.findFiles(prefix, FileFilter.alwaysTrue())) {
        assertThat(files).hasSize(numFiles - deletes);
      }
    }
  }

  /**
   * Verifies that {@code findFiles} can stream a large, paged listing without materializing it.
   * The lister synthesizes keys from the paging offset instead of holding them in memory.
   */
  @ParameterizedTest
  @ValueSource(
      ints = {50_000
        // 150_000 (disabled for unit test runtime reasons)
        // 50_000_000 would work, too, but takes way too long for a unit test
      })
  public void manyFiles(int numFiles) throws Exception {
    var mockObject = MockObject.builder().build();
    var pathPrefix = "x".repeat(1000) + "/";
    var objectStorageMock =
        ObjectStorageMock.builder()
            .putBuckets(
                bucket(),
                Bucket.builder()
                    .lister(
                        (String prefix, String offset) -> {
                          // Resume listing from the numeric suffix of the continuation offset.
                          var off =
                              offset != null
                                  ? Integer.parseInt(offset.substring(pathPrefix.length()))
                                  : 0;
                          return IntStream.range(off, numFiles)
                              .mapToObj(
                                  i ->
                                      new Bucket.ListElement() {
                                        @Override
                                        public String key() {
                                          return pathPrefix + format("%010d", i);
                                        }

                                        @Override
                                        public MockObject object() {
                                          return mockObject;
                                        }
                                      });
                        })
                    .object(key -> mockObject)
                    .build())
            .build();

    try (var server = objectStorageMock.start();
        var fileIO = initializedFileIO(server)) {

      var prefix = prefix();

      var fileOps = new FileOperationsImpl(fileIO);

      try (Stream<FileSpec> files = fileOps.findFiles(prefix, FileFilter.alwaysTrue())) {
        assertThat(files.count()).isEqualTo(numFiles);
      }
    }
  }

  /** Runs the full Iceberg round-trip from {@link BaseFileOperationsImpl} against a heap bucket. */
  @Test
  public void icebergIntegration() throws Exception {
    var objectStorageMock =
        ObjectStorageMock.builder().putBuckets(bucket(), newHeapStorageBucket().bucket()).build();

    try (var server = objectStorageMock.start();
        var fileIO = initializedFileIO(server)) {
      var properties = icebergProperties(server);

      icebergIntegration(fileIO, properties);
    }
  }

  /** Creates the scheme-specific {@link FileIO} and initializes it against the mock server. */
  public FileIO initializedFileIO(ObjectStorageMock.MockServer server) {
    var fileIO = createFileIO();
    fileIO.initialize(icebergProperties(server));
    return fileIO;
  }

  /** Uninitialized {@link FileIO} for the object store under test. */
  protected abstract FileIO createFileIO();

  /** Bucket/container name used with the object-storage mock. */
  protected abstract String bucket();

  /** Iceberg {@link FileIO} properties pointing at the given mock server. */
  protected abstract Map<String, String> icebergProperties(ObjectStorageMock.MockServer server);
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import java.util.Map; +import org.apache.iceberg.azure.adlsv2.ADLSFileIO; +import org.apache.iceberg.io.FileIO; +import org.projectnessie.objectstoragemock.ObjectStorageMock; + +public class TestFileOperationsImplWithADLS extends BaseTestFileOperationsImpl { + + @Override + protected String bucket() { + return "$root"; + } + + @Override + protected Map icebergProperties(ObjectStorageMock.MockServer server) { + return Map.of( + "adls.connection-string.account", + server.getAdlsGen2BaseUri().toString(), + "adls.auth.shared-key.account.name", + "account@account.dfs.core.windows.net", + "adls.auth.shared-key.account.key", + "key"); + } + + @Override + protected String prefix() { + return format("abfs://%s@account/", bucket()); + } + + @Override + protected FileIO createFileIO() { + return new ADLSFileIO(); + } +} diff --git a/storage/files/impl/src/test/java/org/apache/polaris/storage/files/impl/TestFileOperationsImplWithGCS.java b/storage/files/impl/src/test/java/org/apache/polaris/storage/files/impl/TestFileOperationsImplWithGCS.java new file mode 100644 index 0000000000..7ea194cacc --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import static java.lang.String.format;

import java.util.Map;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.Disabled;
import org.projectnessie.objectstoragemock.ObjectStorageMock;

/** Runs the shared {@link BaseTestFileOperationsImpl} suite against GCS (currently disabled). */
@Disabled(
    "Requires implementation of the /batch/storage/v1 endpoint in object-storage-mock. "
        + "That consumes a multipart/mixed content, which contains a series of serialized HTTP requests.")
public class TestFileOperationsImplWithGCS extends BaseTestFileOperationsImpl {

  @Override
  protected FileIO createFileIO() {
    return new GCSFileIO();
  }

  @Override
  protected String bucket() {
    return "bucket";
  }

  @Override
  protected String prefix() {
    return format("gs://%s/", bucket());
  }

  @Override
  protected Map<String, String> icebergProperties(ObjectStorageMock.MockServer server) {
    var baseUri = server.getGcsBaseUri().toString();
    // MUST NOT end with a trailing slash, otherwise code like
    // com.google.cloud.storage.spi.v1.HttpStorageRpc.DefaultRpcBatch.submit inserts an
    // ambiguous empty path segment ("//").
    var hostWithoutTrailingSlash = baseUri.substring(0, baseUri.length() - 1);
    return Map.of(
        "gcs.project-id",
        "my-project",
        "gcs.service.host",
        hostWithoutTrailingSlash,
        "gcs.no-auth",
        "true");
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.storage.files.impl; + +import static java.lang.String.format; + +import java.util.Map; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.io.FileIO; +import org.projectnessie.objectstoragemock.ObjectStorageMock; + +public class TestFileOperationsImplWithS3 extends BaseTestFileOperationsImpl { + + @Override + protected String bucket() { + return "bucket"; + } + + @Override + protected Map icebergProperties(ObjectStorageMock.MockServer server) { + return Map.of( + "s3.access-key-id", "accessKey", + "s3.secret-access-key", "secretKey", + "s3.endpoint", server.getS3BaseUri().toString(), + // must enforce path-style access because S3Resource has the bucket name in its path + "s3.path-style-access", "true", + "client.region", "eu-central-1", + "http-client.type", "urlconnection"); + } + + @Override + protected String prefix() { + return format("s3://%s/", bucket()); + } + + @Override + protected FileIO createFileIO() { + return new S3FileIO(); + } +} diff --git a/storage/files/impl/src/testFixtures/java/org/apache/iceberg/IcebergBridge.java b/storage/files/impl/src/testFixtures/java/org/apache/iceberg/IcebergBridge.java new file mode 100644 index 0000000000..a0c9ef4df5 --- /dev/null +++ b/storage/files/impl/src/testFixtures/java/org/apache/iceberg/IcebergBridge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; + +public class IcebergBridge { + public static Snapshot mockSnapshot( + long sequenceNumber, + long snapshotId, + Long parentId, + long timestamp, + String operation, + Map summary, + Integer schemaId, + String manifestList, + long addedRows) { + return new BaseSnapshot( + sequenceNumber, + snapshotId, + parentId, + timestamp, + operation, + summary, + schemaId, + manifestList, + null, + addedRows, + null); + } +} diff --git a/storage/files/impl/src/testFixtures/java/org/apache/polaris/storage/files/impl/BaseFileOperationsImpl.java b/storage/files/impl/src/testFixtures/java/org/apache/polaris/storage/files/impl/BaseFileOperationsImpl.java new file mode 100644 index 0000000000..1d39624e53 --- /dev/null +++ b/storage/files/impl/src/testFixtures/java/org/apache/polaris/storage/files/impl/BaseFileOperationsImpl.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;
import org.apache.polaris.storage.files.api.FileFilter;
import org.apache.polaris.storage.files.api.FileSpec;
import org.apache.polaris.storage.files.api.FileType;
import org.apache.polaris.storage.files.api.PurgeSpec;
import org.apache.polaris.storage.files.api.PurgeStats;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Test fixture driving a real Iceberg table through appends and then exercising
 * identify/find/purge of {@link FileOperationsImpl} against it.
 */
@ExtendWith(SoftAssertionsExtension.class)
public abstract class BaseFileOperationsImpl {
  @InjectSoftAssertions protected SoftAssertions soft;

  /**
   * Creates an Iceberg table with three appends (1 + 2 + 3 data files), then verifies that
   * identify/find report the same 16 files and that a purge deletes all of them.
   */
  protected void icebergIntegration(FileIO fileIO, Map<String, String> properties)
      throws Exception {
    var tables = new ConcurrentHashMap<TableIdentifier, String>();

    var prefix = prefix() + "ns/some_table/";

    var ops = new TableOps(tables, fileIO, TableIdentifier.of(Namespace.of("ns"), "some_table"));
    var schema = new Schema(Types.NestedField.required(3, "id", Types.IntegerType.get()));
    var metadata =
        TableMetadata.newTableMetadata(
            schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), prefix, properties);
    ops.commit(null, metadata);

    var table = new BaseTable(ops, "some_table", x -> {});
    var dataFile1 = dataFile(fileIO, prefix + "data-file-1.parquet");
    table.newFastAppend().appendFile(dataFile1).commit();
    var dataFile2a = dataFile(fileIO, prefix + "data-file-2a.parquet");
    var dataFile2b = dataFile(fileIO, prefix + "data-file-2b.parquet");
    table.newFastAppend().appendFile(dataFile2a).appendFile(dataFile2b).commit();
    var dataFile3a = dataFile(fileIO, prefix + "data-file-3a.parquet");
    var dataFile3b = dataFile(fileIO, prefix + "data-file-3b.parquet");
    var dataFile3c = dataFile(fileIO, prefix + "data-file-3c.parquet");
    table
        .newFastAppend()
        .appendFile(dataFile3a)
        .appendFile(dataFile3b)
        .appendFile(dataFile3c)
        .commit();

    var metadataLocation = table.operations().current().metadataFileLocation();

    var fileOps = new FileOperationsImpl(fileIO);

    // Streams returned by identify/find are backed by object-store listings and must be closed.
    List<FileSpec> identifiedFiles;
    try (var identified = fileOps.identifyIcebergTableFiles(metadataLocation, true)) {
      identifiedFiles = identified.toList();
    }
    // 4 table-metadata in total (identify can only yield the given metadata and all mentioned in
    // the metadata-log)
    // 3 manifest lists
    // 3 manifest files
    // 6 data files
    soft.assertThat(identifiedFiles)
        .hasSize(16)
        .extracting(FileSpec::location)
        .contains(
            dataFile1.location(),
            dataFile2a.location(),
            dataFile2b.location(),
            dataFile3a.location(),
            dataFile3b.location(),
            dataFile3c.location(),
            metadataLocation);
    soft.assertThat(
            identifiedFiles.stream()
                .collect(
                    Collectors.groupingBy(
                        fileSpec -> fileSpec.fileType().orElse(FileType.UNKNOWN),
                        Collectors.counting())))
        .containsAllEntriesOf(
            Map.of(
                FileType.ICEBERG_METADATA,
                4L,
                FileType.ICEBERG_MANIFEST_LIST,
                3L,
                FileType.ICEBERG_MANIFEST_FILE,
                3L,
                FileType.ICEBERG_DATA_FILE,
                6L));

    List<FileSpec> foundFiles;
    try (var found = fileOps.findFiles(prefix, FileFilter.alwaysTrue())) {
      foundFiles = found.toList();
    }
    soft.assertThat(foundFiles)
        .extracting(FileSpec::location)
        .containsExactlyInAnyOrderElementsOf(
            identifiedFiles.stream().map(FileSpec::location).collect(Collectors.toList()));
    // findFiles() cannot know the Iceberg role of plain data files, hence UNKNOWN for those.
    soft.assertThat(
            foundFiles.stream()
                .collect(Collectors.groupingBy(FileSpec::guessTypeFromName, Collectors.counting())))
        .containsAllEntriesOf(
            Map.of(
                FileType.ICEBERG_METADATA,
                4L,
                FileType.ICEBERG_MANIFEST_LIST,
                3L,
                FileType.ICEBERG_MANIFEST_FILE,
                3L,
                FileType.UNKNOWN,
                6L));

    soft.assertThat(fileOps.purgeIcebergTable(metadataLocation, PurgeSpec.DEFAULT_INSTANCE))
        .extracting(PurgeStats::purgedFiles)
        .isEqualTo(16L);
    try (var remaining = fileOps.findFiles(prefix, FileFilter.alwaysTrue())) {
      soft.assertThat(remaining).isEmpty();
    }
  }

  /** Builds an unpartitioned {@link DataFile} descriptor and writes a matching dummy file. */
  DataFile dataFile(FileIO fileIO, String path) throws Exception {
    var dataFile =
        DataFiles.builder(PartitionSpec.unpartitioned())
            .withPath(path)
            .withFileSizeInBytes(10L)
            .withRecordCount(2L)
            .build();
    write(fileIO, path, new byte[(int) dataFile.fileSizeInBytes()]);
    return dataFile;
  }

  /** Writes {@code bytes} to {@code name} via the given {@link FileIO}. */
  protected void write(FileIO fileIO, String name, byte[] bytes) throws Exception {
    var outFile = fileIO.newOutputFile(name);
    try (var out = outFile.create()) {
      out.write(bytes);
    }
  }

  /** Object-store location prefix (e.g. {@code s3://bucket/}) supplied by subclasses. */
  protected abstract String prefix();
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.IcebergBridge;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.types.Types;
import org.projectnessie.catalog.formats.iceberg.IcebergSpec;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataContent;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataFile;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergFileFormat;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestContent;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntry;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntryStatus;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFile;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriter;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriterSpec;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriter;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriterSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;

/**
 * Generates a synthetic Iceberg table: table metadata with {@code numSnapshots} snapshots, each
 * referencing {@code numManifestFiles} manifest files with {@code numDataFiles} data files each.
 * Manifest lists and manifest files are serialized on demand, so no object store is needed.
 */
public class IcebergFixtures {
  public final Schema schema;
  public final IcebergSchema nessieIcebergSchema;
  public final PartitionSpec spec;
  public final TableMetadata tableMetadata;
  public final String tableMetadataString;
  public final byte[] tableMetadataBytes;

  // Location prefix all generated paths start with.
  public final String prefix;
  public final int numSnapshots;
  public final int numManifestFiles;
  public final int numDataFiles;

  public IcebergFixtures(String prefix, int numSnapshots, int numManifestFiles, int numDataFiles) {
    this.prefix = prefix;
    this.numSnapshots = numSnapshots;
    this.numManifestFiles = numManifestFiles;
    this.numDataFiles = numDataFiles;

    schema = new Schema(1, Types.NestedField.required(1, "foo", Types.StringType.get()));
    try {
      // Re-parse the Iceberg schema JSON into the Nessie model used by the manifest writers.
      nessieIcebergSchema =
          new ObjectMapper().readValue(SchemaParser.toJson(schema), IcebergSchema.class);
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
    spec = PartitionSpec.unpartitioned();

    var tableMetadataBuilder =
        TableMetadata.buildFrom(
            TableMetadata.newTableMetadata(schema, spec, prefix, Map.of()).withUUID());
    for (var snapshotId = 1; snapshotId <= numSnapshots; snapshotId++) {
      var manifestList = manifestListPath(snapshotId);
      var snapshot =
          IcebergBridge.mockSnapshot(
              snapshotId + 1,
              snapshotId + 1,
              snapshotId > 0 ? (long) snapshotId : null,
              System.currentTimeMillis(),
              "APPEND",
              Map.of(),
              schema.schemaId(),
              manifestList,
              // rows per snapshot = manifests * data files per manifest (each data file carries
              // one record) - was "numManifestFiles * numManifestFiles", an apparent typo
              (long) numManifestFiles * numDataFiles);
      tableMetadataBuilder.addSnapshot(snapshot);
    }
    tableMetadata = tableMetadataBuilder.build();

    tableMetadataString = TableMetadataParser.toJson(tableMetadata);
    tableMetadataBytes = tableMetadataString.getBytes(UTF_8);
  }

  /** Manifest-list location for the given snapshot, e.g. {@code 00001/snap-1.avro}. */
  public String manifestListPath(int snapshotId) {
    return format("%s%05d/snap-%d.avro", prefix, snapshotId, snapshotId);
  }

  /** Serializes a manifest list with {@link #numManifestFiles} manifest-file entries. */
  public byte[] serializedManifestList(long snapshotId) {
    var output = new ByteArrayOutputStream();
    try (var manifestListWriter =
        IcebergManifestListWriter.openManifestListWriter(
            IcebergManifestListWriterSpec.builder()
                .snapshotId(snapshotId)
                .sequenceNumber(snapshotId)
                .parentSnapshotId(snapshotId > 0 ? snapshotId - 1 : null)
                .partitionSpec(IcebergPartitionSpec.UNPARTITIONED_SPEC)
                .spec(IcebergSpec.V2)
                .schema(nessieIcebergSchema)
                .build(),
            output)) {
      for (int i = 0; i < numManifestFiles; i++) {
        var manifestPath = manifestFilePath(snapshotId, i);
        var manifestFile =
            IcebergManifestFile.builder()
                .addedFilesCount(numDataFiles)
                .addedRowsCount((long) numDataFiles)
                .existingFilesCount(0)
                .existingRowsCount(0L)
                .deletedFilesCount(0)
                .deletedRowsCount(0L)
                .content(IcebergManifestContent.DATA)
                .manifestLength(1024)
                .minSequenceNumber(snapshotId)
                .sequenceNumber(snapshotId)
                .manifestPath(manifestPath)
                .partitionSpecId(IcebergPartitionSpec.UNPARTITIONED_SPEC.specId())
                .addedSnapshotId(snapshotId)
                .build();
        manifestListWriter.append(manifestFile);
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return output.toByteArray();
  }

  /** Manifest-file location for the given snapshot/index. */
  public String manifestFilePath(long snapshotId, int file) {
    return format("%s%05d/%05d/xyz-m-manifest.avro", prefix, snapshotId, file);
  }

  /** Serializes a manifest file with {@link #numDataFiles} ADDED data-file entries. */
  public byte[] serializedManifestFile(long snapshotId, int manifest, String path) {
    var output = new ByteArrayOutputStream();
    var spec = IcebergPartitionSpec.UNPARTITIONED_SPEC;
    var partition = new GenericData.Record(spec.avroSchema(nessieIcebergSchema, "r102"));
    try (var manifestFileWriter =
        IcebergManifestFileWriter.openManifestFileWriter(
            IcebergManifestFileWriterSpec.builder()
                .addedSnapshotId(snapshotId)
                .manifestPath(path)
                .content(IcebergManifestContent.DATA)
                .minSequenceNumber(snapshotId)
                .sequenceNumber(snapshotId)
                .partitionSpec(spec)
                .schema(nessieIcebergSchema)
                .spec(IcebergSpec.V2)
                .build(),
            output)) {
      for (int i = 0; i < numDataFiles; i++) {
        var manifestPath = format("%s%05d/%05d/%05d/data.parquet", prefix, snapshotId, manifest, i);
        manifestFileWriter.append(
            IcebergManifestEntry.builder()
                .dataFile(
                    IcebergDataFile.builder()
                        .fileFormat(IcebergFileFormat.PARQUET)
                        .filePath(manifestPath)
                        .specId(spec.specId())
                        .content(IcebergDataContent.DATA)
                        .fileSizeInBytes(1024)
                        .partition(partition)
                        .recordCount(1)
                        .build())
                .sequenceNumber(snapshotId)
                .fileSequenceNumber(snapshotId)
                .snapshotId(snapshotId)
                .status(IcebergManifestEntryStatus.ADDED)
                .build());
      }
      manifestFileWriter.finish();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return output.toByteArray();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;

/**
 * Helper for Iceberg integration tests: a minimal in-memory "metastore" that maps table
 * identifiers to their current metadata location in a shared {@link ConcurrentMap}.
 */
class TableOps extends BaseMetastoreTableOperations {
  private final ConcurrentMap<TableIdentifier, String> tables;
  private final FileIO fileIO;
  private final TableIdentifier tableIdentifier;

  TableOps(
      ConcurrentMap<TableIdentifier, String> tables,
      FileIO fileIO,
      TableIdentifier tableIdentifier) {
    this.tables = tables;
    this.fileIO = fileIO;
    this.tableIdentifier = tableIdentifier;
  }

  @Override
  protected void doRefresh() {
    var latestLocation = tables.get(tableIdentifier);
    if (latestLocation == null) {
      // Table does not exist (yet) - nothing to refresh from.
      disableRefresh();
    } else {
      refreshFromMetadataLocation(latestLocation);
    }
  }

  @Override
  protected void doCommit(TableMetadata base, TableMetadata metadata) {
    var newLocation = writeNewMetadataIfRequired(base == null, metadata);
    var oldLocation = base == null ? null : base.metadataFileLocation();
    // Atomic compare-and-swap of the metadata location via ConcurrentMap.compute().
    tables.compute(
        tableIdentifier,
        (k, existingLocation) -> {
          if (!Objects.equals(existingLocation, oldLocation)) {
            if (null == base) {
              throw new AlreadyExistsException("Table already exists: %s", tableName());
            }

            if (null == existingLocation) {
              throw new NoSuchTableException("Table does not exist: %s", tableName());
            }

            throw new CommitFailedException(
                "Cannot commit to table %s metadata location from %s to %s "
                    + "because it has been concurrently modified to %s",
                tableIdentifier, oldLocation, newLocation, existingLocation);
          }
          return newLocation;
        });
  }

  @Override
  public FileIO io() {
    return fileIO;
  }

  @Override
  protected String tableName() {
    // Return the actual identifier so exception/log messages name the table
    // (previously returned "", producing messages like "Table already exists: ").
    return tableIdentifier.toString();
  }
}