Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ private void clearExpireFinishedOrCancelledAlterJobsV2() {
AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isExpire() && GlobalStateMgr.getCurrentState()
.getClusterSnapshotMgr().isDeletionSafeToExecute(alterJobV2.getFinishedTimeMs())) {
iterator.remove();
RemoveAlterJobV2OperationLog log =
new RemoveAlterJobV2OperationLog(alterJobV2.getJobId(), alterJobV2.getType());
GlobalStateMgr.getCurrentState().getEditLog().logRemoveExpiredAlterJobV2(log);
GlobalStateMgr.getCurrentState().getEditLog().logRemoveExpiredAlterJobV2(log, wal -> {
iterator.remove();
});
LOG.info("remove expired {} job {}. finish at {}", alterJobV2.getType(),
alterJobV2.getJobId(), TimeUtils.longToTimeString(alterJobV2.getFinishedTimeMs()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ public Void visitSwapTableClause(SwapTableClause clause, ConnectContext context)

// First, we need to check whether the table to be operated on can be renamed
try {
olapNewTbl.checkAndSetName(origTblName, true);
origTable.checkAndSetName(newTblName, true);
olapNewTbl.checkNameConflict(origTblName);
origTable.checkNameConflict(newTblName);

if (origTable.isMaterializedView() || newTbl.isMaterializedView()) {
if (!(origTable.isMaterializedView() && newTbl.isMaterializedView())) {
Expand Down Expand Up @@ -539,8 +539,6 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
String colocateGroup = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
GlobalStateMgr.getCurrentState().getColocateTableIndex()
.modifyTableColocate(db, olapTable, colocateGroup, false, null);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
GlobalStateMgr.getCurrentState().getLocalMetastore().convertDistributionType(db, olapTable);
} else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
if (!olapTable.dynamicPartitionExists()) {
try {
Expand Down Expand Up @@ -944,8 +942,7 @@ public Void visitAlterViewClause(AlterViewClause alterViewClause, ConnectContext
alterViewClause.getColumns(),
ctx.getSessionVariable().getSqlMode(), alterViewClause.getComment());

GlobalStateMgr.getCurrentState().getAlterJobMgr().alterView(alterViewInfo, false);
GlobalStateMgr.getCurrentState().getEditLog().logModifyViewDef(alterViewInfo);
GlobalStateMgr.getCurrentState().getAlterJobMgr().alterView(alterViewInfo);
return null;
}

Expand Down
54 changes: 34 additions & 20 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,11 @@ public void swapTableInternal(SwapTableOperationLog log) throws DdlException {
db.dropTable(newTblName);

// rename new table name to origin table name and add it to database
newTbl.checkAndSetName(origTblName, false);
newTbl.checkAndSetName(origTblName);
db.registerTableUnlocked(newTbl);

// rename origin table name to new table name and add it to database
origTable.checkAndSetName(newTblName, false);
origTable.checkAndSetName(newTblName);
db.registerTableUnlocked(origTable);

// swap dependencies of base table
Expand Down Expand Up @@ -573,34 +573,48 @@ private void invalidateRelatedMaterializedViews(OlapTable olapTable) {
relatedMvIds.clear();
}

public void alterView(AlterViewInfo alterViewInfo, boolean isReplay) {
long dbId = alterViewInfo.getDbId();
long tableId = alterViewInfo.getTableId();
public void alterView(AlterViewInfo alterViewInfo) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(alterViewInfo.getDbId());
View view = (View) GlobalStateMgr.getCurrentState()
.getLocalMetastore().getTable(db.getId(), alterViewInfo.getTableId());
String inlineViewDef = alterViewInfo.getInlineViewDef();
List<Column> newFullSchema = alterViewInfo.getNewFullSchema();
String comment = alterViewInfo.getComment();

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
View view = (View) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
long sqlMode = alterViewInfo.getSqlMode();

Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(view.getId()), LockType.WRITE);
try {
String viewName = view.getName();
view.setInlineViewDefWithSqlMode(inlineViewDef, alterViewInfo.getSqlMode());
try {
view.init();
view.checkInlineViewDef(inlineViewDef, sqlMode);
} catch (StarRocksException e) {
throw new AlterJobException("failed to init view stmt", e);
throw new AlterJobException("failed to check inline view def", e);
}
view.setNewFullSchema(newFullSchema);
view.setComment(comment);
GlobalStateMgr.getCurrentState().getEditLog().logModifyViewDef(alterViewInfo, wal -> {
view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode);
view.setNewFullSchema(alterViewInfo.getNewFullSchema());
view.setComment(alterViewInfo.getComment());
});
AlterMVJobExecutor.inactiveRelatedMaterializedViewsRecursive(view,
MaterializedViewExceptions.inactiveReasonForBaseViewChanged(viewName), isReplay);
db.dropTable(viewName);
db.registerTableUnlocked(view);
MaterializedViewExceptions.inactiveReasonForBaseViewChanged(view.getName()), false);
LOG.info("modify view[{}] definition to {}", view.getName(), inlineViewDef);
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(view.getId()), LockType.WRITE);
}
}

LOG.info("replay modify view[{}] definition to {}", viewName, inlineViewDef);
public void replayAlterView(AlterViewInfo alterViewInfo) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(alterViewInfo.getDbId());
View view = (View) GlobalStateMgr.getCurrentState()
.getLocalMetastore().getTable(db.getId(), alterViewInfo.getTableId());

Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(view.getId()), LockType.WRITE);
try {
view.setInlineViewDefWithSqlMode(alterViewInfo.getInlineViewDef(), alterViewInfo.getSqlMode());
view.setNewFullSchema(alterViewInfo.getNewFullSchema());
view.setComment(alterViewInfo.getComment());
AlterMVJobExecutor.inactiveRelatedMaterializedViewsRecursive(view,
MaterializedViewExceptions.inactiveReasonForBaseViewChanged(view.getName()), true);
LOG.info("modify view[{}] definition to {}", view.getName(), alterViewInfo.getInlineViewDef());
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(view.getId()), LockType.WRITE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,14 @@ public Void visitTableRenameClause(TableRenameClause clause, ConnectContext cont
if (GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), newMvName) != null) {
throw new SemanticException("Materialized view [" + newMvName + "] is already used");
}
table.setName(newMvName);
db.dropTable(oldMvName);
db.registerTableUnlocked(table);
final RenameMaterializedViewLog renameMaterializedViewLog =
new RenameMaterializedViewLog(table.getId(), db.getId(), newMvName);
updateTaskDefinition((MaterializedView) table);
GlobalStateMgr.getCurrentState().getEditLog().logMvRename(renameMaterializedViewLog);
GlobalStateMgr.getCurrentState().getEditLog().logMvRename(renameMaterializedViewLog, wal -> {
table.setName(newMvName);
db.dropTable(oldMvName);
db.registerTableUnlocked(table);
updateTaskDefinition((MaterializedView) table);
});
LOG.info("rename materialized view[{}] to {}, id: {}", oldMvName, newMvName, table.getId());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,10 @@ private void processModifyColumnComment(ModifyColumnCommentClause alterClause, D
if (!oneCol.isPresent()) {
throw new DdlException("Column[" + modifyColumnName + "] does not exists");
} else {
oneCol.get().setComment(comment);
ModifyColumnCommentLog log = new ModifyColumnCommentLog(db.getId(), olapTable.getId(), modifyColumnName, comment);
GlobalStateMgr.getCurrentState().getEditLog().logModifyColumnComment(log);
GlobalStateMgr.getCurrentState().getEditLog().logModifyColumnComment(log, wal -> {
oneCol.get().setComment(comment);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
long repoId = globalStateMgr.getNextId();
Repository repo = new Repository(repoId, stmt.getName(), stmt.isReadOnly(), stmt.getLocation(), storage);

Status st = repoMgr.addAndInitRepoIfNotExist(repo, false);
Status st = repoMgr.addAndInitRepoIfNotExist(repo);
if (!st.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to create repository: " + st.getErrMsg());
Expand All @@ -260,7 +260,7 @@ public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
}
}

Status st = repoMgr.removeRepo(repo.getName(), false /* not replay */);
Status st = repoMgr.removeRepo(repo.getName());
if (!st.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to drop repository: " + st.getErrMsg());
Expand Down
81 changes: 52 additions & 29 deletions fe/fe-core/src/main/java/com/starrocks/backup/RepositoryMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,46 @@ protected void runOneCycle() {
}
}

public Status addAndInitRepoIfNotExist(Repository repo, boolean isReplay) {
public Status addAndInitRepoIfNotExist(Repository repo) {
lock.lock();
try {
if (!repoNameMap.containsKey(repo.getName())) {
if (!isReplay) {
// create repository path and repo info file in remote storage
Status st = repo.initRepository();
if (!st.ok()) {
return st;
}
// create repository path and repo info file in remote storage
Status st = repo.initRepository();
if (!st.ok()) {
return st;
}
repoNameMap.put(repo.getName(), repo);
repoIdMap.put(repo.getId(), repo);

if (!isReplay) {
// write log
GlobalStateMgr.getCurrentState().getEditLog().logCreateRepository(repo);
}

LOG.info("successfully adding repo {} to repository mgr. is replay: {}",
repo.getName(), isReplay);
// write log
GlobalStateMgr.getCurrentState().getEditLog().logCreateRepository(repo, wal -> {
addRepoWithoutLock(repo);
});
return Status.OK;
} else {
return new Status(ErrCode.COMMON_ERROR, "repository with same name already exist: " + repo.getName());
}
return new Status(ErrCode.COMMON_ERROR, "repository with same name already exist: " + repo.getName());
} finally {
lock.unlock();
}
}

public void replayAddRepo(Repository repo) {
lock.lock();
try {
if (!repoNameMap.containsKey(repo.getName())) {
addRepoWithoutLock(repo);
}
} finally {
lock.unlock();
}
}

private void addRepoWithoutLock(Repository repo) {
repoNameMap.put(repo.getName(), repo);
repoIdMap.put(repo.getId(), repo);
LOG.info("successfully adding repo {} to repository mgr. ", repo.getName());
}

public Repository getRepo(String repoName) {
return repoNameMap.get(repoName);
}
Expand All @@ -113,18 +124,15 @@ public Repository getRepo(long repoId) {
return repoIdMap.get(repoId);
}

public Status removeRepo(String repoName, boolean isReplay) {
public Status removeRepo(String repoName) {
lock.lock();
try {
Repository repo = repoNameMap.remove(repoName);
Repository repo = repoNameMap.get(repoName);
if (repo != null) {
repoIdMap.remove(repo.getId());

if (!isReplay) {
// log
GlobalStateMgr.getCurrentState().getEditLog().logDropRepository(repoName);
}
LOG.info("successfully removing repo {} from repository mgr", repoName);
// log
GlobalStateMgr.getCurrentState().getEditLog().logDropRepository(repoName, wal -> {
removeRepoWithoutLock(repo);
});
return Status.OK;
}
return new Status(ErrCode.NOT_FOUND, "repository does not exist");
Expand All @@ -133,6 +141,24 @@ public Status removeRepo(String repoName, boolean isReplay) {
}
}

public void replayRemoveRepo(String repoName) {
lock.lock();
try {
Repository repo = repoNameMap.get(repoName);
if (repo != null) {
removeRepoWithoutLock(repo);
}
} finally {
lock.unlock();
}
}

private void removeRepoWithoutLock(Repository repo) {
repoIdMap.remove(repo.getId());
repoNameMap.remove(repo.getName());
LOG.info("successfully removing repo {} from repository mgr", repo.getName());
}

public List<List<String>> getReposInfo() {
List<List<String>> infos = Lists.newArrayList();
for (Repository repo : repoIdMap.values()) {
Expand All @@ -141,9 +167,6 @@ public List<List<String>> getReposInfo() {
return infos;
}




@Override
public void gsonPostProcess() throws IOException {
for (Repository repo : repoNameMap.values()) {
Expand Down
Loading
Loading