Skip to content

Commit 92ec36e

Browse files
authored
Fix httpfs registration (#38)
1 parent 1c9d190 commit 92ec36e

File tree

4 files changed

+110
-49
lines changed

4 files changed

+110
-49
lines changed

src/fake_filesystem.cpp

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,75 +11,75 @@ namespace {
1111
const NoDestructor<std::string> FAKE_FILESYSTEM_PREFIX {"/tmp/cache_httpfs_fake_filesystem"};
1212
} // namespace
1313

14-
CacheHttpfsFakeFsHandle::CacheHttpfsFakeFsHandle(string path, unique_ptr<FileHandle> internal_file_handle_p,
15-
CacheHttpfsFakeFileSystem &fs)
14+
ObserveHttpfsFakeFsHandle::ObserveHttpfsFakeFsHandle(string path, unique_ptr<FileHandle> internal_file_handle_p,
15+
ObserveHttpfsFakeFileSystem &fs)
1616
: FileHandle(fs, std::move(path), internal_file_handle_p->GetFlags()),
1717
internal_file_handle(std::move(internal_file_handle_p)) {
1818
}
19-
CacheHttpfsFakeFileSystem::CacheHttpfsFakeFileSystem() : local_filesystem(LocalFileSystem::CreateLocal()) {
19+
ObserveHttpfsFakeFileSystem::ObserveHttpfsFakeFileSystem() : local_filesystem(LocalFileSystem::CreateLocal()) {
2020
local_filesystem->CreateDirectory(*FAKE_FILESYSTEM_PREFIX);
2121
}
22-
bool CacheHttpfsFakeFileSystem::CanHandleFile(const string &path) {
22+
bool ObserveHttpfsFakeFileSystem::CanHandleFile(const string &path) {
2323
return StringUtil::StartsWith(path, *FAKE_FILESYSTEM_PREFIX);
2424
}
2525

26-
unique_ptr<FileHandle> CacheHttpfsFakeFileSystem::OpenFile(const string &path, FileOpenFlags flags,
27-
optional_ptr<FileOpener> opener) {
26+
unique_ptr<FileHandle> ObserveHttpfsFakeFileSystem::OpenFile(const string &path, FileOpenFlags flags,
27+
optional_ptr<FileOpener> opener) {
2828
auto file_handle = local_filesystem->OpenFile(path, flags, opener);
29-
return make_uniq<CacheHttpfsFakeFsHandle>(path, std::move(file_handle), *this);
29+
return make_uniq<ObserveHttpfsFakeFsHandle>(path, std::move(file_handle), *this);
3030
}
31-
void CacheHttpfsFakeFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
32-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
31+
void ObserveHttpfsFakeFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
32+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
3333
local_filesystem->Read(*local_filesystem_handle, buffer, nr_bytes, location);
3434
}
35-
int64_t CacheHttpfsFakeFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
36-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
35+
int64_t ObserveHttpfsFakeFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
36+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
3737
return local_filesystem->Read(*local_filesystem_handle, buffer, nr_bytes);
3838
}
3939

40-
void CacheHttpfsFakeFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
41-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
40+
void ObserveHttpfsFakeFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
41+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
4242
local_filesystem->Write(*local_filesystem_handle, buffer, nr_bytes, location);
4343
}
44-
int64_t CacheHttpfsFakeFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
45-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
44+
int64_t ObserveHttpfsFakeFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
45+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
4646
return local_filesystem->Write(*local_filesystem_handle, buffer, nr_bytes);
4747
}
48-
int64_t CacheHttpfsFakeFileSystem::GetFileSize(FileHandle &handle) {
49-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
48+
int64_t ObserveHttpfsFakeFileSystem::GetFileSize(FileHandle &handle) {
49+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
5050
return local_filesystem->GetFileSize(*local_filesystem_handle);
5151
}
52-
void CacheHttpfsFakeFileSystem::FileSync(FileHandle &handle) {
53-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
52+
void ObserveHttpfsFakeFileSystem::FileSync(FileHandle &handle) {
53+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
5454
local_filesystem->FileSync(*local_filesystem_handle);
5555
}
5656

57-
void CacheHttpfsFakeFileSystem::Seek(FileHandle &handle, idx_t location) {
58-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
57+
void ObserveHttpfsFakeFileSystem::Seek(FileHandle &handle, idx_t location) {
58+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
5959
local_filesystem->Seek(*local_filesystem_handle, location);
6060
}
61-
idx_t CacheHttpfsFakeFileSystem::SeekPosition(FileHandle &handle) {
62-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
61+
idx_t ObserveHttpfsFakeFileSystem::SeekPosition(FileHandle &handle) {
62+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
6363
return local_filesystem->SeekPosition(*local_filesystem_handle);
6464
}
65-
bool CacheHttpfsFakeFileSystem::Trim(FileHandle &handle, idx_t offset_bytes, idx_t length_bytes) {
66-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
65+
bool ObserveHttpfsFakeFileSystem::Trim(FileHandle &handle, idx_t offset_bytes, idx_t length_bytes) {
66+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
6767
return local_filesystem->Trim(*local_filesystem_handle, offset_bytes, length_bytes);
6868
}
69-
timestamp_t CacheHttpfsFakeFileSystem::GetLastModifiedTime(FileHandle &handle) {
70-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
69+
timestamp_t ObserveHttpfsFakeFileSystem::GetLastModifiedTime(FileHandle &handle) {
70+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
7171
return local_filesystem->GetLastModifiedTime(*local_filesystem_handle);
7272
}
73-
FileType CacheHttpfsFakeFileSystem::GetFileType(FileHandle &handle) {
74-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
73+
FileType ObserveHttpfsFakeFileSystem::GetFileType(FileHandle &handle) {
74+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
7575
return local_filesystem->GetFileType(*local_filesystem_handle);
7676
}
77-
void CacheHttpfsFakeFileSystem::Truncate(FileHandle &handle, int64_t new_size) {
78-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
77+
void ObserveHttpfsFakeFileSystem::Truncate(FileHandle &handle, int64_t new_size) {
78+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
7979
local_filesystem->Truncate(*local_filesystem_handle, new_size);
8080
}
81-
bool CacheHttpfsFakeFileSystem::OnDiskFile(FileHandle &handle) {
82-
auto &local_filesystem_handle = handle.Cast<CacheHttpfsFakeFsHandle>().internal_file_handle;
81+
bool ObserveHttpfsFakeFileSystem::OnDiskFile(FileHandle &handle) {
82+
auto &local_filesystem_handle = handle.Cast<ObserveHttpfsFakeFsHandle>().internal_file_handle;
8383
return local_filesystem->OnDiskFile(*local_filesystem_handle);
8484
}
8585

src/include/fake_filesystem.hpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
namespace duckdb {
99

1010
// Forward declaration.
11-
class CacheHttpfsFakeFileSystem;
11+
class ObserveHttpfsFakeFileSystem;
1212

13-
class CacheHttpfsFakeFsHandle : public FileHandle {
13+
class ObserveHttpfsFakeFsHandle : public FileHandle {
1414
public:
15-
CacheHttpfsFakeFsHandle(string path, unique_ptr<FileHandle> internal_file_handle_p, CacheHttpfsFakeFileSystem &fs);
16-
~CacheHttpfsFakeFsHandle() override = default;
15+
ObserveHttpfsFakeFsHandle(string path, unique_ptr<FileHandle> internal_file_handle_p,
16+
ObserveHttpfsFakeFileSystem &fs);
17+
~ObserveHttpfsFakeFsHandle() override = default;
1718
void Close() override {
1819
internal_file_handle->Close();
1920
}
@@ -22,9 +23,9 @@ class CacheHttpfsFakeFsHandle : public FileHandle {
2223
};
2324

2425
// WARNING: the fake filesystem is used for testing purposes only and shouldn't be used in production.
25-
class CacheHttpfsFakeFileSystem : public LocalFileSystem {
26+
class ObserveHttpfsFakeFileSystem : public LocalFileSystem {
2627
public:
27-
CacheHttpfsFakeFileSystem();
28+
ObserveHttpfsFakeFileSystem();
2829
bool CanHandleFile(const string &path) override;
2930
std::string GetName() const override {
3031
return "observefs_fake_filesystem";

src/observefs_extension.cpp

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "duckdb/common/exception.hpp"
55
#include "duckdb/common/opener_file_system.hpp"
66
#include "duckdb/common/string_util.hpp"
7+
#include "duckdb/common/unique_ptr.hpp"
78
#include "fake_filesystem.hpp"
89
#include "filesystem_ref_registry.hpp"
910
#include "hffs.hpp"
@@ -72,31 +73,75 @@ static void WrapFileSystem(const DataChunk &args, ExpressionState &state, Vector
7273
result.Reference(Value(SUCCESS));
7374
}
7475

76+
// List all registered filesystems for the database instance.
77+
static void ListRegisteredFileSystems(DataChunk &args, ExpressionState &state, Vector &result) {
78+
auto &result_entries = ListVector::GetEntry(result);
79+
80+
// The duckdb instance has an opener filesystem, which is a wrapper around the virtual filesystem.
81+
auto &duckdb_instance = GetDatabaseInstance(state);
82+
auto &opener_filesystem = duckdb_instance.GetFileSystem().Cast<OpenerFileSystem>();
83+
auto &vfs = opener_filesystem.GetFileSystem();
84+
auto filesystems = vfs.ListSubSystems();
85+
std::sort(filesystems.begin(), filesystems.end());
86+
87+
// Set filesystem instances.
88+
ListVector::Reserve(result, filesystems.size());
89+
ListVector::SetListSize(result, filesystems.size());
90+
auto data = FlatVector::GetData<string_t>(result_entries);
91+
for (int idx = 0; idx < filesystems.size(); ++idx) {
92+
data[idx] = StringVector::AddString(result_entries, std::move(filesystems[idx]));
93+
}
94+
95+
// Define the list element (offset + length)
96+
auto list_data = FlatVector::GetData<list_entry_t>(result);
97+
list_data[0].offset = 0;
98+
list_data[0].length = filesystems.size();
99+
100+
// Set result as valid.
101+
FlatVector::SetValidity(result, ValidityMask(filesystems.size()));
102+
}
103+
75104
static void LoadInternal(ExtensionLoader &loader) {
76105
ObservabilityFsRefRegistry::Get().Reset();
77106

78107
// Register filesystem instances to the database instance.
79-
auto &instance = loader.GetDatabaseInstance();
80-
auto &fs = instance.GetFileSystem();
108+
auto &duckdb_instance = loader.GetDatabaseInstance();
109+
auto &opener_filesystem = duckdb_instance.GetFileSystem().Cast<OpenerFileSystem>();
110+
auto &vfs = opener_filesystem.GetFileSystem();
81111

82112
// TODO(hjiang): Register a fake filesystem at extension load for testing purpose. This is not ideal since
83113
// additional necessary instance is shipped in the extension. Local filesystem is not viable because it's not
84114
// registered in the virtual filesystem. A better approach is to find another filesystem not in the httpfs extension.
85-
fs.RegisterSubSystem(make_uniq<CacheHttpfsFakeFileSystem>());
115+
vfs.RegisterSubSystem(make_uniq<ObserveHttpfsFakeFileSystem>());
86116

87117
// By default register all filesystem instances inside of httpfs.
88-
auto observability_httpfs_filesystem = make_uniq<ObservabilityFileSystem>(make_uniq<HTTPFileSystem>());
118+
//
119+
// Register http filesystem.
120+
auto http_fs = vfs.ExtractSubSystem("HTTPFileSystem");
121+
if (http_fs == nullptr) {
122+
http_fs = make_uniq<HTTPFileSystem>();
123+
}
124+
auto observability_httpfs_filesystem = make_uniq<ObservabilityFileSystem>(std::move(http_fs));
89125
ObservabilityFsRefRegistry::Get().Register(observability_httpfs_filesystem.get());
90-
fs.RegisterSubSystem(std::move(observability_httpfs_filesystem));
126+
vfs.RegisterSubSystem(std::move(observability_httpfs_filesystem));
91127

92-
auto observability_hf_filesystem = make_uniq<ObservabilityFileSystem>(make_uniq<HuggingFaceFileSystem>());
128+
// Register Hugging Face filesystem.
129+
auto hf_fs = vfs.ExtractSubSystem("HuggingFaceFileSystem");
130+
if (hf_fs == nullptr) {
131+
hf_fs = make_uniq<HuggingFaceFileSystem>();
132+
}
133+
auto observability_hf_filesystem = make_uniq<ObservabilityFileSystem>(std::move(hf_fs));
93134
ObservabilityFsRefRegistry::Get().Register(observability_hf_filesystem.get());
94-
fs.RegisterSubSystem(std::move(observability_hf_filesystem));
135+
vfs.RegisterSubSystem(std::move(observability_hf_filesystem));
95136

96-
auto observability_s3_filesystem =
97-
make_uniq<ObservabilityFileSystem>(make_uniq<S3FileSystem>(BufferManager::GetBufferManager(instance)));
137+
// Register s3 filesystem.
138+
auto s3_fs = vfs.ExtractSubSystem("S3FileSystem");
139+
if (s3_fs == nullptr) {
140+
s3_fs = make_uniq<S3FileSystem>(BufferManager::GetBufferManager(duckdb_instance));
141+
}
142+
auto observability_s3_filesystem = make_uniq<ObservabilityFileSystem>(std::move(s3_fs));
98143
ObservabilityFsRefRegistry::Get().Register(observability_s3_filesystem.get());
99-
fs.RegisterSubSystem(std::move(observability_s3_filesystem));
144+
vfs.RegisterSubSystem(std::move(observability_s3_filesystem));
100145

101146
// Register observability data cleanup function.
102147
ScalarFunction clear_cache_function("observefs_clear", /*arguments=*/ {},
@@ -109,6 +154,13 @@ static void LoadInternal(ExtensionLoader &loader) {
109154
/*return_type=*/LogicalType::VARCHAR, GetProfileStats);
110155
loader.RegisterFunction(get_profile_stats_function);
111156

157+
// Register a function to list all existing filesystem instances, which is useful for wrapping.
158+
ScalarFunction list_registered_filesystem_function("observefs_list_registered_filesystems",
159+
/*arguments=*/ {},
160+
/*return_type=*/LogicalType::LIST(LogicalType::VARCHAR),
161+
ListRegisteredFileSystems);
162+
loader.RegisterFunction(list_registered_filesystem_function);
163+
112164
// Register a function to wrap all duckdb-vfs-compatible filesystems. By default only httpfs filesystem instances
113165
// are wrapped. After wrapping, the target filesystem can be used as normal.
114166
//

test/sql/extension.test

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,11 @@ query I
88
SELECT COUNT(*) FROM duckdb_extensions() WHERE extension_name = 'observefs';
99
----
1010
1
11+
12+
query I
13+
SELECT unnest(observefs_list_registered_filesystems());
14+
----
15+
observability-HTTPFileSystem
16+
observability-HuggingFaceFileSystem
17+
observability-S3FileSystem
18+
observefs_fake_filesystem

0 commit comments

Comments
 (0)