This repository was archived by the owner on Sep 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 618
/
Copy pathcolumn_catalog.cpp
287 lines (247 loc) · 11.7 KB
/
column_catalog.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
//===----------------------------------------------------------------------===//
//
// Peloton
//
// column_catalog.cpp
//
// Identification: src/catalog/column_catalog.cpp
//
// Copyright (c) 2015-17, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include "catalog/column_catalog.h"
#include "catalog/catalog.h"
#include "catalog/system_catalogs.h"
#include "catalog/table_catalog.h"
#include "concurrency/transaction_context.h"
#include "storage/data_table.h"
#include "storage/database.h"
#include "type/value_factory.h"
namespace peloton {
namespace catalog {
ColumnCatalogObject::ColumnCatalogObject(executor::LogicalTile *tile,
int tupleId)
: table_oid(tile->GetValue(tupleId, ColumnCatalog::ColumnId::TABLE_OID)
.GetAs<oid_t>()),
column_name(tile->GetValue(tupleId, ColumnCatalog::ColumnId::COLUMN_NAME)
.ToString()),
column_id(tile->GetValue(tupleId, ColumnCatalog::ColumnId::COLUMN_ID)
.GetAs<uint32_t>()),
column_offset(
tile->GetValue(tupleId, ColumnCatalog::ColumnId::COLUMN_OFFSET)
.GetAs<uint32_t>()),
column_type(StringToTypeId(
tile->GetValue(tupleId, ColumnCatalog::ColumnId::COLUMN_TYPE)
.ToString())),
is_inlined(tile->GetValue(tupleId, ColumnCatalog::ColumnId::IS_INLINED)
.GetAs<bool>()),
is_primary(tile->GetValue(tupleId, ColumnCatalog::ColumnId::IS_PRIMARY)
.GetAs<bool>()),
is_not_null(tile->GetValue(tupleId, ColumnCatalog::ColumnId::IS_NOT_NULL)
.GetAs<bool>()) {}
ColumnCatalog::ColumnCatalog(storage::Database *pg_catalog,
type::AbstractPool *pool,
concurrency::TransactionContext *txn)
: AbstractCatalog(COLUMN_CATALOG_OID, COLUMN_CATALOG_NAME,
InitializeSchema().release(), pg_catalog) {
// Add indexes for pg_attribute
AddIndex({ColumnId::TABLE_OID, ColumnId::COLUMN_NAME},
COLUMN_CATALOG_PKEY_OID, COLUMN_CATALOG_NAME "_pkey",
IndexConstraintType::PRIMARY_KEY);
AddIndex({ColumnId::TABLE_OID, ColumnId::COLUMN_ID}, COLUMN_CATALOG_SKEY0_OID,
COLUMN_CATALOG_NAME "_skey0", IndexConstraintType::UNIQUE);
AddIndex({ColumnId::TABLE_OID}, COLUMN_CATALOG_SKEY1_OID,
COLUMN_CATALOG_NAME "_skey1", IndexConstraintType::DEFAULT);
// Insert columns of pg_attribute table into pg_attribute itself
uint32_t column_id = 0;
for (auto column : catalog_table_->GetSchema()->GetColumns()) {
InsertColumn(COLUMN_CATALOG_OID, column.GetName(), column_id,
column.GetOffset(), column.GetType(), column.IsInlined(),
column.GetConstraints(), pool, txn);
column_id++;
}
}
ColumnCatalog::~ColumnCatalog() {}
/*@brief private function for initialize schema of pg_attribute
* @return unqiue pointer to schema
*/
std::unique_ptr<catalog::Schema> ColumnCatalog::InitializeSchema() {
const std::string primary_key_constraint_name = "primary_key";
const std::string not_null_constraint_name = "not_null";
auto table_id_column = catalog::Column(
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER),
"table_oid", true);
table_id_column.AddConstraint(catalog::Constraint(
ConstraintType::PRIMARY, primary_key_constraint_name));
table_id_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto column_name_column = catalog::Column(
type::TypeId::VARCHAR, max_name_size, "column_name", false);
column_name_column.AddConstraint(catalog::Constraint(
ConstraintType::PRIMARY, primary_key_constraint_name));
column_name_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto column_id_column = catalog::Column(
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER),
"column_id", true);
column_id_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto column_offset_column = catalog::Column(
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER),
"column_offset", true);
column_offset_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto column_type_column = catalog::Column(
type::TypeId::VARCHAR, max_name_size, "column_type", false);
column_type_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto is_inlined_column = catalog::Column(
type::TypeId::BOOLEAN, type::Type::GetTypeSize(type::TypeId::BOOLEAN),
"is_inlined", true);
is_inlined_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto is_primary_column = catalog::Column(
type::TypeId::BOOLEAN, type::Type::GetTypeSize(type::TypeId::BOOLEAN),
"is_primary", true);
is_primary_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
auto is_not_null_column = catalog::Column(
type::TypeId::BOOLEAN, type::Type::GetTypeSize(type::TypeId::BOOLEAN),
"is_not_null", true);
is_not_null_column.AddConstraint(
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name));
std::unique_ptr<catalog::Schema> column_catalog_schema(new catalog::Schema(
{table_id_column, column_name_column, column_id_column,
column_offset_column, column_type_column, is_inlined_column,
is_primary_column, is_not_null_column}));
return column_catalog_schema;
}
bool ColumnCatalog::InsertColumn(oid_t table_oid,
const std::string &column_name,
oid_t column_id, oid_t column_offset,
type::TypeId column_type, bool is_inlined,
const std::vector<Constraint> &constraints,
type::AbstractPool *pool,
concurrency::TransactionContext *txn) {
// Create the tuple first
std::unique_ptr<storage::Tuple> tuple(
new storage::Tuple(catalog_table_->GetSchema(), true));
auto val0 = type::ValueFactory::GetIntegerValue(table_oid);
auto val1 = type::ValueFactory::GetVarcharValue(column_name, nullptr);
auto val2 = type::ValueFactory::GetIntegerValue(column_id);
auto val3 = type::ValueFactory::GetIntegerValue(column_offset);
auto val4 =
type::ValueFactory::GetVarcharValue(TypeIdToString(column_type), nullptr);
auto val5 = type::ValueFactory::GetBooleanValue(is_inlined);
bool is_primary = false, is_not_null = false;
for (auto constraint : constraints) {
if (constraint.GetType() == ConstraintType::PRIMARY) {
is_primary = true;
} else if (constraint.GetType() == ConstraintType::NOT_NULL ||
constraint.GetType() == ConstraintType::NOTNULL) {
is_not_null = true;
}
}
auto val6 = type::ValueFactory::GetBooleanValue(is_primary);
auto val7 = type::ValueFactory::GetBooleanValue(is_not_null);
tuple->SetValue(ColumnId::TABLE_OID, val0, pool);
tuple->SetValue(ColumnId::COLUMN_NAME, val1, pool);
tuple->SetValue(ColumnId::COLUMN_ID, val2, pool);
tuple->SetValue(ColumnId::COLUMN_OFFSET, val3, pool);
tuple->SetValue(ColumnId::COLUMN_TYPE, val4, pool);
tuple->SetValue(ColumnId::IS_INLINED, val5, pool);
tuple->SetValue(ColumnId::IS_PRIMARY, val6, pool);
tuple->SetValue(ColumnId::IS_NOT_NULL, val7, pool);
// Insert the tuple
return InsertTuple(std::move(tuple), txn);
}
bool ColumnCatalog::DeleteColumn(oid_t table_oid,
const std::string &column_name,
concurrency::TransactionContext *txn) {
oid_t index_offset =
IndexId::PRIMARY_KEY; // Index of table_oid & column_name
std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy());
values.push_back(
type::ValueFactory::GetVarcharValue(column_name, nullptr).Copy());
// delete column from cache
auto pg_table = Catalog::GetInstance()
->GetSystemCatalogs(database_oid)
->GetTableCatalog();
auto table_object = pg_table->GetTableObject(table_oid, txn);
table_object->EvictColumnObject(column_name);
return DeleteWithIndexScan(index_offset, values, txn);
}
/* @brief delete all column records from the same table
* this function is useful when calling DropTable
* @param table_oid
* @param txn TransactionContext
* @return a vector of table oid
*/
bool ColumnCatalog::DeleteColumns(oid_t table_oid,
concurrency::TransactionContext *txn) {
oid_t index_offset = IndexId::SKEY_TABLE_OID; // Index of table_oid
std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy());
// delete columns from cache
auto pg_table = Catalog::GetInstance()
->GetSystemCatalogs(database_oid)
->GetTableCatalog();
auto table_object = pg_table->GetTableObject(table_oid, txn);
table_object->EvictAllColumnObjects();
return DeleteWithIndexScan(index_offset, values, txn);
}
const std::unordered_map<oid_t, std::shared_ptr<ColumnCatalogObject>>
ColumnCatalog::GetColumnObjects(oid_t table_oid,
concurrency::TransactionContext *txn) {
// try get from cache
auto pg_table = Catalog::GetInstance()
->GetSystemCatalogs(database_oid)
->GetTableCatalog();
auto table_object = pg_table->GetTableObject(table_oid, txn);
PELOTON_ASSERT(table_object && table_object->GetTableOid() == table_oid);
auto column_objects = table_object->GetColumnObjects(true);
if (column_objects.size() != 0) return column_objects;
// cache miss, get from pg_attribute
std::vector<oid_t> column_ids(all_column_ids);
oid_t index_offset = IndexId::SKEY_TABLE_OID; // Index of table_oid
std::vector<type::Value> values;
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy());
auto result_tiles =
GetResultWithIndexScan(column_ids, index_offset, values, txn);
for (auto &tile : (*result_tiles)) {
for (auto tuple_id : *tile) {
auto column_object =
std::make_shared<ColumnCatalogObject>(tile.get(), tuple_id);
table_object->InsertColumnObject(column_object);
}
}
return table_object->GetColumnObjects();
}
/*
* @brief Rename the column name to a new name.
* @ return whether the update succeed
*/
bool ColumnCatalog::RenameColumn(oid_t database_oid,
oid_t table_oid,
const std::string &column_name,
const std::string &new_name,
concurrency::TransactionContext *txn) {
std::vector<oid_t> update_columns({ColumnId::COLUMN_NAME});
std::vector<type::Value> update_values;
update_values.push_back(type::ValueFactory::GetVarcharValue(new_name).Copy());
// values to execute index scan
std::vector<type::Value> scan_values;
scan_values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy());
scan_values.push_back(
type::ValueFactory::GetVarcharValue(column_name, nullptr).Copy());
// Index of table_oid & column_name
oid_t index_offset = IndexId::PRIMARY_KEY;
auto table_object =
Catalog::GetInstance()->GetTableObject(database_oid, table_oid, txn);
table_object->EvictColumnObject(column_name);
return UpdateWithIndexScan(update_columns, update_values, scan_values,
index_offset, txn);
}
} // namespace catalog
} // namespace peloton