Skip to content

Commit 44a4dc4

Browse files
committed
Integrate with pg_mooncake
1 parent a0a5e44 commit 44a4dc4

13 files changed

+72
-17
lines changed

.github/workflows/MainDistributionPipeline.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jobs:
2929
duckdb_version: v1.3.2
3030
ci_tools_version: v1.3.2
3131
enable_rust: true
32+
exclude_archs: windows_amd64;windows_amd64_mingw;wasm_mvp;wasm_eh;wasm_threads
3233
extension_name: mooncake
3334

3435
code-quality-check:
@@ -38,4 +39,4 @@ jobs:
3839
duckdb_version: v1.3.2
3940
ci_tools_version: main
4041
extension_name: mooncake
41-
format_checks: 'format;tidy'
42+
format_checks: format

src/include/moonlink/moonlink.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class Moonlink {
1111

1212
DataPtr GetTableSchema(const string &schema, const string &table);
1313

14-
DataPtr ScanTableBegin(const string &schema, const string &table);
14+
DataPtr ScanTableBegin(const string &schema, const string &table, uint64_t lsn);
1515

1616
void ScanTableEnd(const string &schema, const string &table);
1717

src/include/pgmooncake.hpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include "duckdb/common/dl.hpp"
4+
5+
namespace duckdb {
6+
7+
class Pgmooncake {
8+
public:
9+
using drop_cstring_fn = void (*)(char *);
10+
using get_init_query_fn = char *(*)();
11+
using get_lsn_fn = uint64_t (*)();
12+
13+
static string GetInitQuery() {
14+
static drop_cstring_fn drop_cstring =
15+
reinterpret_cast<drop_cstring_fn>(dlsym(RTLD_DEFAULT, "pgmooncake_drop_cstring"));
16+
static get_init_query_fn get_init_query =
17+
reinterpret_cast<get_init_query_fn>(dlsym(RTLD_DEFAULT, "pgmooncake_get_init_query"));
18+
19+
if (!get_init_query) {
20+
return "";
21+
}
22+
char *init_query = get_init_query();
23+
string res(init_query);
24+
D_ASSERT(drop_cstring);
25+
drop_cstring(init_query);
26+
return res;
27+
}
28+
29+
static uint64_t GetLsn() {
30+
static get_lsn_fn get_lsn = reinterpret_cast<get_lsn_fn>(dlsym(RTLD_DEFAULT, "pgmooncake_get_lsn"));
31+
32+
return get_lsn ? get_lsn() : 0;
33+
}
34+
};
35+
36+
} // namespace duckdb

src/include/storage/mooncake_schema.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class Moonlink;
99

1010
class MooncakeSchema : public SchemaCatalogEntry {
1111
public:
12-
MooncakeSchema(Catalog &catalog, CreateSchemaInfo &info);
12+
MooncakeSchema(Catalog &catalog, CreateSchemaInfo &info, uint64_t lsn);
1313

1414
~MooncakeSchema();
1515

@@ -48,6 +48,7 @@ class MooncakeSchema : public SchemaCatalogEntry {
4848
void Alter(CatalogTransaction transaction, AlterInfo &info) override;
4949

5050
private:
51+
uint64_t lsn;
5152
Moonlink &moonlink;
5253
mutex lock;
5354
case_insensitive_map_t<unique_ptr<MooncakeTable>> tables;

src/include/storage/mooncake_table.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ class Moonlink;
99

1010
class MooncakeTable : public TableCatalogEntry {
1111
public:
12-
MooncakeTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Moonlink &moonlink);
12+
MooncakeTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, uint64_t lsn,
13+
Moonlink &moonlink);
1314

1415
~MooncakeTable();
1516

@@ -22,6 +23,7 @@ class MooncakeTable : public TableCatalogEntry {
2223
MooncakeTableMetadata &GetTableMetadata();
2324

2425
private:
26+
uint64_t lsn;
2527
Moonlink &moonlink;
2628
mutex lock;
2729
unique_ptr<MooncakeTableMetadata> metadata;

src/include/storage/mooncake_table_metadata.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class DeleteFilter;
88

99
class MooncakeTableMetadata {
1010
public:
11-
MooncakeTableMetadata(Moonlink &moonlink, const string &schema, const string &table);
11+
MooncakeTableMetadata(Moonlink &moonlink, const string &schema, const string &table, uint64_t lsn);
1212

1313
~MooncakeTableMetadata();
1414

src/include/storage/mooncake_transaction.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class MooncakeTransaction : public Transaction {
1414

1515
private:
1616
Catalog &catalog;
17+
uint64_t lsn;
1718
mutex lock;
1819
case_insensitive_map_t<unique_ptr<SchemaCatalogEntry>> schemas;
1920
};

src/mooncake_extension.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
#define DUCKDB_EXTENSION_MAIN
22

3+
#include "duckdb/main/connection.hpp"
34
#include "duckdb/main/database.hpp"
45
#include "mooncake_extension.hpp"
6+
#include "pgmooncake.hpp"
57
#include "storage/mooncake_storage.hpp"
68

79
namespace duckdb {
810

911
void MooncakeExtension::Load(DuckDB &db) {
1012
auto &config = DBConfig::GetConfig(*db.instance);
1113
config.storage_extensions["mooncake"] = make_uniq<MooncakeStorageExtension>();
14+
15+
string init_query = Pgmooncake::GetInitQuery();
16+
if (!init_query.empty()) {
17+
Connection connection(db);
18+
auto res = connection.Query(init_query);
19+
if (res->HasError()) {
20+
res->ThrowError();
21+
}
22+
}
1223
}
1324

1425
string MooncakeExtension::Name() {

src/moonlink/moonlink.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ DataPtr Moonlink::GetTableSchema(const string &schema, const string &table) {
1111
return DataPtr(moonlink_get_table_schema(stream.get(), database.c_str(), schema.c_str(), table.c_str()).Unwrap());
1212
}
1313

14-
DataPtr Moonlink::ScanTableBegin(const string &schema, const string &table) {
14+
DataPtr Moonlink::ScanTableBegin(const string &schema, const string &table, uint64_t lsn) {
1515
lock_guard<mutex> guard(lock);
1616
return DataPtr(
17-
moonlink_scan_table_begin(stream.get(), database.c_str(), schema.c_str(), table.c_str(), 0).Unwrap());
17+
moonlink_scan_table_begin(stream.get(), database.c_str(), schema.c_str(), table.c_str(), lsn).Unwrap());
1818
}
1919

2020
void Moonlink::ScanTableEnd(const string &schema, const string &table) {

src/storage/mooncake_schema.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
namespace duckdb {
1111

12-
MooncakeSchema::MooncakeSchema(Catalog &catalog, CreateSchemaInfo &info)
13-
: SchemaCatalogEntry(catalog, info), moonlink(catalog.Cast<MooncakeCatalog>().GetMoonlink()) {
12+
MooncakeSchema::MooncakeSchema(Catalog &catalog, CreateSchemaInfo &info, uint64_t lsn)
13+
: SchemaCatalogEntry(catalog, info), lsn(lsn), moonlink(catalog.Cast<MooncakeCatalog>().GetMoonlink()) {
1414
}
1515

1616
MooncakeSchema::~MooncakeSchema() = default;
@@ -96,7 +96,7 @@ optional_ptr<CatalogEntry> MooncakeSchema::LookupEntry(CatalogTransaction transa
9696
for (idx_t i = 0; i < names.size(); i++) {
9797
table_info.columns.AddColumn(ColumnDefinition(names[i], return_types[i]));
9898
}
99-
tables[table_name] = make_uniq<MooncakeTable>(catalog, *this, table_info, moonlink);
99+
tables[table_name] = make_uniq<MooncakeTable>(catalog, *this, table_info, lsn, moonlink);
100100
return *tables[table_name];
101101
}
102102

0 commit comments

Comments
 (0)