-
Notifications
You must be signed in to change notification settings - Fork 618
[15721] Blocking Schema Changes #1341
base: master
Are you sure you want to change the base?
Changes from 65 commits
0d83528
f015e94
583ec6a
469474b
7b2ae56
f255f78
fd0cb0b
a3a7cfa
9e07a95
e695ac2
d843ae3
59e5982
dd42a3b
a845135
57e9a6e
3575048
6b5da72
b08fa9e
0e46c90
d07920e
88a397c
f0ecc68
a2f73d9
ed7a2a9
397cf24
e61451e
098a35a
f344a55
e930b11
c31ef3a
74ca51b
ed1a0de
ef6b9f4
844e564
8cdea77
c70353e
d3b1321
6a294fc
f7af80f
d7465e3
bf20b05
d1c8437
1ce6ed0
66220b5
32ec8fd
36f111a
253b471
41039b8
4df594b
cf0c5a6
21ee68d
68f492f
9904788
a1e7265
fdce5cf
ae65f67
4e4dfdc
4d00de1
5e454d4
bc26411
32a260f
72c35ac
733fa9b
a57a5df
5b9818c
939df35
20f5d27
dbf6d7d
fc3e951
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,11 +27,16 @@ | |
#include "catalog/table_metrics_catalog.h" | ||
#include "catalog/trigger_catalog.h" | ||
#include "concurrency/transaction_manager_factory.h" | ||
#include "executor/executor_context.h" | ||
#include "executor/insert_executor.h" | ||
#include "executor/seq_scan_executor.h" | ||
#include "function/date_functions.h" | ||
#include "function/decimal_functions.h" | ||
#include "function/old_engine_string_functions.h" | ||
#include "function/timestamp_functions.h" | ||
#include "index/index_factory.h" | ||
#include "planner/insert_plan.h" | ||
#include "planner/seq_scan_plan.h" | ||
#include "settings/settings_manager.h" | ||
#include "storage/storage_manager.h" | ||
#include "storage/table_factory.h" | ||
|
@@ -930,6 +935,287 @@ std::shared_ptr<SystemCatalogs> Catalog::GetSystemCatalogs( | |
return catalog_map_[database_oid]; | ||
} | ||
|
||
//===--------------------------------------------------------------------===// | ||
// ALTER TABLE | ||
//===--------------------------------------------------------------------===// | ||
|
||
/** | ||
* @brief Helper function for alter table, called internally | ||
* @param database_oid database to which the table belongs to | ||
* @param table_oid table to which the column belongs to | ||
* @param new_schema the new table schema | ||
* @param txn the transaction Context | ||
* @return TransactionContext ResultType(SUCCESS or FAILURE) | ||
*/ | ||
ResultType Catalog::AlterTable(oid_t database_oid, oid_t table_oid, const std::string &schema_name, | ||
std::unique_ptr<catalog::Schema> &new_schema, | ||
concurrency::TransactionContext *txn) { | ||
LOG_TRACE("AlterTable in Catalog"); | ||
|
||
if (txn == nullptr) | ||
throw CatalogException("Alter table requires transaction"); | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] Why double try catch block? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we'll fix this. |
||
auto storage_manager = storage::StorageManager::GetInstance(); | ||
auto database = storage_manager->GetDatabaseWithOid(database_oid); | ||
try { | ||
auto old_table = database->GetTableWithOid(table_oid); | ||
auto old_schema = old_table->GetSchema(); | ||
auto pg_index = catalog_map_[database_oid]->GetIndexCatalog(); | ||
|
||
// Step 1: build empty table with new schema | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [good] Informative comments |
||
bool own_schema = true; | ||
bool adapt_table = false; | ||
auto new_table = storage::TableFactory::GetDataTable( | ||
database_oid, table_oid, | ||
catalog::Schema::CopySchema(new_schema.get()), old_table->GetName(), | ||
DEFAULT_TUPLES_PER_TILEGROUP, own_schema, adapt_table); | ||
// Step 2: Copy indexes | ||
auto old_index_oids = pg_index->GetIndexObjects(table_oid, txn); | ||
for (auto index_oid_pair : old_index_oids) { | ||
oid_t index_oid = index_oid_pair.first; | ||
// delete record in pg_index | ||
pg_index->DeleteIndex(index_oid, txn); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not delete index in pg_index only if not all indexed columns still exists? |
||
// Check if all indexed columns still exists | ||
auto old_index = old_table->GetIndexWithOid(index_oid); | ||
bool index_exist = true; | ||
std::vector<oid_t> new_key_attrs; | ||
|
||
for (oid_t column_id : old_index->GetMetadata()->GetKeyAttrs()) { | ||
bool is_found = false; | ||
std::string column_name = old_schema->GetColumn(column_id).GetName(); | ||
oid_t i = 0; | ||
for (auto new_column : new_schema->GetColumns()) { | ||
if (column_name == new_column.GetName()) { | ||
is_found = true; | ||
new_key_attrs.push_back(i); | ||
break; | ||
} | ||
i++; | ||
} | ||
if (!is_found) { | ||
index_exist = false; | ||
break; | ||
} | ||
} | ||
if (!index_exist) continue; | ||
|
||
// construct index on new table | ||
auto index_metadata = new index::IndexMetadata( | ||
old_index->GetName(), index_oid, table_oid, database_oid, | ||
old_index->GetMetadata()->GetIndexType(), | ||
old_index->GetMetadata()->GetIndexConstraintType(), | ||
new_schema.get(), | ||
catalog::Schema::CopySchema(new_schema.get(), new_key_attrs), | ||
new_key_attrs, old_index->GetMetadata()->HasUniqueKeys()); | ||
|
||
std::shared_ptr<index::Index> new_index( | ||
index::IndexFactory::GetIndex(index_metadata)); | ||
new_table->AddIndex(new_index); | ||
|
||
// reinsert record into pg_index | ||
pg_index->InsertIndex( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [race condition] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually InsertIndex uses a InsertPlan to insert a new tuple into pg_attribute so we can get concurrency control for free. |
||
index_oid, old_index->GetName(), table_oid, schema_name, | ||
old_index->GetMetadata()->GetIndexType(), | ||
old_index->GetMetadata()->GetIndexConstraintType(), | ||
old_index->GetMetadata()->HasUniqueKeys(), new_key_attrs, | ||
pool_.get(), txn); | ||
} | ||
std::unique_ptr<executor::ExecutorContext> context( | ||
new executor::ExecutorContext(txn, {})); | ||
// Step 3: build column mapping between old table and new table | ||
// we're using column name as unique identifier | ||
std::vector<oid_t> old_column_ids; | ||
std::unordered_map<oid_t, oid_t> column_map; | ||
for (oid_t old_column_id = 0; | ||
old_column_id < old_schema->GetColumnCount(); old_column_id++) { | ||
old_column_ids.push_back(old_column_id); | ||
for (oid_t new_column_id = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [inefficient implementation] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, thanks for your suggestion. |
||
new_column_id < new_schema->GetColumnCount(); new_column_id++) { | ||
if (old_schema->GetColumn(old_column_id).GetName() == | ||
new_schema->GetColumn(new_column_id).GetName()) { | ||
column_map[new_column_id] = old_column_id; | ||
} | ||
} | ||
} | ||
// Step 4: Get tuples from old table with sequential scan | ||
// TODO: Try to reuse Sequential scan function and insert function in | ||
// abstract catalog | ||
planner::SeqScanPlan seq_scan_node(old_table, nullptr, old_column_ids); | ||
executor::SeqScanExecutor seq_scan_executor(&seq_scan_node, | ||
context.get()); | ||
seq_scan_executor.Init(); | ||
while (seq_scan_executor.Execute()) { | ||
std::unique_ptr<executor::LogicalTile> result_tile( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] Curious, what if the result is empty? Is it handled here? |
||
seq_scan_executor.GetOutput()); | ||
for (size_t i = 0; i < result_tile->GetTupleCount(); i++) { | ||
// Transform tuple into new schema | ||
std::unique_ptr<storage::Tuple> tuple( | ||
new storage::Tuple(new_schema.get(), true)); | ||
|
||
for (oid_t new_column_id = 0; | ||
new_column_id < new_schema->GetColumnCount(); new_column_id++) { | ||
auto it = column_map.find(new_column_id); | ||
type::Value val; | ||
if (it == column_map.end()) { | ||
// new column, set value to null | ||
val = type::ValueFactory::GetNullValueByType( | ||
new_schema->GetColumn(new_column_id).GetType()); | ||
} else { | ||
// otherwise, copy value in old table | ||
val = result_tile->GetValue(i, it->second); | ||
if (new_schema->GetColumn(new_column_id).GetType() != | ||
old_schema->GetColumn(it->second).GetType()) { | ||
// change the value's type | ||
LOG_TRACE( | ||
"CASTED: %s TO %s", val.GetInfo().c_str(), | ||
new_schema->GetColumn(new_column_id).GetInfo().c_str()); | ||
auto casted_val = | ||
val.CastAs(new_schema->GetColumn(new_column_id).GetType()); | ||
tuple->SetValue(new_column_id, casted_val, pool_.get()); | ||
} else { | ||
tuple->SetValue(new_column_id, val, pool_.get()); | ||
} | ||
} | ||
} | ||
// insert new tuple into new table | ||
planner::InsertPlan node(new_table, std::move(tuple)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [major] Is it efficient to call planner inside a loop? |
||
executor::InsertExecutor executor(&node, context.get()); | ||
executor.Init(); | ||
executor.Execute(); | ||
} | ||
} | ||
// Step 5: delete all the column(attribute) records in pg_attribute | ||
// and reinsert them using new schema(column offset needs to change | ||
// accordingly) | ||
auto pg_attributes = | ||
catalog_map_[database_oid]->GetColumnCatalog(); | ||
pg_attributes->DeleteColumns(table_oid, txn); | ||
oid_t column_offset = 0; | ||
for (auto new_column : new_schema->GetColumns()) { | ||
pg_attributes->InsertColumn( | ||
table_oid, new_column.GetName(), column_offset, | ||
new_column.GetOffset(), new_column.GetType(), | ||
new_column.IsInlined(), new_column.GetConstraints(), pool_.get(), | ||
txn); | ||
column_offset++; | ||
} | ||
// TODO: Add gc logic | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you guys finished this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're still working on this, sorry about that. |
||
// txn->RecordDrop(database_oid, old_table->GetOid(), INVALID_OID); | ||
|
||
// Final step of physical change should be moved to commit time | ||
database->ReplaceTableWithOid(table_oid, new_table); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [memory leak] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out! We left a TODO here, and we are still looking for the correct way to handle this. |
||
|
||
LOG_TRACE("Alter table with oid %d succeed.", table_oid); | ||
} catch (CatalogException &e) { | ||
LOG_TRACE("Alter table failed."); | ||
return ResultType::FAILURE; | ||
} | ||
} catch (CatalogException &e) { | ||
return ResultType::FAILURE; | ||
} | ||
return ResultType::SUCCESS; | ||
} | ||
|
||
/** | ||
* @brief Add new columns to the table. | ||
* @param database_name database to which the table belongs to | ||
* @param table_name table to which the column belongs to | ||
* @param columns the column to be added | ||
* @param txn the transaction Context | ||
* @return TransactionContext ResultType(SUCCESS or FAILURE) | ||
* | ||
*/ | ||
ResultType Catalog::AddColumn( | ||
UNUSED_ATTRIBUTE const std::string &database_name, | ||
UNUSED_ATTRIBUTE const std::string &table_name, | ||
UNUSED_ATTRIBUTE const std::vector<std::string> &columns, | ||
UNUSED_ATTRIBUTE concurrency::TransactionContext *txn) { | ||
// TODO: perform ADD Operation | ||
return ResultType::SUCCESS; | ||
} | ||
|
||
/** | ||
* @brief Drop the column from the table. | ||
* @param database_name database to which the table belongs to | ||
* @param table_name table to which the columns belong to | ||
* @param columns the columns to be dropped | ||
* @param txn the transaction Context | ||
* @return TransactionContext ResultType(SUCCESS or FAILURE) | ||
*/ | ||
|
||
ResultType Catalog::DropColumn(UNUSED_ATTRIBUTE const std::string &database_name, | ||
UNUSED_ATTRIBUTE const std::string &table_name, | ||
UNUSED_ATTRIBUTE const std::vector<std::string> &columns, | ||
UNUSED_ATTRIBUTE concurrency::TransactionContext *txn) { | ||
return ResultType::SUCCESS; | ||
} | ||
|
||
/** | ||
* @brief Change the column name in the catalog. | ||
* @param database_name database to which the table belongs to | ||
* @param table_name table to which the column belongs to | ||
* @param columns the column to be dropped | ||
* @param txn the transaction Context | ||
* @return TransactionContext ResultType(SUCCESS or FAILURE) | ||
*/ | ||
ResultType Catalog::RenameColumn(const std::string &database_name, | ||
const std::string &table_name, | ||
const std::string &old_name, | ||
const std::string &new_name, | ||
const std::string &schema_name, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) { | ||
throw CatalogException("Change Column requires transaction."); | ||
} | ||
|
||
if (new_name.size() == 0) { | ||
throw CatalogException("Name can not be empty string."); | ||
} | ||
|
||
LOG_TRACE("Change Column Name %s to %s", old_name.c_str(), new_name.c_str()); | ||
|
||
try { | ||
// Get table from the name | ||
auto table = Catalog::GetInstance()->GetTableWithName(database_name, schema_name, | ||
table_name, txn); | ||
auto schema = table->GetSchema(); | ||
|
||
// Currently we only support change the first column name! | ||
|
||
// Check the validity of old name and the new name | ||
oid_t columnId = schema->GetColumnID(new_name); | ||
if (columnId != INVALID_OID) { | ||
throw CatalogException("New column already exists in the table."); | ||
} | ||
columnId = schema->GetColumnID(old_name); | ||
if (columnId == INVALID_OID) { | ||
throw CatalogException("Old column does not exist in the table."); | ||
} | ||
|
||
// Change column name in the global schema | ||
schema->ChangeColumnName(columnId, new_name); | ||
|
||
// Modify the pg_table | ||
oid_t table_oid = Catalog::GetInstance() | ||
->GetTableObject(database_name, schema_name, table_name, txn) | ||
->GetTableOid(); | ||
oid_t database_oid = Catalog::GetInstance() | ||
->GetTableObject(database_name, schema_name, table_name, txn) | ||
->GetDatabaseOid(); | ||
auto pg_attributes = | ||
catalog_map_[database_oid]->GetColumnCatalog(); | ||
bool res = pg_attributes->RenameColumn( | ||
database_oid, table_oid, old_name, new_name, txn); | ||
if (!res) { | ||
throw CatalogException("Change Column name failed."); | ||
} | ||
|
||
} catch (CatalogException &e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] Maybe printing the stack trace here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will fix this! |
||
return ResultType::FAILURE; | ||
} | ||
return ResultType::SUCCESS; | ||
} | ||
|
||
//===--------------------------------------------------------------------===// | ||
// DEPRECATED | ||
//===--------------------------------------------------------------------===// | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[good] Good comments!