Skip to content

Commit 559f61c

Browse files
committed
[rest] Refactory RESTTokenFileIO to let it reused in vfs
1 parent 969c53e commit 559f61c

File tree

6 files changed

+57
-188
lines changed

6 files changed

+57
-188
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest;
20+
21+
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.options.Options;
23+
24+
import java.io.Serializable;
25+
26+
/**
27+
* Loader for creating a {@link RESTApi}.
28+
*
29+
* @since 1.3.0
30+
*/
31+
@Public
32+
public interface RESTApiLoader extends Serializable {
33+
34+
RESTApi load();
35+
36+
Options options();
37+
}

paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java renamed to paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java

File renamed without changes.

paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java renamed to paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.rest;
2020

21-
import org.apache.paimon.catalog.Catalog;
2221
import org.apache.paimon.catalog.CatalogContext;
2322
import org.apache.paimon.catalog.Identifier;
2423
import org.apache.paimon.fs.FileIO;
@@ -51,7 +50,7 @@
5150
/** A {@link FileIO} to support getting token from REST Server. */
5251
public class RESTTokenFileIO implements FileIO {
5352

54-
private static final long serialVersionUID = 1L;
53+
private static final long serialVersionUID = 2L;
5554

5655
public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
5756
ConfigOptions.key("data-token.enabled")
@@ -74,25 +73,22 @@ public class RESTTokenFileIO implements FileIO {
7473

7574
private static final Logger LOG = LoggerFactory.getLogger(RESTTokenFileIO.class);
7675

77-
private final RESTCatalogLoader catalogLoader;
76+
private final CatalogContext catalogContext;
7877
private final Identifier identifier;
7978
private final Path path;
8079

81-
// catalog instance before serialization, it will become null after serialization, then we
82-
// should create catalog from catalog loader
83-
private final transient RESTCatalog catalogInstance;
80+
// Api instance before serialization, it will become null after serialization, then we should
81+
// create RESTApi from catalogContext
82+
private transient volatile RESTApi apiInstance;
8483

8584
// the latest token from REST Server, serializable in order to avoid loading token from the REST
8685
// Server again after serialization
87-
private volatile RESTToken token;
86+
private transient volatile RESTToken token;
8887

8988
public RESTTokenFileIO(
90-
RESTCatalogLoader catalogLoader,
91-
RESTCatalog catalogInstance,
92-
Identifier identifier,
93-
Path path) {
94-
this.catalogLoader = catalogLoader;
95-
this.catalogInstance = catalogInstance;
89+
CatalogContext catalogContext, RESTApi apiInstance, Identifier identifier, Path path) {
90+
this.catalogContext = catalogContext;
91+
this.apiInstance = apiInstance;
9692
this.identifier = identifier;
9793
this.path = path;
9894
}
@@ -165,12 +161,13 @@ public FileIO fileIO() throws IOException {
165161
return fileIO;
166162
}
167163

168-
CatalogContext context = catalogLoader.context();
169-
Options options = context.options();
164+
Options options = catalogContext.options();
170165
// the original options are not overwritten
171166
options = new Options(RESTUtil.merge(token.token(), options.toMap()));
172167
options.set(FILE_IO_ALLOW_CACHE, false);
173-
context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
168+
CatalogContext context =
169+
CatalogContext.create(
170+
options, catalogContext.preferIO(), catalogContext.fallbackIO());
174171
try {
175172
fileIO = FileIO.get(path, context);
176173
} catch (IOException e) {
@@ -199,20 +196,10 @@ private boolean shouldRefresh() {
199196

200197
private void refreshToken() {
201198
LOG.info("begin refresh data token for identifier [{}]", identifier);
202-
GetTableTokenResponse response;
203-
if (catalogInstance != null) {
204-
try {
205-
response = catalogInstance.loadTableToken(identifier);
206-
} catch (Catalog.TableNotExistException e) {
207-
throw new RuntimeException(e);
208-
}
209-
} else {
210-
try (RESTCatalog catalog = catalogLoader.load()) {
211-
response = catalog.loadTableToken(identifier);
212-
} catch (Exception e) {
213-
throw new RuntimeException(e);
214-
}
199+
if (apiInstance == null) {
200+
apiInstance = new RESTApi(catalogContext.options(), false);
215201
}
202+
GetTableTokenResponse response = apiInstance.loadTableToken(identifier);
216203
LOG.info(
217204
"end refresh data token for identifier [{}] expiresAtMillis [{}]",
218205
identifier,

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -971,7 +971,7 @@ protected GetTableTokenResponse loadTableToken(Identifier identifier)
971971

972972
private FileIO fileIOForData(Path path, Identifier identifier) {
973973
return dataTokenEnabled
974-
? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
974+
? new RESTTokenFileIO(context, api, identifier, path)
975975
: fileIOFromOptions(path);
976976
}
977977

paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java

Lines changed: 3 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,16 @@
2222
import org.apache.paimon.catalog.Identifier;
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.fs.Path;
25-
import org.apache.paimon.options.Options;
2625
import org.apache.paimon.rest.RESTApi;
27-
import org.apache.paimon.rest.RESTUtil;
26+
import org.apache.paimon.rest.RESTTokenFileIO;
2827
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
2928
import org.apache.paimon.rest.exceptions.BadRequestException;
3029
import org.apache.paimon.rest.exceptions.ForbiddenException;
3130
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
3231
import org.apache.paimon.rest.exceptions.NotImplementedException;
3332
import org.apache.paimon.rest.responses.GetDatabaseResponse;
3433
import org.apache.paimon.rest.responses.GetTableResponse;
35-
import org.apache.paimon.rest.responses.GetTableTokenResponse;
3634
import org.apache.paimon.schema.Schema;
37-
import org.apache.paimon.utils.IOUtils;
38-
import org.apache.paimon.utils.ThreadUtils;
39-
40-
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
41-
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
42-
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
4335

4436
import org.slf4j.Logger;
4537
import org.slf4j.LoggerFactory;
@@ -49,38 +41,18 @@
4941
import java.nio.file.FileAlreadyExistsException;
5042
import java.util.Collections;
5143
import java.util.List;
52-
import java.util.concurrent.Executors;
53-
import java.util.concurrent.TimeUnit;
5444

5545
import static org.apache.paimon.CoreOptions.TYPE;
5646
import static org.apache.paimon.TableType.OBJECT_TABLE;
57-
import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
58-
import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
5947

6048
/** Wrap over RESTCatalog to provide basic operations for virtual path. */
6149
public class VFSOperations {
50+
6251
private static final Logger LOG = LoggerFactory.getLogger(VFSOperations.class);
6352

6453
private final RESTApi api;
6554
private final CatalogContext context;
6655

67-
// table id -> fileIO
68-
private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE =
69-
Caffeine.newBuilder()
70-
.expireAfterAccess(30, TimeUnit.MINUTES)
71-
.maximumSize(1000)
72-
.removalListener(
73-
(ignored, value, cause) -> IOUtils.closeQuietly((FileIO) value))
74-
.scheduler(
75-
Scheduler.forScheduledExecutorService(
76-
Executors.newSingleThreadScheduledExecutor(
77-
ThreadUtils.newDaemonThreadFactory(
78-
"rest-token-file-io-scheduler"))))
79-
.build();
80-
81-
private static final Cache<String, VFSDataToken> TOKEN_CACHE =
82-
Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).maximumSize(1000).build();
83-
8456
public VFSOperations(CatalogContext context) {
8557
this.context = context;
8658
this.api = new RESTApi(context.options());
@@ -116,9 +88,7 @@ public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException {
11688
}
11789
// Get real path
11890
StringBuilder realPath = new StringBuilder(table.getPath());
119-
boolean isTableRoot = true;
12091
if (parts.length > 2) {
121-
isTableRoot = false;
12292
if (!table.getPath().endsWith("/")) {
12393
realPath.append("/");
12494
}
@@ -129,9 +99,8 @@ public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException {
12999
}
130100
}
131101
}
132-
// Get REST token
133-
FileIO fileIO = getFileIO(new Identifier(databaseName, tableName), table);
134102

103+
FileIO fileIO = new RESTTokenFileIO(context, api, identifier, new Path(table.getPath()));
135104
if (parts.length == 2) {
136105
return new VFSTableRootIdentifier(
137106
table, realPath.toString(), fileIO, databaseName, tableName);
@@ -254,66 +223,6 @@ private void tryCreateObjectTable(Identifier identifier, Schema schema) throws I
254223
}
255224
}
256225

257-
private FileIO getFileIO(Identifier identifier, GetTableResponse table) throws IOException {
258-
VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId());
259-
if (shouldRefresh(token)) {
260-
synchronized (TOKEN_CACHE) {
261-
token = TOKEN_CACHE.getIfPresent(table.getId());
262-
if (shouldRefresh(token)) {
263-
token = refreshToken(identifier);
264-
TOKEN_CACHE.put(table.getId(), token);
265-
}
266-
}
267-
}
268-
269-
FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
270-
if (fileIO != null) {
271-
return fileIO;
272-
}
273-
274-
synchronized (FILE_IO_CACHE) {
275-
fileIO = FILE_IO_CACHE.getIfPresent(token);
276-
if (fileIO != null) {
277-
return fileIO;
278-
}
279-
280-
Options options = context.options();
281-
// the original options are not overwritten
282-
options = new Options(RESTUtil.merge(token.token(), options.toMap()));
283-
options.set(FILE_IO_ALLOW_CACHE, false);
284-
CatalogContext fileIOContext = CatalogContext.create(options);
285-
fileIO = FileIO.get(new Path(table.getPath()), fileIOContext);
286-
FILE_IO_CACHE.put(token, fileIO);
287-
return fileIO;
288-
}
289-
}
290-
291-
private boolean shouldRefresh(VFSDataToken token) {
292-
return token == null
293-
|| token.expireAtMillis() - System.currentTimeMillis()
294-
< TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
295-
}
296-
297-
private VFSDataToken refreshToken(Identifier identifier) throws IOException {
298-
LOG.info("begin refresh data token for identifier [{}]", identifier);
299-
GetTableTokenResponse response;
300-
try {
301-
response = api.loadTableToken(identifier);
302-
} catch (NoSuchResourceException e) {
303-
throw new FileNotFoundException("Table " + identifier + " not found");
304-
} catch (ForbiddenException e) {
305-
throw new IOException("No permission to access table " + identifier);
306-
}
307-
308-
LOG.info(
309-
"end refresh data token for identifier [{}] expiresAtMillis [{}]",
310-
identifier,
311-
response.getExpiresAtMillis());
312-
313-
VFSDataToken token = new VFSDataToken(response.getToken(), response.getExpiresAtMillis());
314-
return token;
315-
}
316-
317226
private GetTableResponse loadTableMetadata(Identifier identifier) throws IOException {
318227
// if the table is system table, we need to load table metadata from the system table's data
319228
// table

0 commit comments

Comments
 (0)