Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ version: v1
lint:
use:
- DEFAULT
except:
- FIELD_NOT_REQUIRED
- ONEOF_NOT_REQUIRED
rules:
# Set maximum line length to 100 characters
max_line_length: 100
enum_zero_value_suffix: _UNSPECIFIED
rpc_allow_same_request_response: false
rpc_allow_google_protobuf_empty_requests: true
rpc_allow_google_protobuf_empty_responses: true

# Exclude generated files from linting
lint:
ignore:
- build
- vcpkg_installed
30 changes: 27 additions & 3 deletions src/client/distributed_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,36 @@

namespace duckdb {

DistributedClient::DistributedClient(string server_url_p) : server_url(std::move(server_url_p)) {
client = make_uniq<DistributedFlightClient>(server_url);
DistributedClient::DistributedClient(string server_url_p, string db_path_p)
: server_url(std::move(server_url_p)), db_path(std::move(db_path_p)) {
client = make_uniq<DistributedFlightClient>(server_url, db_path);
auto status = client->Connect();
if (!status.ok()) {
throw Exception(ExceptionType::CONNECTION, "Failed to connect to Flight server: " + status.ToString());
}
}

DistributedClient &DistributedClient::GetInstance() {
/*static*/ DistributedClient &DistributedClient::GetInstance() {
static NoDestructor<DistributedClient> client {};
return *client;
}

/*static*/ void DistributedClient::Configure(const string &server_url_param, const string &db_path_param) {
auto &instance = GetInstance();

// Reconfigure if either server_url or db_path changed.
if (instance.server_url != server_url_param || instance.db_path != db_path_param) {
instance.server_url = server_url_param;
instance.db_path = db_path_param;
instance.client = make_uniq<DistributedFlightClient>(server_url_param, db_path_param);
auto status = instance.client->Connect();
if (!status.ok()) {
throw Exception(ExceptionType::CONNECTION,
"Failed to connect to Arrow Flight server: " + status.ToString());
}
}
}

unique_ptr<QueryResult> DistributedClient::ScanTable(const string &table_name, idx_t limit, idx_t offset) {
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
auto status = client->ScanTable(table_name, limit, offset, stream);
Expand Down Expand Up @@ -192,4 +209,11 @@ unique_ptr<QueryResult> DistributedClient::InsertInto(const string &insert_sql)
return ExecuteSQL(insert_sql);
}

void DistributedClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) {
auto status = client->GetCatalogInfo(response);
if (!status.ok()) {
throw Exception(ExceptionType::INVALID, StringUtil::Format("Failed to get catalog: %s", status.ToString()));
}
}

} // namespace duckdb
27 changes: 25 additions & 2 deletions src/client/distributed_flight_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace duckdb {

DistributedFlightClient::DistributedFlightClient(string server_url_p) : server_url(std::move(server_url_p)) {
DistributedFlightClient::DistributedFlightClient(string server_url_p, string db_path_p)
: server_url(std::move(server_url_p)), db_path(std::move(db_path_p)) {
}

arrow::Status DistributedFlightClient::Connect() {
Expand All @@ -18,6 +19,7 @@ arrow::Status DistributedFlightClient::Connect() {

arrow::Status DistributedFlightClient::ExecuteSQL(const string &sql, distributed::DistributedResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *exec_req = req.mutable_execute_sql();
exec_req->set_sql(sql);
return SendAction(req, response);
Expand All @@ -26,13 +28,15 @@ arrow::Status DistributedFlightClient::ExecuteSQL(const string &sql, distributed
arrow::Status DistributedFlightClient::CreateTable(const string &create_sql,
distributed::DistributedResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *create_req = req.mutable_create_table();
create_req->set_sql(create_sql);
return SendAction(req, response);
}

arrow::Status DistributedFlightClient::DropTable(const string &drop_sql, distributed::DistributedResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *drop_req = req.mutable_drop_table();
drop_req->set_table_name(drop_sql);
return SendAction(req, response);
Expand All @@ -41,20 +45,23 @@ arrow::Status DistributedFlightClient::DropTable(const string &drop_sql, distrib
arrow::Status DistributedFlightClient::CreateIndex(const string &create_sql,
distributed::DistributedResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *create_req = req.mutable_create_index();
create_req->set_sql(create_sql);
return SendAction(req, response);
}

arrow::Status DistributedFlightClient::DropIndex(const string &index_name, distributed::DistributedResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *drop_req = req.mutable_drop_index();
drop_req->set_index_name(index_name);
return SendAction(req, response);
}

arrow::Status DistributedFlightClient::TableExists(const string &table_name, bool &exists) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *exists_req = req.mutable_table_exists();
exists_req->set_table_name(table_name);

Expand All @@ -68,9 +75,24 @@ arrow::Status DistributedFlightClient::TableExists(const string &table_name, boo
return arrow::Status::OK();
}

arrow::Status DistributedFlightClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *catalog_req = req.mutable_get_catalog_info();

distributed::DistributedResponse resp;
ARROW_RETURN_NOT_OK(SendAction(req, resp));
if (!resp.success()) {
return arrow::Status::Invalid(resp.error_message());
}

response = resp.get_catalog_info();
return arrow::Status::OK();
}

arrow::Status DistributedFlightClient::InsertData(const string &table_name, std::shared_ptr<arrow::RecordBatch> batch,
distributed::DistributedResponse &response) {
arrow::flight::FlightDescriptor descriptor = arrow::flight::FlightDescriptor::Path({table_name});
arrow::flight::FlightDescriptor descriptor = arrow::flight::FlightDescriptor::Path({db_path, table_name});

std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
Expand Down Expand Up @@ -102,6 +124,7 @@ arrow::Status DistributedFlightClient::InsertData(const string &table_name, std:
arrow::Status DistributedFlightClient::ScanTable(const string &table_name, uint64_t limit, uint64_t offset,
std::unique_ptr<arrow::flight::FlightStreamReader> &stream) {
distributed::DistributedRequest req;
req.set_db_path(db_path);
auto *scan_req = req.mutable_scan_table();
scan_req->set_table_name(table_name);
scan_req->set_limit(limit);
Expand Down
70 changes: 69 additions & 1 deletion src/client/duckherder_catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckherder_catalog.hpp"

#include "distributed_client.hpp"
#include "distributed_delete.hpp"
#include "distributed_insert.hpp"
#include "duckdb/catalog/duck_catalog.hpp"
Expand All @@ -9,11 +10,15 @@
#include "duckdb/common/unique_ptr.hpp"
#include "duckdb/logging/logger.hpp"
#include "duckdb/main/attached_database.hpp"
#include "duckdb/parser/column_definition.hpp"
#include "duckdb/parser/constraints/not_null_constraint.hpp"
#include "duckdb/parser/expression/columnref_expression.hpp"
#include "duckdb/parser/parsed_data/alter_table_info.hpp"
#include "duckdb/parser/parsed_data/create_index_info.hpp"
#include "duckdb/parser/parsed_data/create_schema_info.hpp"
#include "duckdb/parser/parsed_data/create_table_info.hpp"
#include "duckdb/parser/statement/create_statement.hpp"
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/expression/bound_columnref_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/logical_operator.hpp"
Expand All @@ -37,6 +42,69 @@ DuckherderCatalog::~DuckherderCatalog() = default;

void DuckherderCatalog::Initialize(bool load_builtin) {
duckdb_catalog->Initialize(load_builtin);
auto server_url = GetServerUrl();
DistributedClient::Configure(server_url, server_db_path);
}

void DuckherderCatalog::FinalizeLoad(optional_ptr<ClientContext> context) {
// Automatically sync catalog from server when database is loaded
if (context && !server_db_path.empty()) {
std::cerr << "[DuckherderCatalog::FinalizeLoad] Starting automatic catalog sync from server for database: " << server_db_path << std::endl;
SyncCatalogFromServer(*context);
std::cerr << "[DuckherderCatalog::FinalizeLoad] Catalog sync completed" << std::endl;
}
}

void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) {
auto &client = DistributedClient::GetInstance();
distributed::GetCatalogInfoResponse catalog_info;

std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Fetching catalog info from server" << std::endl;

client.GetCatalogInfo(catalog_info);

std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Found " << catalog_info.tables_size() << " tables in server database" << std::endl;

// Create tables in the local catalog using CREATE TABLE via the context
auto db_name = GetName();
for (int i = 0; i < catalog_info.tables_size(); i++) {
const auto &table_info = catalog_info.tables(i);

std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Processing table " << table_info.schema_name() << "." << table_info.table_name()
<< " with " << table_info.columns_size() << " columns" << std::endl;

// Build CREATE TABLE SQL with full qualification
string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (",
db_name,
table_info.schema_name(),
table_info.table_name());

for (int j = 0; j < table_info.columns_size(); j++) {
const auto &col = table_info.columns(j);
if (j > 0) {
create_sql += ", ";
}
create_sql += StringUtil::Format("%s %s", col.name(), col.type());
if (!col.nullable()) {
create_sql += " NOT NULL";
}
}
create_sql += ")";

// Execute CREATE TABLE using the context (this creates it locally in this catalog)
auto result = context.Query(create_sql, false);
if (result->HasError()) {
throw Exception(ExceptionType::EXECUTOR, "Failed to create table: " + result->GetError());
}

std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Created local table: " << create_sql << std::endl;

// Automatically register as remote table
RegisterRemoteTable(table_info.table_name(), GetServerUrl(), table_info.table_name());
std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Registered remote table: " << table_info.table_name() << std::endl;
}

// TODO: Sync indexes as well
}

optional_ptr<CatalogEntry> DuckherderCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) {
Expand Down Expand Up @@ -265,7 +333,7 @@ bool DuckherderCatalog::IsRemoteIndex(const string &index_name) const {
}

string DuckherderCatalog::GetServerUrl() const {
return StringUtil::Format("http://%s:%d", server_host, server_port);
return StringUtil::Format("grpc://%s:%d", server_host, server_port);
}

} // namespace duckdb
11 changes: 10 additions & 1 deletion src/include/client/distributed_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ namespace duckdb {

class DistributedClient {
public:
explicit DistributedClient(string server_url_p = "grpc://localhost:8815");
explicit DistributedClient(string server_url_p = "grpc://localhost:8815", string db_path_p = "");
~DistributedClient() = default;

// Get client singleton.
static DistributedClient &GetInstance();

// Configure the singleton instance with server details.
static void Configure(const string &server_url, const string &db_path);

// Execute arbitrary SQL on the server.
unique_ptr<QueryResult> ExecuteSQL(const string &sql);

Expand All @@ -39,15 +43,20 @@ class DistributedClient {
unique_ptr<QueryResult> DropIndex(const string &index_name);

// INSERT INTO on server.
//
// TODO(hjiang): Currently for implementation easy, directly execute SQL statements, should be use transfer rows and
// table name.
unique_ptr<QueryResult> InsertInto(const string &insert_sql);

// Get table data.
unique_ptr<QueryResult> ScanTable(const string &table_name, idx_t limit = 1000, idx_t offset = 0);

// Get catalog information from the server.
void GetCatalogInfo(distributed::GetCatalogInfoResponse &response);

private:
string server_url;
string db_path;
unique_ptr<DistributedFlightClient> client;
};

Expand Down
6 changes: 5 additions & 1 deletion src/include/client/distributed_flight_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace duckdb {

class DistributedFlightClient {
public:
explicit DistributedFlightClient(string server_url);
explicit DistributedFlightClient(string server_url, string db_path = "");
~DistributedFlightClient() = default;

// Connect to server.
Expand All @@ -39,6 +39,9 @@ class DistributedFlightClient {
// Check if table exists.
arrow::Status TableExists(const string &table_name, bool &exists);

// Get catalog information (tables, columns, indexes).
arrow::Status GetCatalogInfo(distributed::GetCatalogInfoResponse &response);

// Insert data using Arrow RecordBatch.
arrow::Status InsertData(const string &table_name, std::shared_ptr<arrow::RecordBatch> batch,
distributed::DistributedResponse &response);
Expand All @@ -53,6 +56,7 @@ class DistributedFlightClient {

private:
string server_url;
string db_path;
arrow::flight::Location location;
std::unique_ptr<arrow::flight::FlightClient> client;
};
Expand Down
3 changes: 3 additions & 0 deletions src/include/client/duckherder_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class DuckherderCatalog : public DuckCatalog {
~DuckherderCatalog() override;

void Initialize(bool load_builtin) override;
void FinalizeLoad(optional_ptr<ClientContext> context) override;

void SyncCatalogFromServer(ClientContext &context);

string GetCatalogType() override {
return "duckherder";
Expand Down
Loading