Skip to content

Commit 525f689

Browse files
committed
Object storage operations (proposal)
API and implementations to perform long-running operations against object stores, mostly to purge files.
1 parent a6d23d8 commit 525f689

File tree

29 files changed

+2590
-0
lines changed

29 files changed

+2590
-0
lines changed

bom/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ dependencies {
8484

8585
api(project(":polaris-core"))
8686

87+
api(project(":polaris-storage-files-api"))
88+
api(project(":polaris-storage-files-impl"))
89+
8790
api(project(":polaris-relational-jdbc"))
8891

8992
api(project(":polaris-extensions-auth-opa"))

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5
9191
opentelemetry-bom = { module = "io.opentelemetry:opentelemetry-bom", version = "1.57.0" }
9292
opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version= "2.20.1-alpha" }
9393
opentelemetry-semconv = { module = "io.opentelemetry.semconv:opentelemetry-semconv", version = "1.37.0" }
94+
nessie-bom = { module = "org.projectnessie.nessie:nessie-bom", version = "0.106.0" }
9495
picocli = { module = "info.picocli:picocli-codegen", version.ref = "picocli" }
9596
picocli-codegen = { module = "info.picocli:picocli-codegen", version.ref = "picocli" }
9697
postgresql = { module = "org.postgresql:postgresql", version = "42.7.8" }

gradle/projects.main.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ polaris-extensions-federation-hive=extensions/federation/hive
4545
polaris-extensions-auth-opa=extensions/auth/opa/impl
4646
polaris-extensions-auth-opa-tests=extensions/auth/opa/tests
4747

48+
polaris-storage-files-api=storage/files/api
49+
polaris-storage-files-impl=storage/files/impl
50+
4851
polaris-config-docs-annotations=tools/config-docs/annotations
4952
polaris-config-docs-generator=tools/config-docs/generator
5053
polaris-config-docs-site=tools/config-docs/site

storage/files/README.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Polaris object store operations
21+
22+
API and implementations to perform long-running operations against object stores, mostly to purge files.
23+
24+
Functionalities to scan an object store and to purge files are separated. Filter mechanisms are used to
25+
select the files to be deleted (purged).
26+
27+
There are implementations to identify the files referenced by a particular Iceberg table or view metadata, including
28+
statistics files, manifest lists of all snapshots, the manifest files and the data/delete files.
29+
30+
The file operations perform no effort to identify duplicates during the identification of files referenced by
31+
a table or view metadata.
32+
This means that, for example, a data file referenced in multiple manifest files will be returned twice.
33+
34+
Purge operations are performed in one or multiple bulk delete operations.
35+
The implementation takes care of not including the same file more than once within a single bulk delete operation.
36+
37+
One alternative implementation purges all files within the base location of a table or view metadata.
38+
39+
All implemented operations are designed to be resilient against failures as those are expected to be run as
40+
maintenance operations or as part of such.
41+
The operations are implemented to continue in case of errors and eventually succeed instead of failing eagerly.
42+
Maintenance operations are usually not actively observed, and manually fixing consistency issues in object
43+
stores is not a straightforward task for users.
44+
45+
# Potential future enhancements
46+
47+
The operations provided by `FileOperations` are meant for maintenance operations, which are not
48+
time- or performance-critical.
49+
It is more important that the operations are resilient against failures, do not add unnecessary CPU or heap pressure
50+
and eventually succeed.
51+
Further, maintenance operations should not eat up too much I/O bandwidth to not interfere with other user-facing
52+
operations.
53+
54+
Depending on the overall load of the system, it might be worth running some operations in parallel.
55+
56+
# Code architecture
57+
58+
The code is split in two modules. One for the (Polaris internal) API interfaces and one for the implementations.
59+
60+
Tests against various object store implementations are included as unit tests using an on-heap object-store-mock
61+
and as integration tests against test containers for S3, GCS and ADLS.
62+
The object-store-mock used in unit tests is also used to validate the low heap-pressure required by the
63+
implementations.
64+
65+
The actual object store interaction of the current implementation is delegated to Iceberg `FileIO` implementations.
66+
Only `FileIO` implementations that support prefix-operations (`SupportsPrefixOperations` interface) and
67+
bulk-operations (`SupportsBulkOperations` interface) are currently supported.
68+
The `FileIO` implementations `S3FileIO`, `GCSFileIO` and `ADLSFileIO` support both.
69+
Beside the necessary `FileIO` usage in `FileOperationsFactory`, none of the API functions refer to `FileIO`
70+
to allow the API to be implemented against other, more tailored object store backend implementations.

storage/files/api/build.gradle.kts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins {
21+
id("polaris-server")
22+
id("org.kordamp.gradle.jandex")
23+
}
24+
25+
dependencies {
26+
implementation(platform(libs.iceberg.bom))
27+
implementation("org.apache.iceberg:iceberg-api")
28+
implementation("org.apache.iceberg:iceberg-core")
29+
30+
implementation(libs.guava)
31+
32+
implementation(platform(libs.jackson.bom))
33+
implementation("com.fasterxml.jackson.core:jackson-annotations")
34+
implementation("com.fasterxml.jackson.core:jackson-core")
35+
implementation("com.fasterxml.jackson.core:jackson-databind")
36+
37+
compileOnly(project(":polaris-immutables"))
38+
annotationProcessor(project(":polaris-immutables", configuration = "processor"))
39+
40+
compileOnly(libs.jakarta.annotation.api)
41+
}
42+
43+
tasks.named("javadoc") { dependsOn("jandex") }
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.storage.files.api;
21+
22+
import java.util.function.Predicate;
23+
24+
@FunctionalInterface
25+
public interface FileFilter extends Predicate<FileSpec> {
26+
27+
static FileFilter alwaysTrue() {
28+
return fileSpec -> true;
29+
}
30+
31+
static FileFilter alwaysFalse() {
32+
return fileSpec -> false;
33+
}
34+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.storage.files.api;
21+
22+
import jakarta.annotation.Nonnull;
23+
import java.util.stream.Stream;
24+
25+
/**
26+
* Object storage file operations, used to find files below a given prefix, to purge files, to
27+
* identify referenced files, etc.
28+
*
29+
* <p>All functions of this interface rather yield incomplete results and continue over throwing
30+
* exceptions.
31+
*/
32+
public interface FileOperations {
33+
/**
34+
* Find files that match the given prefix and filter.
35+
*
36+
* <p>Whether existing but inaccessible files are included in the result depends on the object
37+
* store.
38+
*
39+
* <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's
40+
* {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; }} step on
41+
* the returned stream.
42+
*
43+
* @param prefix full object storage URI prefix, including scheme and bucket.
44+
* @param filter file filter
45+
* @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link
46+
* FileSpec#size()} attributes populated with the information provided by the object store.
47+
* The {@link FileSpec#fileType() file type} attribute is not populated, it may be {@link
48+
* FileSpec#guessTypeFromName() guessed}.
49+
*/
50+
Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter);
51+
52+
/**
53+
* Identifies all files referenced by the given table-metadata.
54+
*
55+
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
56+
* readable, the returned stream will just not include those.
57+
*
58+
* <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or
59+
* views. Rate-limiting on a single invocation may not be effective as expected.
60+
*
61+
* @param tableMetadataLocation Iceberg table-metadata location
62+
* @param deduplicate if true, attempt to deduplicate files by their location, adding additional
63+
* heap pressure to the operation. Implementations may ignore this parameter or may not
64+
* deduplicate all identified files.
65+
* @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()}
66+
* attribute is usually not populated, as it would have to be derived from user-provided
67+
* information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated
68+
* based on where a file appears during identification.
69+
*/
70+
Stream<FileSpec> identifyIcebergTableFiles(
71+
@Nonnull String tableMetadataLocation, boolean deduplicate);
72+
73+
/**
74+
* Identifies all files referenced by the given view-metadata.
75+
*
76+
* <p>In case "container" files like the metadata are not readable, the returned stream will just
77+
* not include those.
78+
*
79+
* <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or
80+
* views. Rate-limiting on a single invocation may not be effective as expected.
81+
*
82+
* @param viewMetadataLocation Iceberg view-metadata location
83+
* @param deduplicate if true, attempt to deduplicate files by their location, adding additional
84+
* heap pressure to the operation. Implementations may ignore this parameter or may not
85+
* deduplicate all identified files.
86+
* @return a stream of {@link FileSpec file specs}. The {@link FileSpec#createdAtMillis()}
87+
* attribute is usually not populated, as it would have been derived from user-provided
88+
* information in metadata or snapshot. The {@link FileSpec#fileType()} attribute is populated
89+
* based on where a file appears during identification.
90+
*/
91+
Stream<FileSpec> identifyIcebergViewFiles(
92+
@Nonnull String viewMetadataLocation, boolean deduplicate);
93+
94+
/**
95+
* Purges all files that are referenced by the given table-metadata, respecting the given filter.
96+
*
97+
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
98+
* readable, those files are just ignored.
99+
*
100+
* <p>This is effectively a convenience for {@code
101+
* purge(identifyIcebergTableFiles(tableMetadataLocation).filter(purgeSpec.fileFilter()))}
102+
*
103+
* @see #purge(Stream, PurgeSpec)
104+
* @see #identifyIcebergTableFiles(String, boolean)
105+
* @see #findFiles(String, FileFilter)
106+
*/
107+
PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec);
108+
109+
/**
110+
* Purges all files that are within the base location of the given table-metadata, purge only
111+
* files that match the given filter.
112+
*
113+
* <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
114+
* readable, those files are just ignored.
115+
*
116+
* <p>This is effectively a convenience for {@code
117+
* purge(findFiles(tableMetadata.baseLocation()).filter(purgeSpec.fileFilter()))}
118+
*
119+
* @see #purge(Stream, PurgeSpec)
120+
* @see #findFiles(String, FileFilter)
121+
*/
122+
PurgeStats purgeIcebergTableBaseLocation(
123+
@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec);
124+
125+
/**
126+
* Purges all files that are referenced by the given view-metadata, respecting the given filter. *
127+
*
128+
* <p>In case "container" files like the metadata are not readable, those files are just ignored.
129+
*
130+
* <p>This is effectively a convenience for {@code
131+
* purge(identifyIcebergViewFiles(tableMetadataLocation).filter(fileFilter))}
132+
*
133+
* @see #purge(Stream, PurgeSpec)
134+
* @see #identifyIcebergViewFiles(String, boolean)
135+
* @see #findFiles(String, FileFilter)
136+
*/
137+
PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec);
138+
139+
/**
140+
* Purges all files that are within the base location of the given view-metadata, purge only files
141+
* that match the given filter. *
142+
*
143+
* <p>In case "container" files like the metadata are not readable, those files are just ignored.
144+
*
145+
* <p>This is effectively a convenience for {@code
146+
* purge(findFiles(viewMetadata.baseLocation()).filter(fileFilter))}
147+
*
148+
* @see #purge(Stream, PurgeSpec)
149+
* @see #findFiles(String, FileFilter)
150+
*/
151+
PurgeStats purgeIcebergViewBaseLocation(
152+
@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec);
153+
154+
/**
155+
* Purges all files that match the given stream of locations. The {@link Stream} will be fully
156+
* consumed.
157+
*
158+
* <p>This is a convenience for {@link #purgeFiles(Stream, PurgeSpec)
159+
* purgeFiles(locationStream.map(FileSpec::location))}
160+
*/
161+
PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec);
162+
163+
/**
164+
* Purges all files from the given stream of locations. The {@link Stream} will be fully consumed.
165+
*
166+
* <p>Non-existing files and other deletion errors will not let the call fail, which makes it
167+
* resilient against transient or irrelevant errors.
168+
*/
169+
PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec);
170+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.storage.files.api;
21+
22+
import org.apache.iceberg.io.FileIO;
23+
24+
/**
25+
* Factory to create {@link FileOperations} instances to perform object storage related maintenance
26+
* operations.
27+
*/
28+
public interface FileOperationsFactory {
29+
/**
30+
* Create a {@link FileOperations} instance for the given {@link FileIO} instance.
31+
*
32+
* @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link
33+
* org.apache.iceberg.io.SupportsBulkOperations} and {@link
34+
* org.apache.iceberg.io.SupportsPrefixOperations}.
35+
*/
36+
FileOperations createFileOperations(FileIO fileIO);
37+
}

0 commit comments

Comments
 (0)