Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions database/backends/postgresql_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,325 @@ kcenon::common::VoidResult postgresql_backend::execute_query(const std::string&
#endif
}

kcenon::common::Result<core::database_result> postgresql_backend::select_prepared(
const std::string& query,
const std::vector<core::database_value>& params)
{
if (!is_initialized()) {
last_error_ = "Backend not initialized";
return kcenon::common::error_info{
static_cast<int>(database::error_code::invalid_state),
last_error_,
"postgresql_backend"
};
}

core::database_result result;

#ifdef USE_POSTGRESQL
if (!connection_) {
last_error_ = "No active connection";
return kcenon::common::error_info{
static_cast<int>(database::error_code::connection_failed),
last_error_,
"postgresql_backend"
};
}
try {
pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
pqxx::work txn(*conn);

// Build pqxx::params from database_value vector
pqxx::params pq_params;
for (const auto& val : params) {
std::visit([&pq_params](const auto& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
pq_params.append();
} else if constexpr (std::is_same_v<T, bool>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, int64_t>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, double>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, std::string>) {
pq_params.append(v);
}
}, val);
}

pqxx::result pqxx_result = txn.exec_params(query, pq_params);
txn.commit();

for (const auto& row : pqxx_result) {
core::database_row db_row;
for (size_t i = 0; i < row.size(); ++i) {
std::string column_name = pqxx_result.column_name(i);
if (row[i].is_null()) {
db_row[column_name] = nullptr;
} else {
if (row[i].type() == PG_INT8OID ||
row[i].type() == PG_INT4OID) {
db_row[column_name] = row[i].as<int64_t>();
} else if (row[i].type() == PG_FLOAT8OID ||
row[i].type() == PG_FLOAT4OID) {
db_row[column_name] = row[i].as<double>();
} else if (row[i].type() == PG_BOOLOID) {
db_row[column_name] = row[i].as<bool>();
} else {
db_row[column_name] = row[i].as<std::string>();
}
}
}
result.push_back(std::move(db_row));
}
} catch (const std::exception& e) {
last_error_ = std::string("Select prepared error: ") + e.what();
logger_.error("select_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}
#elif defined(HAVE_LIBPQ)
if (!connection_) {
last_error_ = "No active connection";
return kcenon::common::error_info{
static_cast<int>(database::error_code::connection_failed),
last_error_,
"postgresql_backend"
};
}
try {
// Convert params to C string array for PQexecParams
std::vector<std::string> param_strings;
std::vector<const char*> param_values;
param_strings.reserve(params.size());
param_values.reserve(params.size());

for (const auto& val : params) {
std::visit([&param_strings, &param_values](const auto& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
param_strings.emplace_back();
param_values.push_back(nullptr);
} else if constexpr (std::is_same_v<T, bool>) {
param_strings.push_back(v ? "t" : "f");
param_values.push_back(param_strings.back().c_str());
} else if constexpr (std::is_same_v<T, std::string>) {
param_strings.push_back(v);
param_values.push_back(param_strings.back().c_str());
} else {
param_strings.push_back(std::to_string(v));
param_values.push_back(param_strings.back().c_str());
}
}, val);
}

PGresult* pg_result = PQexecParams(
static_cast<PGconn*>(connection_),
query.c_str(),
static_cast<int>(params.size()),
nullptr, // let server infer types
param_values.data(),
nullptr, // text format lengths
nullptr, // text format
0 // text result format
);

if (PQresultStatus(pg_result) != PGRES_TUPLES_OK) {
last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
PQclear(pg_result);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}

int rows = PQntuples(pg_result);
int cols = PQnfields(pg_result);

for (int row = 0; row < rows; ++row) {
core::database_row db_row;
for (int col = 0; col < cols; ++col) {
std::string column_name = PQfname(pg_result, col);
if (PQgetisnull(pg_result, row, col)) {
db_row[column_name] = nullptr;
} else {
const char* value = PQgetvalue(pg_result, row, col);
Oid type = PQftype(pg_result, col);

if (type == 20 || type == 21 || type == 23) {
db_row[column_name] = static_cast<int64_t>(std::stoll(value));
} else if (type == 700 || type == 701) {
db_row[column_name] = std::stod(value);
} else if (type == 16) {
db_row[column_name] = (*value == 't' || *value == '1');
} else {
db_row[column_name] = std::string(value);
}
}
}
result.push_back(std::move(db_row));
}
PQclear(pg_result);
} catch (const std::exception& e) {
last_error_ = std::string("Select prepared error: ") + e.what();
logger_.error("select_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}
#else
// Fallback to string interpolation for mock mode
return database_backend::select_prepared(query, params);
#endif

last_error_.clear();
return result;
}

kcenon::common::VoidResult postgresql_backend::execute_prepared(
const std::string& query,
const std::vector<core::database_value>& params)
{
if (!is_initialized()) {
last_error_ = "Backend not initialized";
return kcenon::common::error_info{
static_cast<int>(database::error_code::invalid_state),
last_error_,
"postgresql_backend"
};
}

#ifdef USE_POSTGRESQL
try {
if (!connection_) {
last_error_ = "No active PostgreSQL connection";
logger_.error("execute_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::connection_failed),
last_error_,
"postgresql_backend"
};
}

pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
pqxx::work txn(*conn);

pqxx::params pq_params;
for (const auto& val : params) {
std::visit([&pq_params](const auto& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
pq_params.append();
} else if constexpr (std::is_same_v<T, bool>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, int64_t>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, double>) {
pq_params.append(v);
} else if constexpr (std::is_same_v<T, std::string>) {
pq_params.append(v);
}
}, val);
}

txn.exec_params(query, pq_params);
txn.commit();
last_error_.clear();
return kcenon::common::ok();
} catch (const std::exception& e) {
last_error_ = std::string("Execute prepared error: ") + e.what();
logger_.error("execute_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}
#elif defined(HAVE_LIBPQ)
if (!connection_) {
last_error_ = "No active PostgreSQL connection";
logger_.error("execute_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::connection_failed),
last_error_,
"postgresql_backend"
};
}

std::vector<std::string> param_strings;
std::vector<const char*> param_values;
param_strings.reserve(params.size());
param_values.reserve(params.size());

for (const auto& val : params) {
std::visit([&param_strings, &param_values](const auto& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
param_strings.emplace_back();
param_values.push_back(nullptr);
} else if constexpr (std::is_same_v<T, bool>) {
param_strings.push_back(v ? "t" : "f");
param_values.push_back(param_strings.back().c_str());
} else if constexpr (std::is_same_v<T, std::string>) {
param_strings.push_back(v);
param_values.push_back(param_strings.back().c_str());
} else {
param_strings.push_back(std::to_string(v));
param_values.push_back(param_strings.back().c_str());
}
}, val);
}

PGresult* pg_result = PQexecParams(
static_cast<PGconn*>(connection_),
query.c_str(),
static_cast<int>(params.size()),
nullptr,
param_values.data(),
nullptr,
nullptr,
0
);

if (pg_result == nullptr) {
last_error_ = "PostgreSQL execute prepared failed";
logger_.error("execute_prepared", last_error_);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}

ExecStatusType status = PQresultStatus(pg_result);
bool success = (status == PGRES_COMMAND_OK) || (status == PGRES_TUPLES_OK);

if (!success) {
last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
logger_.error("execute_prepared", last_error_);
PQclear(pg_result);
return kcenon::common::error_info{
static_cast<int>(database::error_code::query_failed),
last_error_,
"postgresql_backend"
};
}

PQclear(pg_result);
last_error_.clear();
return kcenon::common::ok();
#else
return database_backend::execute_prepared(query, params);
#endif
}

kcenon::common::VoidResult postgresql_backend::begin_transaction()
{
if (!is_initialized()) {
Expand Down
8 changes: 8 additions & 0 deletions database/backends/postgresql_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ class postgresql_backend

kcenon::common::VoidResult execute_query(const std::string& query_string) override;

[[nodiscard]] kcenon::common::Result<core::database_result> select_prepared(
const std::string& query,
const std::vector<core::database_value>& params) override;

[[nodiscard]] kcenon::common::VoidResult execute_prepared(
const std::string& query,
const std::vector<core::database_value>& params) override;

kcenon::common::VoidResult begin_transaction() override;

kcenon::common::VoidResult commit_transaction() override;
Expand Down
Loading
Loading