Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Move catalog initializations to Catalog bootstrap #1447

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
7 changes: 6 additions & 1 deletion src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "catalog/table_catalog.h"
#include "catalog/table_metrics_catalog.h"
#include "catalog/trigger_catalog.h"
#include "catalog/zone_map_catalog.h"
#include "codegen/code_context.h"
#include "concurrency/transaction_manager_factory.h"
#include "function/date_functions.h"
Expand Down Expand Up @@ -349,9 +350,13 @@ void Catalog::Bootstrap() {
SettingsCatalog::GetInstance(txn);
LanguageCatalog::GetInstance(txn);

// TODO: change pg_proc to per database
// TODO: change following catalogs to per database
ProcCatalog::GetInstance(txn);

if (settings::SettingsManager::GetBool(settings::SettingId::zone_map)) {
ZoneMapCatalog::GetInstance(txn);
}

if (settings::SettingsManager::GetBool(settings::SettingId::brain)) {
QueryHistoryCatalog::GetInstance(txn);
}
Expand Down
29 changes: 7 additions & 22 deletions src/catalog/column_stats_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@
namespace peloton {
namespace catalog {

ColumnStatsCatalog *ColumnStatsCatalog::GetInstance(
concurrency::TransactionContext *txn) {
static ColumnStatsCatalog column_stats_catalog{txn};
return &column_stats_catalog;
}

ColumnStatsCatalog::ColumnStatsCatalog(concurrency::TransactionContext *txn)
: AbstractCatalog(txn, "CREATE TABLE " CATALOG_DATABASE_NAME
ColumnStatsCatalog::ColumnStatsCatalog(concurrency::TransactionContext *txn,
const std::string &database_name)
: AbstractCatalog(txn, "CREATE TABLE " + database_name +
"." CATALOG_SCHEMA_NAME "." COLUMN_STATS_CATALOG_NAME
" ("
"database_id INT NOT NULL, "
"table_id INT NOT NULL, "
"column_id INT NOT NULL, "
"num_rows INT NOT NULL, "
Expand All @@ -44,28 +38,27 @@ ColumnStatsCatalog::ColumnStatsCatalog(concurrency::TransactionContext *txn)
"has_index BOOLEAN);") {
// unique key: (database_id, table_id, column_id)
Catalog::GetInstance()->CreateIndex(txn,
CATALOG_DATABASE_NAME,
database_name,
CATALOG_SCHEMA_NAME,
COLUMN_STATS_CATALOG_NAME,
COLUMN_STATS_CATALOG_NAME "_skey0",
{0, 1, 2},
{ColumnId::TABLE_ID, ColumnId::COLUMN_ID},
true,
IndexType::BWTREE);
// non-unique key: (database_id, table_id)
Catalog::GetInstance()->CreateIndex(txn,
CATALOG_DATABASE_NAME,
database_name,
CATALOG_SCHEMA_NAME,
COLUMN_STATS_CATALOG_NAME,
COLUMN_STATS_CATALOG_NAME "_skey1",
{0, 1},
{ColumnId::TABLE_ID},
false,
IndexType::BWTREE);
}

ColumnStatsCatalog::~ColumnStatsCatalog() {}

bool ColumnStatsCatalog::InsertColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id,
std::string column_name,
Expand All @@ -80,7 +73,6 @@ bool ColumnStatsCatalog::InsertColumnStats(concurrency::TransactionContext *txn,
std::unique_ptr<storage::Tuple> tuple(
new storage::Tuple(catalog_table_->GetSchema(), true));

auto val_db_id = type::ValueFactory::GetIntegerValue(database_id);
auto val_table_id = type::ValueFactory::GetIntegerValue(table_id);
auto val_column_id = type::ValueFactory::GetIntegerValue(column_id);
auto val_num_row = type::ValueFactory::GetIntegerValue(num_rows);
Expand Down Expand Up @@ -110,7 +102,6 @@ bool ColumnStatsCatalog::InsertColumnStats(concurrency::TransactionContext *txn,
type::ValueFactory::GetVarcharValue(column_name);
type::Value val_has_index = type::ValueFactory::GetBooleanValue(has_index);

tuple->SetValue(ColumnId::DATABASE_ID, val_db_id, nullptr);
tuple->SetValue(ColumnId::TABLE_ID, val_table_id, nullptr);
tuple->SetValue(ColumnId::COLUMN_ID, val_column_id, nullptr);
tuple->SetValue(ColumnId::NUM_ROWS, val_num_row, nullptr);
Expand All @@ -127,21 +118,18 @@ bool ColumnStatsCatalog::InsertColumnStats(concurrency::TransactionContext *txn,
}

bool ColumnStatsCatalog::DeleteColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id) {
oid_t index_offset = IndexId::SECONDARY_KEY_0; // Secondary key index

std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(database_id).Copy());
values.push_back(type::ValueFactory::GetIntegerValue(table_id).Copy());
values.push_back(type::ValueFactory::GetIntegerValue(column_id).Copy());

return DeleteWithIndexScan(txn, index_offset, values);
}

std::unique_ptr<std::vector<type::Value>> ColumnStatsCatalog::GetColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id) {
std::vector<oid_t> column_ids(
Expand All @@ -151,7 +139,6 @@ std::unique_ptr<std::vector<type::Value>> ColumnStatsCatalog::GetColumnStats(con
oid_t index_offset = IndexId::SECONDARY_KEY_0; // Secondary key index

std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(database_id).Copy());
values.push_back(type::ValueFactory::GetIntegerValue(table_id).Copy());
values.push_back(type::ValueFactory::GetIntegerValue(column_id).Copy());

Expand Down Expand Up @@ -194,7 +181,6 @@ std::unique_ptr<std::vector<type::Value>> ColumnStatsCatalog::GetColumnStats(con

// Return value: number of column stats
size_t ColumnStatsCatalog::GetTableStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
std::map<oid_t,
std::unique_ptr<std::vector<type::Value>>> &column_stats_map) {
Expand All @@ -206,7 +192,6 @@ size_t ColumnStatsCatalog::GetTableStats(concurrency::TransactionContext *txn,
oid_t index_offset = IndexId::SECONDARY_KEY_1; // Secondary key index

std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(database_id).Copy());
values.push_back(type::ValueFactory::GetIntegerValue(table_id).Copy());

auto result_tiles =
Expand Down
9 changes: 8 additions & 1 deletion src/catalog/system_catalogs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ SystemCatalogs::SystemCatalogs(concurrency::TransactionContext *txn,
: pg_trigger_(nullptr),
pg_table_metrics_(nullptr),
pg_index_metrics_(nullptr),
pg_query_metrics_(nullptr) {
pg_query_metrics_(nullptr),
pg_column_stats_(nullptr) {
oid_t database_oid = database->GetOid();

pg_attribute_ = new ColumnCatalog(txn, database, pool);
Expand Down Expand Up @@ -92,6 +93,7 @@ SystemCatalogs::~SystemCatalogs() {
if (pg_table_metrics_) delete pg_table_metrics_;
if (pg_index_metrics_) delete pg_index_metrics_;
if (pg_query_metrics_) delete pg_query_metrics_;
if (pg_column_stats_) delete pg_column_stats_;
}

/*@brief using sql create statement to create secondary catalog tables
Expand Down Expand Up @@ -122,6 +124,11 @@ void SystemCatalogs::Bootstrap(concurrency::TransactionContext *txn,
pg_query_metrics_ = new QueryMetricsCatalog(txn, database_name);
}

if (!pg_column_stats_) {
pg_column_stats_ = new ColumnStatsCatalog(txn, database_name);
}


// Reset oid of each catalog to avoid collisions between catalog
// values added by system and users when checkpoint recovery.
pg_attribute_->UpdateOid(OID_FOR_USER_OFFSET);
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catalog_defaults.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace catalog {
// Local oids from START_OID = 0 to START_OID + OID_OFFSET are reserved
#define OID_OFFSET 100
#define OID_FOR_USER_OFFSET 10000
#define CATALOG_TABLES_COUNT 10
#define CATALOG_TABLES_COUNT 11

// Oid mask for each type
#define DATABASE_OID_MASK (static_cast<oid_t>(catalog::CatalogType::DATABASE))
Expand Down
42 changes: 17 additions & 25 deletions src/include/catalog/column_stats_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ namespace catalog {

class ColumnStatsCatalog : public AbstractCatalog {
public:
~ColumnStatsCatalog();
ColumnStatsCatalog(concurrency::TransactionContext *txn,
const std::string &database_name);

// Global Singleton
static ColumnStatsCatalog *GetInstance(
concurrency::TransactionContext *txn = nullptr);
~ColumnStatsCatalog();

//===--------------------------------------------------------------------===//
// write Related API
//===--------------------------------------------------------------------===//
bool InsertColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id,
std::string column_name,
Expand All @@ -71,40 +69,22 @@ class ColumnStatsCatalog : public AbstractCatalog {
type::AbstractPool *pool);

bool DeleteColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id);

//===--------------------------------------------------------------------===//
// Read-only Related API
//===--------------------------------------------------------------------===//
std::unique_ptr<std::vector<type::Value>> GetColumnStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
oid_t column_id);

size_t GetTableStats(concurrency::TransactionContext *txn,
oid_t database_id,
oid_t table_id,
std::map<oid_t, std::unique_ptr<std::vector<type::Value>>>
&column_stats_map);
// TODO: add more if needed

enum ColumnId {
DATABASE_ID = 0,
TABLE_ID = 1,
COLUMN_ID = 2,
NUM_ROWS = 3,
CARDINALITY = 4,
FRAC_NULL = 5,
MOST_COMMON_VALS = 6,
MOST_COMMON_FREQS = 7,
HISTOGRAM_BOUNDS = 8,
COLUMN_NAME = 9,
HAS_INDEX = 10,
// Add new columns here in creation order
};

// TODO: add more if needed
enum ColumnStatsOffset {
NUM_ROWS_OFF = 0,
CARDINALITY_OFF = 1,
Expand All @@ -117,7 +97,19 @@ class ColumnStatsCatalog : public AbstractCatalog {
};

private:
ColumnStatsCatalog(concurrency::TransactionContext *txn);
enum ColumnId {
TABLE_ID = 0,
COLUMN_ID = 1,
NUM_ROWS = 2,
CARDINALITY = 3,
FRAC_NULL = 4,
MOST_COMMON_VALS = 5,
MOST_COMMON_FREQS = 6,
HISTOGRAM_BOUNDS = 7,
COLUMN_NAME = 8,
HAS_INDEX = 9,
// Add new columns here in creation order
};

enum IndexId {
SECONDARY_KEY_0 = 0,
Expand Down
9 changes: 9 additions & 0 deletions src/include/catalog/system_catalogs.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <mutex>

#include "catalog/column_stats_catalog.h"
#include "catalog/constraint_catalog.h"
#include "catalog/database_catalog.h"
#include "catalog/index_metrics_catalog.h"
Expand Down Expand Up @@ -125,6 +126,13 @@ class SystemCatalogs {
return pg_query_metrics_;
}

ColumnStatsCatalog *GetColumnStatsCatalog() {
if (!pg_column_stats_) {
throw CatalogException("Column stats catalog has not been initialized");
}
return pg_column_stats_;
}

private:
ColumnCatalog *pg_attribute_;
SchemaCatalog *pg_namespace_;
Expand All @@ -138,6 +146,7 @@ class SystemCatalogs {
TableMetricsCatalog *pg_table_metrics_;
IndexMetricsCatalog *pg_index_metrics_;
QueryMetricsCatalog *pg_query_metrics_;
ColumnStatsCatalog * pg_column_stats_;
};

} // namespace catalog
Expand Down
6 changes: 2 additions & 4 deletions src/include/optimizer/stats/stats_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
namespace peloton {

namespace storage {
class Database;
class DataTable;
}

Expand All @@ -41,10 +42,6 @@ class StatsStorage {

StatsStorage();

/* Functions for managing stats table and schema */

void CreateStatsTableInCatalog();

/* Functions for adding, updating and quering stats */

void InsertOrUpdateTableStats(storage::DataTable *table,
Expand Down Expand Up @@ -72,6 +69,7 @@ class StatsStorage {
/* Functions for triggerring stats collection */

ResultType AnalyzeStatsForAllTables(
storage::Database *database,
concurrency::TransactionContext *txn = nullptr);

ResultType AnalyzeStatsForTable(
Expand Down
34 changes: 19 additions & 15 deletions src/include/settings/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
//===----------------------------------------------------------------------===//
// Peloton port
SETTING_int(port,
"Peloton port (default: 15721)",
15721,
1024, 65535,
false, false)
"Peloton port (default: 15721)",
15721,
1024, 65535,
false, false)

// Maximum number of connections
SETTING_int(max_connections,
"Maximum number of connections (default: 64)",
64,
1, 512,
true, true)
"Maximum number of connections (default: 64)",
64,
1, 512,
true, true)

SETTING_int(rpc_port,
"Peloton rpc port (default: 15445)",
Expand Down Expand Up @@ -83,11 +83,11 @@ SETTING_string(root_cert_file,
//===----------------------------------------------------------------------===//

SETTING_double(bnlj_buffer_size,
"The default buffer size to use for blockwise nested loop joins (default: 1 MB)",
1.0 * 1024.0 * 1024.0,
1.0 * 1024,
1.0 * 1024.0 * 1024.0 * 1024,
true, true)
"The default buffer size to use for blockwise nested loop joins (default: 1 MB)",
1.0 * 1024.0 * 1024.0,
1.0 * 1024,
1.0 * 1024.0 * 1024.0 * 1024,
true, true)

// Size of the MonoQueue task queue
SETTING_int(monoqueue_task_queue_size,
Expand Down Expand Up @@ -214,6 +214,10 @@ SETTING_bool(dump_ir,
false,
true, true)

SETTING_bool(zone_map,
"Enable zone map (default: false)",
false, false, false)

//===----------------------------------------------------------------------===//
// Optimizer
//===----------------------------------------------------------------------===//
Expand All @@ -232,8 +236,8 @@ SETTING_int(task_execution_timeout,
"execution step of optimizer, "
"assuming one plan has been found (default 5000)",
5000,
1000, 60000,
true, true)
1000, 60000,
true, true)

//===----------------------------------------------------------------------===//
// GENERAL
Expand Down
Loading