-
Notifications
You must be signed in to change notification settings - Fork 618
[15721] Index Suggestion #1347
base: master
Are you sure you want to change the base?
[15721] Index Suggestion #1347
Changes from 150 commits
d18033d
5fdadea
ec6c94b
492b95f
8410136
96eadf4
9087931
0908588
5e2cbff
fcfe058
04e49f8
d62462b
2e19c1c
ac653aa
4d44009
371fd38
c23cc36
4d694ec
a51fe84
d043128
32f9040
324e430
5978d32
a24ded7
11bc159
e0cac79
83c1b44
1e5925c
4b463dc
96a41b1
12a343a
e98461a
1ec6f55
d23d0dc
a94cac9
11adba0
4c8dce7
6f67e0c
aa63a5f
f8a8180
b619333
d01d018
d9d0cfc
d984e89
11fdce2
afa1582
3178695
5f4a822
fd2de46
3db49a7
b7c4f9c
756ecb8
0d336d0
f58cf77
213a351
e846956
85705dd
920083a
93b2214
e3b43d0
c907ef3
342f6a3
c54f4e0
39259fb
f323ed9
6330ab6
b291f58
f4ce787
c6915f7
49b95df
a6da36d
01c994e
e1dad43
90e7d65
57c1c83
4b4e256
61786ae
fa1dbba
6bbaa94
5591755
5d0d2b8
28e818b
8fd0bf4
3f394f7
8f1b897
40576fe
10843ca
3085a58
1e9b959
55354b9
96f500b
eb3da24
2657e76
a564372
9f5bdc5
e290797
57955b4
ecec9ce
4e3370c
818c583
e4865c4
53c1101
ae3e26b
7152d46
0062cc5
fee2bea
4642b34
490677f
51d7f56
fc0d60e
a48e085
a3ac507
f6b18d0
eb5239f
693516b
6017790
b024304
8b2169c
f718511
8639124
3a5227a
aeabd94
7ee9b0f
99be940
1e3cd9c
bd4593b
5fe0108
a8af555
273b89b
7091c7f
67ff655
51139e6
f9b2c5e
cb8d209
3c3559e
2da21af
71d4213
7d6fc37
6d48e80
d22b7bb
1060627
0b12801
5029ed1
1e31d2a
8b937da
f8262cd
f4bca42
4c37855
38757ac
4792d91
5460082
3b757f1
8bc5170
51f5a1a
5c322c1
3ef9128
146100d
d250fbe
3230ec3
dc424ea
43b742b
c422a63
27a0df0
a06189a
332543f
9d0a005
11d2f3e
59ee8d3
6817300
3546f6a
e2e4578
4f48831
4dc06ac
65d5a06
480ae4d
81420e7
28483e5
e1bd8ba
f8e6eda
597e798
b99312a
50db015
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// Peloton | ||
// | ||
// index_selection_context.cpp | ||
// | ||
// Identification: src/brain/index_selection_context.cpp | ||
// | ||
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
#include "brain/index_selection_context.h" | ||
#include "common/logger.h" | ||
|
||
namespace peloton { | ||
namespace brain { | ||
|
||
// Construct a selection context that stores the tuning knobs consulted by
// the index selection algorithm (copied by value; the context owns its copy).
IndexSelectionContext::IndexSelectionContext(IndexSelectionKnobs knobs)
    : knobs_(knobs) {}
|
||
} // namespace brain | ||
} // namespace peloton |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// Peloton | ||
// | ||
// index_selection_job.cpp | ||
// | ||
// Identification: src/brain/index_selection_job.cpp | ||
// | ||
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
#include "brain/index_selection_util.h" | ||
#include "brain/index_selection_job.h" | ||
#include "brain/index_selection.h" | ||
#include "catalog/query_history_catalog.h" | ||
#include "catalog/system_catalogs.h" | ||
#include "optimizer/stats/stats_storage.h" | ||
|
||
namespace peloton { | ||
namespace brain { | ||
|
||
#define BRAIN_SUGGESTED_INDEX_MAGIC_STR "brain_suggested_index_" | ||
|
||
/**
 * @brief Periodic brain job: fetch the recent query workload, run index
 * selection over it, create the suggested indexes via RPC, and drop
 * previously brain-suggested indexes that are no longer recommended.
 *
 * The whole pass runs inside a single transaction; it aborts early if
 * column stats cannot be generated.
 */
void IndexSelectionJob::OnJobInvocation(BrainEnvironment *env) {
  auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
  auto txn = txn_manager.BeginTransaction();
  LOG_INFO("Started Index Suggestion Task");

  // Generate column stats for all the tables before we begin.
  // TODO[vamshi]:
  // Instead of collecting stats for every table, collect them only for the
  // tables we are analyzing i.e. tables that are referenced in the current
  // workload.
  optimizer::StatsStorage *stats_storage =
      optimizer::StatsStorage::GetInstance();
  ResultType result = stats_storage->AnalyzeStatsForAllTables(txn);
  if (result != ResultType::SUCCESS) {
    LOG_ERROR(
        "Cannot generate stats for table columns. Not performing index "
        "suggestion...");
    txn_manager.AbortTransaction(txn);
    return;
  }

  // Query the catalog for new SQL queries.
  // New SQL queries are the queries that were added to the system
  // after last_timestamp_.
  auto query_catalog = &catalog::QueryHistoryCatalog::GetInstance(txn);
  auto query_history =
      query_catalog->GetQueryStringsAfterTimestamp(last_timestamp_, txn);
  if (query_history->size() > num_queries_threshold_) {
    LOG_INFO("Tuning threshold has crossed. Time to tune the DB!");

    // Run the index selection.
    std::vector<std::string> queries;
    queries.reserve(query_history->size());
    // Iterate by const reference: copying each (timestamp, query) pair would
    // duplicate every query string.
    for (const auto &query_pair : *query_history) {
      queries.push_back(query_pair.second);
    }

    // TODO: Handle multiple databases
    brain::Workload workload(queries, DEFAULT_DB_NAME, txn);
    brain::IndexSelection is = {workload, env->GetIndexSelectionKnobs(), txn};
    brain::IndexConfiguration best_config;
    is.GetBestIndexes(best_config);

    if (best_config.IsEmpty()) {
      LOG_INFO("Best config is empty");
    }

    // Get the existing indexes and drop the brain-suggested ones that are
    // no longer part of the best configuration.
    // TODO: Handle multiple databases
    auto database_object = catalog::Catalog::GetInstance()->GetDatabaseObject(
        DEFAULT_DB_NAME, txn);
    auto pg_index = catalog::Catalog::GetInstance()
                        ->GetSystemCatalogs(database_object->GetDatabaseOid())
                        ->GetIndexCatalog();
    auto indexes = pg_index->GetIndexObjects(txn);
    for (const auto &index : indexes) {
      auto index_name = index.second->GetIndexName();
      // TODO [vamshi]:
      // This is a hack for now. Add a boolean to the index catalog to
      // find out if an index is a brain suggested index/user created index.
      if (index_name.find(BRAIN_SUGGESTED_INDEX_MAGIC_STR) !=
          std::string::npos) {
        bool found = false;
        for (const auto &installed_index : best_config.GetIndexes()) {
          if ((index.second->GetTableOid() == installed_index->table_oid) &&
              (index.second->GetKeyAttrs() == installed_index->column_oids)) {
            found = true;
            // Stop scanning once a match is found (the original kept
            // iterating over the remaining suggested indexes).
            break;
          }
        }
        // Drop only indexes which are not suggested this time.
        if (!found) {
          LOG_DEBUG("Dropping Index: %s", index_name.c_str());
          DropIndexRPC(database_object->GetDatabaseOid(), index.second.get());
        }
      }
    }

    for (const auto &index : best_config.GetIndexes()) {
      // Create RPC for index creation on the server side.
      CreateIndexRPC(index.get());
    }

    // Update last_timestamp_ to be the latest query's timestamp in the
    // current workload, so that we fetch only the new queries next time.
    // TODO[vamshi]: Make this efficient. Currently assuming that the latest
    // query can be anywhere in the vector. If the latest query is always at
    // the end, then we can avoid the scan over all the queries.
    last_timestamp_ = GetLatestQueryTimestamp(query_history.get());
  } else {
    LOG_INFO("Tuning - not this time");
  }
  txn_manager.CommitTransaction(txn);
}
|
||
/**
 * @brief Send a create-index request for a suggested index to the Peloton
 * server over Cap'n Proto RPC.
 *
 * The index name encodes BRAIN_SUGGESTED_INDEX_MAGIC_STR plus the db oid,
 * table oid and column oids so OnJobInvocation can later recognize
 * brain-created indexes by name.
 *
 * @param index the hypothetical index to materialize; must have at least
 *              one column.
 */
void IndexSelectionJob::CreateIndexRPC(brain::HypotheticalIndexObject *index) {
  // Validate the input before building the request (the original asserted
  // only after the RPC had already been sent).
  PELOTON_ASSERT(index->column_oids.size() > 0);

  // TODO: Remove hardcoded database name and server end point.
  capnp::EzRpcClient client("localhost:15445");
  PelotonService::Client peloton_service = client.getMain<PelotonService>();

  // Create the index name: concat - db_id, table_id, col_ids.
  std::stringstream sstream;
  sstream << BRAIN_SUGGESTED_INDEX_MAGIC_STR << ":" << index->db_oid << ":"
          << index->table_oid << ":";
  // NOTE: the original also accumulated the columns into an unused local
  // vector (col_oid_vector); that dead variable has been removed.
  for (auto col : index->column_oids) {
    sstream << col << ",";
  }
  auto index_name = sstream.str();

  auto request = peloton_service.createIndexRequest();
  request.getRequest().setDatabaseOid(index->db_oid);
  request.getRequest().setTableOid(index->table_oid);
  request.getRequest().setIndexName(index_name);
  request.getRequest().setUniqueKeys(false);

  auto col_list =
      request.getRequest().initKeyAttrOids(index->column_oids.size());
  for (auto i = 0UL; i < index->column_oids.size(); i++) {
    col_list.set(i, index->column_oids[i]);
  }

  auto response = request.send().wait(client.getWaitScope());
  // TODO(review): inspect `response` and log a warning if the server-side
  // index creation did not succeed.
  (void)response;
}
|
||
/**
 * @brief Ask the Peloton server, via Cap'n Proto RPC, to drop the given
 * index from the given database.
 *
 * @param database_oid oid of the database owning the index.
 * @param index        catalog entry of the index to drop.
 */
void IndexSelectionJob::DropIndexRPC(oid_t database_oid,
                                     catalog::IndexCatalogObject *index) {
  // TODO: Remove hardcoded database name and server end point.
  capnp::EzRpcClient client("localhost:15445");
  auto peloton_service = client.getMain<PelotonService>();

  // Build the drop request: the server identifies the index by
  // (database oid, index oid).
  auto request = peloton_service.dropIndexRequest();
  auto payload = request.getRequest();
  payload.setDatabaseOid(database_oid);
  payload.setIndexOid(index->GetIndexOid());

  // Block until the server has processed the drop.
  auto response = request.send().wait(client.getWaitScope());
}
|
||
/**
 * @brief Return the largest timestamp among the given (timestamp, query)
 * pairs, or 0 if the vector is empty.
 *
 * @param queries history entries to scan; must not be null.
 * @return the latest (maximum) timestamp seen.
 */
uint64_t IndexSelectionJob::GetLatestQueryTimestamp(
    std::vector<std::pair<uint64_t, std::string>> *queries) {
  uint64_t latest_time = 0;
  // Iterate by const reference: the original copied every pair, which
  // duplicated each query string just to read its timestamp.
  for (const auto &query : *queries) {
    if (query.first > latest_time) {
      latest_time = query.first;
    }
  }
  return latest_time;
}
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// Peloton | ||
// | ||
// index_selection_util.cpp | ||
// | ||
// Identification: src/brain/index_selection_util.cpp | ||
// | ||
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
#include "brain/index_selection_util.h" | ||
#include "common/logger.h" | ||
|
||
namespace peloton { | ||
namespace brain { | ||
|
||
//===--------------------------------------------------------------------===// | ||
// IndexObject | ||
//===--------------------------------------------------------------------===// | ||
|
||
const std::string HypotheticalIndexObject::ToString() const { | ||
std::stringstream str_stream; | ||
str_stream << "Database: " << db_oid << "\n"; | ||
str_stream << "Table: " << table_oid << "\n"; | ||
str_stream << "Columns: "; | ||
for (auto col : column_oids) { | ||
str_stream << col << ", "; | ||
} | ||
str_stream << "\n"; | ||
return str_stream.str(); | ||
} | ||
|
||
bool HypotheticalIndexObject::operator==( | ||
const HypotheticalIndexObject &obj) const { | ||
return (db_oid == obj.db_oid && table_oid == obj.table_oid && | ||
column_oids == obj.column_oids); | ||
} | ||
|
||
bool HypotheticalIndexObject::IsCompatible( | ||
std::shared_ptr<HypotheticalIndexObject> index) const { | ||
return (db_oid == index->db_oid) && (table_oid == index->table_oid); | ||
} | ||
|
||
// Build a new index covering this index's columns followed by the other
// index's columns that are not already present. Relative column order is
// preserved; neither operand is modified.
HypotheticalIndexObject HypotheticalIndexObject::Merge(
    std::shared_ptr<HypotheticalIndexObject> index) {
  HypotheticalIndexObject merged;
  merged.db_oid = db_oid;
  merged.table_oid = table_oid;
  merged.column_oids = column_oids;
  for (auto col : index->column_oids) {
    // Membership test is against *this* index's original columns, matching
    // the original implementation.
    const bool already_present =
        std::find(column_oids.begin(), column_oids.end(), col) !=
        column_oids.end();
    if (!already_present) {
      merged.column_oids.push_back(col);
    }
  }
  return merged;
}
|
||
//===--------------------------------------------------------------------===// | ||
// IndexConfiguration | ||
//===--------------------------------------------------------------------===// | ||
|
||
// Union the other configuration's indexes into this one.
// Fix: GetIndexes() returns a const reference; the original bound it with
// plain `auto`, copying the entire set before iterating. Bind by const
// reference and use the ranged insert instead of a manual loop.
void IndexConfiguration::Merge(IndexConfiguration &config) {
  const auto &indexes = config.GetIndexes();
  indexes_.insert(indexes.begin(), indexes.end());
}
|
||
// Replace this configuration's contents with the other configuration's
// indexes.
// Fix: the original copied the whole source set via `auto indexes = ...`;
// bind GetIndexes()'s const reference instead and bulk-insert.
void IndexConfiguration::Set(IndexConfiguration &config) {
  indexes_.clear();
  const auto &indexes = config.GetIndexes();
  indexes_.insert(indexes.begin(), indexes.end());
}
|
||
// Remove the given index from this configuration (no-op if absent).
// NOTE(review): std::set::erase here matches by shared_ptr value (pointer
// identity), not by index equality — only a pointer obtained from the same
// IndexObjectPool will be found. Passing by const reference would avoid a
// refcount bump; confirm against the header before changing the signature.
void IndexConfiguration::RemoveIndexObject(
    std::shared_ptr<HypotheticalIndexObject> index_info) {
  indexes_.erase(index_info);
}
|
||
// Add the given index to this configuration (no-op if the same shared_ptr
// is already present; uniqueness is by pointer identity, so indexes should
// come from a single IndexObjectPool).
// NOTE(review): a const reference parameter would avoid the shared_ptr
// refcount bump; confirm against the header before changing the signature.
void IndexConfiguration::AddIndexObject(
    std::shared_ptr<HypotheticalIndexObject> index_info) {
  indexes_.insert(index_info);
}
|
||
// Number of indexes currently in this configuration.
size_t IndexConfiguration::GetIndexCount() const { return indexes_.size(); }
|
||
// True when the configuration contains no indexes.
bool IndexConfiguration::IsEmpty() const { return indexes_.empty(); }
|
||
// Read-only view of the underlying index set. Callers should bind the
// result by const reference to avoid copying the set.
const std::set<std::shared_ptr<HypotheticalIndexObject>>
    &IndexConfiguration::GetIndexes() const {
  return indexes_;
}
|
||
const std::string IndexConfiguration::ToString() const { | ||
std::stringstream str_stream; | ||
str_stream << "Num of indexes: " << GetIndexCount() << "\n"; | ||
for (auto index : indexes_) { | ||
str_stream << index->ToString() << " "; | ||
} | ||
return str_stream.str(); | ||
} | ||
|
||
// Two configurations are equal iff their index sets are equal.
// Fix: the original copied the other configuration's entire set into a
// local before comparing; compare against the const reference directly.
// Note: set equality here is shared_ptr (pointer) equality, which is
// correct as long as all indexes are interned via the same IndexObjectPool.
bool IndexConfiguration::operator==(const IndexConfiguration &config) const {
  return indexes_ == config.GetIndexes();
}
|
||
// Set difference: the indexes present in *this but not in `config`.
// Fix: bind the other configuration's set by const reference — the original
// copied the whole set before running set_difference.
IndexConfiguration IndexConfiguration::operator-(
    const IndexConfiguration &config) {
  const auto &other_indexes = config.GetIndexes();

  std::set<std::shared_ptr<HypotheticalIndexObject>> result;
  std::set_difference(indexes_.begin(), indexes_.end(), other_indexes.begin(),
                      other_indexes.end(),
                      std::inserter(result, result.end()));
  return IndexConfiguration(result);
}
|
||
// Remove all indexes from this configuration.
void IndexConfiguration::Clear() { indexes_.clear(); }
|
||
//===--------------------------------------------------------------------===// | ||
// IndexObjectPool | ||
//===--------------------------------------------------------------------===// | ||
|
||
// Look up the canonical pooled instance equal to `obj`.
// Returns nullptr when no equal index has been interned yet.
std::shared_ptr<HypotheticalIndexObject> IndexObjectPool::GetIndexObject(
    HypotheticalIndexObject &obj) {
  auto entry = map_.find(obj);
  return (entry == map_.end()) ? nullptr : entry->second;
}
|
||
// Intern `obj` in the pool: return the existing shared instance when an
// equal index is already present, otherwise store a copy and return it.
// Fix: use std::make_shared instead of a raw `new` plus copy-assignment —
// one allocation, no naked owning pointer.
std::shared_ptr<HypotheticalIndexObject> IndexObjectPool::PutIndexObject(
    HypotheticalIndexObject &obj) {
  auto existing = GetIndexObject(obj);
  if (existing != nullptr) return existing;
  auto index_s_ptr = std::make_shared<HypotheticalIndexObject>(obj);
  // Key the map by the pooled copy so future lookups find this instance.
  map_[*index_s_ptr] = index_s_ptr;
  return index_s_ptr;
}
|
||
//===--------------------------------------------------------------------===// | ||
// Workload | ||
//===--------------------------------------------------------------------===// | ||
|
||
/**
 * @brief Build a workload by parsing and binding every query string,
 * keeping only DML statements (INSERT/DELETE/UPDATE/SELECT).
 *
 * @param queries       SQL query strings to analyze.
 * @param database_name database the queries are bound against.
 * @param txn           transaction used for binding.
 */
Workload::Workload(std::vector<std::string> &queries, std::string database_name,
                   concurrency::TransactionContext *txn)
    : database_name(database_name) {
  LOG_TRACE("Initializing workload with %zu queries", queries.size());
  std::unique_ptr<binder::BindNodeVisitor> binder(
      new binder::BindNodeVisitor(txn, database_name));

  // Parse and bind every query. Store the results in the workload vector.
  // Iterate by const reference to avoid copying each query string.
  for (const auto &query : queries) {
    LOG_DEBUG("Query: %s", query.c_str());

    // Own the parse result so it is freed at the end of this iteration.
    auto stmt_list = std::unique_ptr<parser::SQLStatementList>(
        parser::PostgresParser::ParseSQLString(query));
    PELOTON_ASSERT(stmt_list->is_valid);
    // TODO[vamshi]: Only one query for now.
    PELOTON_ASSERT(stmt_list->GetNumStatements() == 1);

    // Transfer the statement out of the list into a shared_ptr, because it
    // will be referenced by multiple objects later.
    auto stmt = stmt_list->PassOutStatement(0);
    auto stmt_shared = std::shared_ptr<parser::SQLStatement>(stmt.release());
    PELOTON_ASSERT(stmt_shared->GetType() != StatementType::INVALID);

    // Bind the query.
    binder->BindNameToNode(stmt_shared.get());

    // Only take the DML queries from the workload.
    switch (stmt_shared->GetType()) {
      case StatementType::INSERT:
      case StatementType::DELETE:
      case StatementType::UPDATE:
      case StatementType::SELECT:
        AddQuery(stmt_shared);
        // Fix: the original had no break here, so every accepted DML
        // statement also fell through into the default branch.
        break;
      default:
        // Ignore other queries.
        // Fix: the original wrote `"Ignoring query: %s" + stmt->GetInfo()
        // .c_str()` — pointer arithmetic instead of a format argument — and
        // dereferenced `stmt` after it had been release()'d above. Use the
        // still-owning stmt_shared and proper printf-style arguments.
        LOG_TRACE("Ignoring query: %s", stmt_shared->GetInfo().c_str());
        break;
    }
  }
}
|
||
} // namespace brain | ||
} // namespace peloton |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be addressed, at least by adding a wrapper function that takes the database name as an argument. Handling multiple databases is important, especially after the catalog refactor.
-- Tianyu, Justin & Tianyi