This repository was archived by the owner on Sep 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 618
/
Copy pathindex_selection_job.cpp
189 lines (166 loc) · 6.8 KB
/
index_selection_job.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
//===----------------------------------------------------------------------===//
//
// Peloton
//
// index_selection_job.cpp
//
// Identification: src/brain/index_selection_job.cpp
//
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include "brain/index_selection_util.h"
#include "brain/index_selection_job.h"
#include "brain/index_selection.h"
#include "catalog/query_history_catalog.h"
#include "catalog/system_catalogs.h"
#include "optimizer/stats/stats_storage.h"
namespace peloton {
namespace brain {
// Entry point of the index selection job.
//
// Everything below runs under one transaction that is committed at the end:
//   1. Fetch the queries recorded in the query history catalog after
//      last_timestamp_.
//   2. If more than num_queries_threshold_ queries were found, build a
//      workload from them and ask IndexSelection for the best configuration.
//   3. Drop the brain-suggested indexes that are not part of that
//      configuration and issue a create request for every index in it.
//   4. Advance last_timestamp_ to the newest query that was considered.
// Otherwise nothing is tuned and last_timestamp_ is left untouched.
//
// @param env  brain environment; only its index selection knobs are read.
void IndexSelectionJob::OnJobInvocation(BrainEnvironment *env) {
  LOG_INFO("Started Index Suggestion Task");
  auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
  auto txn = txn_manager.BeginTransaction();

  // Analyze stats for all the tables.
  // TODO: AnalyzeStatsForAllTables crashes sometimes.
  //  optimizer::StatsStorage *stats_storage =
  //      optimizer::StatsStorage::GetInstance();
  //  ResultType stats_result = stats_storage->AnalyzeStatsForAllTables(txn);
  //  if (stats_result != ResultType::SUCCESS) {
  //    LOG_ERROR(
  //        "Cannot generate stats for table columns. Not performing index "
  //        "suggestion...");
  //    txn_manager.AbortTransaction(txn);
  //    return;
  //  }

  // Query the catalog for new SQL queries.
  // New SQL queries are the queries that were added to the system
  // after the last_timestamp_
  auto &query_catalog = catalog::QueryHistoryCatalog::GetInstance(txn);
  auto query_history =
      query_catalog.GetQueryStringsAfterTimestamp(last_timestamp_, txn);

  if (query_history->size() > num_queries_threshold_) {
    LOG_INFO("Tuning threshold has crossed. Time to tune the DB!");

    // Collect only the query strings; iterate by reference so that the
    // (timestamp, string) pairs are not copied a second time.
    std::vector<std::string> queries;
    queries.reserve(query_history->size());
    for (const auto &query_pair : *query_history) {
      queries.push_back(query_pair.second);
    }

    // TODO: Handle multiple databases
    brain::Workload workload(queries, DEFAULT_DB_NAME, txn);
    LOG_INFO("Knob: Num Indexes: %zu",
             env->GetIndexSelectionKnobs().num_indexes_);
    LOG_INFO("Knob: Naive: %zu",
             env->GetIndexSelectionKnobs().naive_enumeration_threshold_);
    LOG_INFO("Knob: Num Iterations: %zu",
             env->GetIndexSelectionKnobs().num_iterations_);

    // Run the index selection.
    brain::IndexSelection is = {workload, env->GetIndexSelectionKnobs(), txn};
    brain::IndexConfiguration best_config;
    is.GetBestIndexes(best_config);

    // An empty configuration is only reported; the drop/create steps below
    // still run against it.
    if (best_config.IsEmpty()) {
      LOG_INFO("Best config is empty. No new indexes this time...");
    }

    // Get the index objects from database.
    auto database_object = catalog::Catalog::GetInstance()->GetDatabaseObject(
        DEFAULT_DB_NAME, txn);
    const oid_t database_oid = database_object->GetDatabaseOid();
    auto pg_index = catalog::Catalog::GetInstance()
                        ->GetSystemCatalogs(database_oid)
                        ->GetIndexCatalog();
    auto cur_indexes = pg_index->GetIndexObjects(txn);
    auto drop_indexes = GetIndexesToDrop(cur_indexes, best_config);

    // Drop useless indexes.
    for (const auto &index : drop_indexes) {
      LOG_DEBUG("Dropping Index: %s", index->GetIndexName().c_str());
      DropIndexRPC(database_oid, index.get());
    }

    // Create new indexes.
    for (const auto &index : best_config.GetIndexes()) {
      CreateIndexRPC(index.get());
    }

    // Remember how far into the query history this round has looked.
    last_timestamp_ = GetLatestQueryTimestamp(query_history.get());
  } else {
    LOG_INFO("Index Suggestion - not performing this time");
  }
  txn_manager.CommitTransaction(txn);
}
// Selects the existing indexes that should be dropped for a new
// configuration.
//
// An existing index is returned when both of the following hold:
//   - its name contains brain_suggested_index_prefix_str, and
//   - no index in best_config has the same table oid and the same key
//     column list.
// Indexes whose name does not contain the prefix are never returned.
//
// @param index_objects  existing indexes, keyed by oid_t.
// @param best_config    configuration chosen by the current tuning round.
// @return the catalog objects of the indexes to drop.
std::vector<std::shared_ptr<catalog::IndexCatalogObject>>
IndexSelectionJob::GetIndexesToDrop(
    std::unordered_map<oid_t, std::shared_ptr<catalog::IndexCatalogObject>>
        &index_objects,
    brain::IndexConfiguration best_config) {
  std::vector<std::shared_ptr<catalog::IndexCatalogObject>> ret_indexes;

  // Iterate by reference: copying a map entry would also copy its
  // shared_ptr on every iteration.
  for (const auto &index : index_objects) {
    const auto &index_object = index.second;
    const auto &index_name = index_object->GetIndexName();

    // TODO [vamshi]: REMOVE THIS IN THE FINAL CODE
    // This is a hack for now. Add a boolean to the index catalog to
    // find out if an index is a brain suggested index/user created index.
    if (index_name.find(brain_suggested_index_prefix_str) ==
        std::string::npos) {
      continue;
    }

    // Look for an index in the new configuration that covers the same table
    // with the same key columns; stop at the first match.
    bool found = false;
    for (const auto &installed_index : best_config.GetIndexes()) {
      if ((index_object->GetTableOid() == installed_index->table_oid) &&
          (index_object->GetKeyAttrs() == installed_index->column_oids)) {
        found = true;
        break;
      }
    }

    // Drop only indexes which are not suggested this time.
    if (!found) {
      ret_indexes.push_back(index_object);
    }
  }
  return ret_indexes;
}
// Sends a createIndex request for the given hypothetical index to the
// Peloton service at "localhost:15445" and waits for the response.
//
// The index name is built as
//   <brain_suggested_index_prefix_str>_<db_oid>_<table_oid>_<col>_<col>_..._
// (note the trailing underscore), and the index is requested as non-unique.
//
// @param index  index to create; must have at least one column oid.
void IndexSelectionJob::CreateIndexRPC(brain::HypotheticalIndexObject *index) {
  // Precondition: check it before connecting or building any request.
  PELOTON_ASSERT(index->column_oids.size() > 0);

  // TODO: Remove hardcoded database name and server end point.
  capnp::EzRpcClient client("localhost:15445");
  PelotonService::Client peloton_service = client.getMain<PelotonService>();

  // Create the index name: concat - db_id, table_id, col_ids
  std::stringstream sstream;
  sstream << brain_suggested_index_prefix_str << "_" << index->db_oid << "_"
          << index->table_oid << "_";
  for (const auto &col : index->column_oids) {
    sstream << col << "_";
  }
  const std::string index_name = sstream.str();

  // Fill in the request parameters.
  auto request = peloton_service.createIndexRequest();
  auto params = request.getRequest();
  params.setDatabaseOid(index->db_oid);
  params.setTableOid(index->table_oid);
  params.setIndexName(index_name);
  params.setUniqueKeys(false);

  auto col_list = params.initKeyAttrOids(index->column_oids.size());
  for (size_t i = 0; i < index->column_oids.size(); i++) {
    col_list.set(i, index->column_oids[i]);
  }

  // Block until the server has answered; the response content is not used.
  auto response = request.send().wait(client.getWaitScope());
}
// Sends a dropIndex request to the Peloton service at "localhost:15445" and
// waits for the response.
//
// @param database_oid  oid of the database that owns the index.
// @param index         catalog object of the index; only its oid is sent.
void IndexSelectionJob::DropIndexRPC(oid_t database_oid,
                                     catalog::IndexCatalogObject *index) {
  // TODO: Remove hardcoded database name and server end point.
  // TODO: Have to be removed when merged with tli's code.
  capnp::EzRpcClient rpc_client("localhost:15445");
  PelotonService::Client service = rpc_client.getMain<PelotonService>();

  // Build the request: it carries only the database oid and the index oid.
  auto drop_request = service.dropIndexRequest();
  auto params = drop_request.getRequest();
  params.setDatabaseOid(database_oid);
  params.setIndexOid(index->GetIndexOid());

  // Block until the server has answered; the reply content is not used.
  auto reply = drop_request.send().wait(rpc_client.getWaitScope());
}
// Returns the largest timestamp among the given queries.
//
// @param queries  (timestamp, query string) pairs; must not be null.
// @return the maximum of the pairs' first elements, or 0 when the vector is
//         empty.
uint64_t IndexSelectionJob::GetLatestQueryTimestamp(
    std::vector<std::pair<uint64_t, std::string>> *queries) {
  uint64_t latest_time = 0;
  // Iterate by reference: only the timestamp is read, so there is no reason
  // to copy every query string.
  for (const auto &query : *queries) {
    if (query.first > latest_time) {
      latest_time = query.first;
    }
  }
  return latest_time;
}
}
}