Skip to content

Commit 25f141d

Browse files
wraymo authored and
facebook-github-bot committed
feat: Add TokenProvider to ConnectorQueryCtx and include it in the cache key for FileHandle (facebookincubator#13919)
Summary: As discussed in facebookincubator#13875 and facebookincubator#13914, we need to provide user information to the filesystem to support authentication and impersonation. This requirement can be generalized with a TokenProvider abstraction. This PR introduces an abstract `TokenProvider` class and integrates it into both `ConnectorQueryCtx` and `FileOptions`. It also becomes part of the `FileHandle` cache key. To use it, users simply need to implement the `TokenProvider` interface with `hash`, `equals`, and `getToken` methods. The filesystem can then access relevant user or token information when reading from or writing to files. Pull Request resolved: facebookincubator#13919 Reviewed By: Yuhta Differential Revision: D78280984 Pulled By: xiaoxmeng fbshipit-source-id: fa26c566aef45a66863e0328ad633b27a974f218
1 parent 406f60a commit 25f141d

File tree

13 files changed

+133
-24
lines changed

13 files changed

+133
-24
lines changed

velox/common/file/FileSystems.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "velox/common/base/Exceptions.h"
1919
#include "velox/common/base/RuntimeMetrics.h"
20+
#include "velox/common/file/TokenProvider.h"
2021
#include "velox/common/memory/MemoryPool.h"
2122

2223
#include <functional>
@@ -84,6 +85,9 @@ struct FileOptions {
8485
/// A hint to the file system for which region size of the file should be
8586
/// read. Specifically, the read length.
8687
std::optional<int64_t> readRangeHint{std::nullopt};
88+
89+
/// A token provider that can be used to get tokens for accessing the file.
90+
std::shared_ptr<TokenProvider> tokenProvider{nullptr};
8791
};
8892

8993
/// Defines directory options

velox/common/file/TokenProvider.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstddef>
#include <memory>
#include <string>
20+
21+
namespace facebook::velox::filesystems {

/// Identifier that file systems implement to differentiate the different
/// tokens needed in the same query (user). Such information usually needs to
/// be passed down and stored in the ReadFile/WriteFile object of the specific
/// file system.
///
/// The destructor must be public: with a private destructor every derived
/// class would get an implicitly deleted destructor, so no file system could
/// actually create or destroy a concrete key.
class AccessTokenKey {
 public:
  virtual ~AccessTokenKey() = default;
};

/// Abstract token that each file system can implement and cast back to its
/// concrete type. The public virtual destructor allows tokens to be owned and
/// destroyed through a pointer to this base class.
class AccessToken {
 public:
  virtual ~AccessToken() = default;
};

/// Interface for providing access tokens to file systems.
///
/// 'equals' and 'hash' are what the FileHandle cache key uses to compare and
/// hash providers, so two providers that compare equal must return the same
/// hash.
class TokenProvider {
 public:
  virtual ~TokenProvider() = default;

  /// Returns true if 'other' is equivalent to this provider.
  virtual bool equals(const TokenProvider& other) const = 0;

  /// Returns a hash of this provider that is consistent with 'equals'.
  virtual size_t hash() const = 0;

  /// Returns the access token associated with 'key'.
  virtual std::shared_ptr<AccessToken> getToken(
      const AccessTokenKey& key) const = 0;
};

} // namespace facebook::velox::filesystems

velox/connectors/Connector.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "velox/common/base/SpillStats.h"
2424
#include "velox/common/caching/AsyncDataCache.h"
2525
#include "velox/common/caching/ScanTracker.h"
26+
#include "velox/common/file/TokenProvider.h"
2627
#include "velox/common/future/VeloxPromise.h"
2728
#include "velox/core/ExpressionEvaluator.h"
2829
#include "velox/type/Filter.h"
@@ -413,7 +414,8 @@ class ConnectorQueryCtx {
413414
int driverId,
414415
const std::string& sessionTimezone,
415416
bool adjustTimestampToTimezone = false,
416-
folly::CancellationToken cancellationToken = {})
417+
folly::CancellationToken cancellationToken = {},
418+
std::shared_ptr<filesystems::TokenProvider> tokenProvider = {})
417419
: operatorPool_(operatorPool),
418420
connectorPool_(connectorPool),
419421
sessionProperties_(sessionProperties),
@@ -428,7 +430,8 @@ class ConnectorQueryCtx {
428430
planNodeId_(planNodeId),
429431
sessionTimezone_(sessionTimezone),
430432
adjustTimestampToTimezone_(adjustTimestampToTimezone),
431-
cancellationToken_(std::move(cancellationToken)) {
433+
cancellationToken_(std::move(cancellationToken)),
434+
fsTokenProvider_(std::move(tokenProvider)) {
432435
VELOX_CHECK_NOT_NULL(sessionProperties);
433436
}
434437

@@ -516,6 +519,10 @@ class ConnectorQueryCtx {
516519
selectiveNimbleReaderEnabled_ = value;
517520
}
518521

522+
std::shared_ptr<filesystems::TokenProvider> fsTokenProvider() const {
523+
return fsTokenProvider_;
524+
}
525+
519526
private:
520527
memory::MemoryPool* const operatorPool_;
521528
memory::MemoryPool* const connectorPool_;
@@ -532,6 +539,7 @@ class ConnectorQueryCtx {
532539
const std::string sessionTimezone_;
533540
const bool adjustTimestampToTimezone_;
534541
const folly::CancellationToken cancellationToken_;
542+
const std::shared_ptr<filesystems::TokenProvider> fsTokenProvider_;
535543
bool selectiveNimbleReaderEnabled_{false};
536544
};
537545

velox/connectors/hive/FileHandle.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ std::string groupName(const std::string& filename) {
4040
} // namespace
4141

4242
std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
43-
const std::string& filename,
43+
const FileHandleKey& key,
4444
const FileProperties* properties,
4545
filesystems::File::IoStats* stats) {
4646
// We have seen cases where drivers are stuck when creating file handles.
@@ -53,11 +53,13 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
5353
fileHandle = std::make_unique<FileHandle>();
5454
filesystems::FileOptions options;
5555
options.stats = stats;
56+
options.tokenProvider = key.tokenProvider;
5657
if (properties) {
5758
options.fileSize = properties->fileSize;
5859
options.readRangeHint = properties->readRangeHint;
5960
options.extraFileInfo = properties->extraFileInfo;
6061
}
62+
const auto& filename = key.filename;
6163
fileHandle->file = filesystems::getFileSystem(filename, properties_)
6264
->openFileForRead(filename, options);
6365
fileHandle->uuid = StringIdLease(fileIds(), filename);

velox/connectors/hive/FileHandle.h

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525

2626
#pragma once
2727

28+
#include "velox/common/base/BitUtil.h"
2829
#include "velox/common/caching/CachedFactory.h"
2930
#include "velox/common/caching/FileIds.h"
3031
#include "velox/common/config/Config.h"
3132
#include "velox/common/file/File.h"
33+
#include "velox/common/file/TokenProvider.h"
3234
#include "velox/connectors/hive/FileProperties.h"
3335

3436
namespace facebook::velox {
@@ -59,7 +61,44 @@ struct FileHandleSizer {
5961
uint64_t operator()(const FileHandle& a);
6062
};
6163

62-
using FileHandleCache = SimpleLRUCache<std::string, FileHandle>;
64+
struct FileHandleKey {
65+
std::string filename;
66+
std::shared_ptr<filesystems::TokenProvider> tokenProvider{nullptr};
67+
68+
bool operator==(const FileHandleKey& other) const {
69+
if (filename != other.filename) {
70+
return false;
71+
}
72+
73+
if (tokenProvider == other.tokenProvider) {
74+
return true;
75+
}
76+
77+
if (!tokenProvider || !other.tokenProvider) {
78+
return false;
79+
}
80+
81+
return tokenProvider->equals(*other.tokenProvider);
82+
}
83+
};
84+
85+
} // namespace facebook::velox
86+
87+
namespace std {
88+
template <>
89+
struct hash<facebook::velox::FileHandleKey> {
90+
size_t operator()(const facebook::velox::FileHandleKey& key) const noexcept {
91+
size_t filenameHash = std::hash<std::string>()(key.filename);
92+
return key.tokenProvider ? facebook::velox::bits::hashMix(
93+
filenameHash, key.tokenProvider->hash())
94+
: filenameHash;
95+
}
96+
};
97+
} // namespace std
98+
99+
namespace facebook::velox {
100+
using FileHandleCache =
101+
SimpleLRUCache<facebook::velox::FileHandleKey, FileHandle>;
63102

64103
// Creates FileHandles via the Generator interface the CachedFactory requires.
65104
class FileHandleGenerator {
@@ -68,7 +107,7 @@ class FileHandleGenerator {
68107
FileHandleGenerator(std::shared_ptr<const config::ConfigBase> properties)
69108
: properties_(std::move(properties)) {}
70109
std::unique_ptr<FileHandle> operator()(
71-
const std::string& filename,
110+
const FileHandleKey& filename,
72111
const FileProperties* properties,
73112
filesystems::File::IoStats* stats);
74113

@@ -77,14 +116,14 @@ class FileHandleGenerator {
77116
};
78117

79118
using FileHandleFactory = CachedFactory<
80-
std::string,
119+
FileHandleKey,
81120
FileHandle,
82121
FileHandleGenerator,
83122
FileProperties,
84123
filesystems::File::IoStats,
85124
FileHandleSizer>;
86125

87-
using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;
126+
using FileHandleCachedPtr = CachedPtr<FileHandleKey, FileHandle>;
88127

89128
using FileHandleCacheStats = SimpleLRUCacheStats;
90129

velox/connectors/hive/HiveConnector.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ HiveConnector::HiveConnector(
4444
hiveConfig_(std::make_shared<HiveConfig>(config)),
4545
fileHandleFactory_(
4646
hiveConfig_->isFileHandleCacheEnabled()
47-
? std::make_unique<SimpleLRUCache<std::string, FileHandle>>(
47+
? std::make_unique<SimpleLRUCache<FileHandleKey, FileHandle>>(
4848
hiveConfig_->numCacheFileHandles())
4949
: nullptr,
5050
std::make_unique<FileHandleGenerator>(config)),

velox/connectors/hive/SplitReader.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,12 @@ void SplitReader::createReader() {
286286
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);
287287

288288
FileHandleCachedPtr fileHandleCachePtr;
289+
FileHandleKey fileHandleKey{
290+
.filename = hiveSplit_->filePath,
291+
.tokenProvider = connectorQueryCtx_->fsTokenProvider()};
289292
try {
290293
fileHandleCachePtr = fileHandleFactory_->generate(
291-
hiveSplit_->filePath,
294+
fileHandleKey,
292295
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr,
293296
fsStats_ ? fsStats_.get() : nullptr);
294297
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());

velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
9292
/*tableParameters=*/{},
9393
deleteReaderOpts);
9494

95-
auto deleteFileHandleCachePtr =
96-
fileHandleFactory_->generate(deleteFile_.filePath);
95+
const FileHandleKey fileHandleKey{
96+
.filename = deleteFile_.filePath,
97+
.tokenProvider = connectorQueryCtx_->fsTokenProvider()};
98+
auto deleteFileHandleCachePtr = fileHandleFactory_->generate(fileHandleKey);
9799
auto deleteFileInput = createBufferedInput(
98100
*deleteFileHandleCachePtr,
99101
deleteReaderOpts,

velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ void IcebergSplitReaderBenchmark::readSingleColumn(
324324
"");
325325

326326
FileHandleFactory fileHandleFactory(
327-
std::make_unique<SimpleLRUCache<std::string, FileHandle>>(
327+
std::make_unique<SimpleLRUCache<FileHandleKey, FileHandle>>(
328328
hiveConfig->numCacheFileHandles()),
329329
std::make_unique<FileHandleGenerator>(connectorSessionProperties_));
330330

velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,14 @@ TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) {
163163

164164
TEST_F(AbfsFileSystemTest, fileHandleWithProperties) {
165165
FileHandleFactory factory(
166-
std::make_unique<SimpleLRUCache<std::string, FileHandle>>(1),
166+
std::make_unique<SimpleLRUCache<FileHandleKey, FileHandle>>(1),
167167
std::make_unique<FileHandleGenerator>(azuriteServer_->hiveConfig()));
168168
FileProperties properties = {15 + kOneMB, 1};
169-
auto fileHandleProperties =
170-
factory.generate(azuriteServer_->fileURI(), &properties);
169+
FileHandleKey key{azuriteServer_->fileURI()};
170+
auto fileHandleProperties = factory.generate(key, &properties);
171171
readData(fileHandleProperties->file.get());
172172

173-
auto fileHandleWithoutProperties =
174-
factory.generate(azuriteServer_->fileURI());
173+
auto fileHandleWithoutProperties = factory.generate(key);
175174
readData(fileHandleWithoutProperties->file.get());
176175
}
177176

0 commit comments

Comments
 (0)