Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ dependencies {

api(project(":polaris-core"))

api(project(":polaris-storage-files-api"))
api(project(":polaris-storage-files-impl"))

api(project(":polaris-relational-jdbc"))

api(project(":polaris-extensions-auth-opa"))
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5
opentelemetry-bom = { module = "io.opentelemetry:opentelemetry-bom", version = "1.57.0" }
opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version= "2.20.1-alpha" }
opentelemetry-semconv = { module = "io.opentelemetry.semconv:opentelemetry-semconv", version = "1.37.0" }
nessie-bom = { module = "org.projectnessie.nessie:nessie-bom", version = "0.106.0" }
picocli = { module = "info.picocli:picocli-codegen", version.ref = "picocli" }
picocli-codegen = { module = "info.picocli:picocli-codegen", version.ref = "picocli" }
postgresql = { module = "org.postgresql:postgresql", version = "42.7.8" }
Expand Down
3 changes: 3 additions & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ polaris-extensions-federation-hive=extensions/federation/hive
polaris-extensions-auth-opa=extensions/auth/opa/impl
polaris-extensions-auth-opa-tests=extensions/auth/opa/tests

polaris-storage-files-api=storage/files/api
polaris-storage-files-impl=storage/files/impl

polaris-config-docs-annotations=tools/config-docs/annotations
polaris-config-docs-generator=tools/config-docs/generator
polaris-config-docs-site=tools/config-docs/site
Expand Down
70 changes: 70 additions & 0 deletions storage/files/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Polaris object store operations

API and implementations to perform long-running operations against object stores, mostly to purge files.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I wouldn't limit to purging files. I could see other object store operations which might be helpful in the future.

So, I'd remove "mostly to purge files."


Functionalities to scan an object store and to purge files are separated. Filter mechanisms are used to
select the files to be deleted (purged).

There are implementations to identify the files referenced by a particular Iceberg table or view metadata, including
statistics files, manifest lists of all snapshots, the manifest files and the data/delete files.

The file operations perform no effort to identify duplicates during the identification of files referenced by
a table or view metadata.
This means that, for example, a data file referenced in multiple manifest files will be returned twice.

Purge operations are performed in one or multiple bulk delete operations.
The implementation takes care of not including the same file more than once within a single bulk delete operation.

One alternative implementation purges all files within the base location of a table or view metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this alternative implementation inside of the implementation here?


All implemented operations are designed to be resilient against failures as those are expected to be run as
maintenance operations or as part of such.
The operations are implemented to continue in case of errors and eventually succeed instead of failing eagerly.
Maintenance operations are usually not actively observed, and manually fixing consistency issues in object
stores is not a straightforward task for users.

# Potential future enhancements

The operations provided by `FileOperations` are meant for maintenance operations, which are not
time- or performance-critical.
It is more important that the operations are resilient against failures, do not add unnecessary CPU or heap pressure
and eventually succeed.
Further, maintenance operations should not eat up too much I/O bandwidth to not interfere with other user-facing
operations.

Depending on the overall load of the system, it might be worth running some operations in parallel.

# Code architecture

The code is split in two modules. One for the (Polaris internal) API interfaces and one for the implementations.

Tests against various object store implementations are included as unit tests using an on-heap object-store-mock
and as integration tests against test containers for S3, GCS and ADLS.
The object-store-mock used in unit tests is also used to validate the low heap-pressure required by the
implementations.

The actual object store interaction of the current implementation is delegated to Iceberg `FileIO` implementations.
Only `FileIO` implementations that support prefix-operations (`SupportsPrefixOperations` interface) and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What sort of FileIO implementations are not supported then? From what I'm seeing OSSFileIO, InMemoryFileIO, & EcsFileIO?

bulk-operations (`SupportsBulkOperations` interface) are currently supported.
The `FileIO` implementations `S3FileIO`, `GCSFileIO` and `ADLSFileIO` support both.
Beside the necessary `FileIO` usage in `FileOperationsFactory`, none of the API functions refer to `FileIO`
to allow the API to be implemented against other, more tailored object store backend implementations.
43 changes: 43 additions & 0 deletions storage/files/api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

plugins {
id("polaris-server")
id("org.kordamp.gradle.jandex")
}

dependencies {
implementation(platform(libs.iceberg.bom))
implementation("org.apache.iceberg:iceberg-api")
implementation("org.apache.iceberg:iceberg-core")

implementation(libs.guava)

implementation(platform(libs.jackson.bom))
implementation("com.fasterxml.jackson.core:jackson-annotations")
implementation("com.fasterxml.jackson.core:jackson-core")
implementation("com.fasterxml.jackson.core:jackson-databind")

compileOnly(project(":polaris-immutables"))
annotationProcessor(project(":polaris-immutables", configuration = "processor"))

compileOnly(libs.jakarta.annotation.api)
}

tasks.named("javadoc") { dependsOn("jandex") }
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.storage.files.api;

import java.util.function.Predicate;

@FunctionalInterface
public interface FileFilter extends Predicate<FileSpec> {

static FileFilter alwaysTrue() {
return fileSpec -> true;
}

static FileFilter alwaysFalse() {
return fileSpec -> false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.storage.files.api;

import jakarta.annotation.Nonnull;
import java.util.stream.Stream;

/**
* Object storage file operations, used to find files below a given prefix, to purge files, to
* identify referenced files, etc.
*
* <p>All functions of this interface rather yield incomplete results and continue over throwing
* exceptions.
*/
public interface FileOperations {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For readability, I would introduce a FilePurgeOperations interface which could contain all of the purge operations and have this interface extend that one.

/**
* Find files that match the given prefix and filter.
*
* <p>Whether existing but inaccessible files are included in the result depends on the object
* store.
*
* <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's
* {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on
* the returned stream.
*
* @param prefix full object storage URI prefix, including scheme and bucket.
* @param filter file filter
* @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link
* FileSpec#size()} attributes populated with the information provided by the object store.
* The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link
* FileSpec#guessTypeFromName() guessed}.
*/
Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter);

/**
* Identifies all files referenced by the given table-metadata.
*
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would these files not be readable? Did you have cases in mind when that would happen?

* readable, the returned stream will just not include those.
*
* <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or
* views. Rate-limiting on a single invocation may not be effective as expected.
*
* @param tableMetadataLocation Iceberg table-metadata location
* @param deduplicate if true, attempt to deduplicate files by their location, adding additional
* heap pressure to the operation. Implementations may ignore this parameter or may not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to have the implementation ignore this parameter? Like, it seems as if it would be helpful because, if it is ignored sometimes, the caller will always have to assume that it is not deduplicated. So, the caller might be doing unnecessary work.

* deduplicate all identified files.
* @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()}
* attribute is usually not populated, as it would have to be derived from user-provided
* information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated
* based on where a file appears during identification.
*/
Stream<FileSpec> identifyIcebergTableFiles(
@Nonnull String tableMetadataLocation, boolean deduplicate);

/**
* Identifies all files referenced by the given view-metadata.
*
* <p>In case "container" files like the metadata are not readable, the returned stream will just
* not include those.
*
* <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or
* views. Rate-limiting on a single invocation may not be effective as expected.
*
* @param viewMetadataLocation Iceberg view-metadata location
* @param deduplicate if true, attempt to deduplicate files by their location, adding additional
* heap pressure to the operation. Implementations may ignore this parameter or may not
* deduplicate all identified files.
* @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()}
* attribute is usually not populated, as it would have been derived from user-provided
* information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated
* based on where a file appears during identification.
*/
Stream<FileSpec> identifyIcebergViewFiles(
@Nonnull String viewMetadataLocation, boolean deduplicate);

/**
* Purges all files that are referenced by the given table-metadata, respecting the given filter.
*
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
* readable, those files are just ignored.
*
* <p>This is effectively a convenience for {@code
* purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))}
*
* @see #purge(Stream, PurgeSpec)
* @see #identifyIcebergTableFiles(String, boolean)
* @see #findFiles(String, FileFilter)
*/
PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec);

/**
* Purges all files that are within the base location of the given table-metadata, purge only
* files that match the given filter.
*
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
* readable, those files are just ignored.
*
* <p>This is effectively a convenience for {@code
* purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))}
*
* @see #purge(Stream, PurgeSpec)
* @see #findFiles(String, FileFilter)
*/
PurgeStats purgeIcebergTableBaseLocation(
@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec);

/**
* Purges all files that are referenced by the given view-metadata, respecting the given filter. *
*
* <p>In case "container" files like the metadata are not readable, those files are just ignored.
*
* <p>This is effectively a convenience for {@code
* purge(identifyIcebergViewFiles(tableMetadataLocation).filter(fileFilter))}
*
* @see #purge(Stream, PurgeSpec)
* @see #identifyIcebergViewFiles(String, boolean)
* @see #findFiles(String, FileFilter)
*/
PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec);

/**
* Purges all files that are within the base location of the given view-metadata, purge only files
* that match the given filter. *
*
* <p>In case "container" files like the metadata are not readable, those files are just ignored.
*
* <p>This is effectively a convenience for {@code
* purge(findFiles(viewMetadata.baseLocation()).filter(fileFilter))}
*
* @see #purge(Stream, PurgeSpec)
* @see #findFiles(String, FileFilter)
*/
PurgeStats purgeIcebergViewBaseLocation(
@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec);

/**
* Purges all files that match the given stream of locations. The {@link Stream} will be fully
* consumed.
*
* <p>This is a convenience for {@link #purgeFiles(Stream, PurgeSpec)
* purgeFiles(locationStream.map(FileSpec::location))}
*/
PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec);

/**
* Purges all files from the given stream of locations. The {@link Stream} will be fully consumed.
*
* <p>Non-existing files and other deletion errors will not let the call fail, which makes it
* resilient against transient or irrelevant errors.
*/
PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.storage.files.api;

import org.apache.iceberg.io.FileIO;

/**
* Factory to create {@link FileOperations} instances to perform object storage related maintenance
* operations.
*/
public interface FileOperationsFactory {
/**
* Create a {@link FileOperations} instance for the given {@link FileIO} instance.
*
* @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link
* org.apache.iceberg.io.SupportsBulkOperations} and {@link
* org.apache.iceberg.io.SupportsPrefixOperations}.
*/
FileOperations createFileOperations(FileIO fileIO);
}
Loading