Skip to content

Commit 791bffc

Browse files
committed
[feature] Support alter database comment and custom properties
1 parent cf3dac1 commit 791bffc

File tree

15 files changed

+186
-6
lines changed

15 files changed

+186
-6
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,24 @@ CompletableFuture<Void> dropDatabase(
161161
/** List all databases in fluss cluster asynchronously. */
162162
CompletableFuture<List<String>> listDatabases();
163163

164+
/**
165+
* Alter a database asynchronously.
166+
*
167+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
168+
*
169+
* <ul>
170+
* <li>{@link DatabaseNotExistException} if the database not exists and {@code
171+
* ignoreIfNotExists} is false.
172+
* </ul>
173+
*
174+
* @param databaseName The name of the database to alter.
175+
* @param databaseDescriptor The descriptor of the database to alter.
176+
* @param ignoreIfNotExists Flag to specify behavior when a database with the given name not
177+
* exists: if set to false, throw a DatabaseNotExistException, if set to true, do nothing.
178+
*/
179+
CompletableFuture<Void> alterDatabase(
180+
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfNotExists);
181+
164182
/**
165183
* Create a new table asynchronously.
166184
*

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.alibaba.fluss.rpc.RpcClient;
4040
import com.alibaba.fluss.rpc.gateway.AdminGateway;
4141
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
42+
import com.alibaba.fluss.rpc.messages.AlterDatabaseRequest;
4243
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
4344
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
4445
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
@@ -214,6 +215,16 @@ public CompletableFuture<List<String>> listDatabases() {
214215
.thenApply(ListDatabasesResponse::getDatabaseNamesList);
215216
}
216217

218+
@Override
219+
public CompletableFuture<Void> alterDatabase(
220+
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfNotExists) {
221+
AlterDatabaseRequest request = new AlterDatabaseRequest();
222+
request.setDatabaseJson(databaseDescriptor.toJsonBytes())
223+
.setDatabaseName(databaseName)
224+
.setIgnoreIdNotExists(ignoreIfNotExists);
225+
return gateway.alterDatabase(request).thenApply(r -> null);
226+
}
227+
217228
@Override
218229
public CompletableFuture<Void> createTable(
219230
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists) {

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ void testGetDatabaseInfo() throws Exception {
134134
DatabaseDescriptor.builder()
135135
.comment("test comment")
136136
.customProperty("key1", "value1")
137+
.customProperty("key2", "value2")
137138
.build(),
138139
false);
139140
DatabaseInfo databaseInfo = admin.getDatabaseInfo("test_db_2").get();
@@ -144,9 +145,30 @@ void testGetDatabaseInfo() throws Exception {
144145
.isEqualTo("test comment");
145146
assertThat(databaseInfo.getDatabaseDescriptor().getCustomProperties())
146147
.containsEntry("key1", "value1");
147-
assertThat(databaseInfo.getDatabaseDescriptor().getCustomProperties()).hasSize(1);
148+
assertThat(databaseInfo.getDatabaseDescriptor().getCustomProperties()).hasSize(2);
148149
assertThat(databaseInfo.getCreatedTime())
149150
.isBetween(timestampBeforeCreate, timestampAfterCreate);
151+
152+
long timesBeforeAlter = System.currentTimeMillis();
153+
admin.alterDatabase(
154+
"test_db_2",
155+
DatabaseDescriptor.builder()
156+
.comment("alter comment")
157+
.customProperty("key1", "new_value1")
158+
.customProperty("key3", "val3")
159+
.build(),
160+
false);
161+
databaseInfo = admin.getDatabaseInfo("test_db_2").get();
162+
long timestampAfterAlter = System.currentTimeMillis();
163+
assertThat(databaseInfo.getCreatedTime()).isLessThan(databaseInfo.getModifiedTime());
164+
assertThat(databaseInfo.getDatabaseName()).isEqualTo("test_db_2");
165+
assertThat(databaseInfo.getDatabaseDescriptor().getComment().get())
166+
.isEqualTo("alter comment");
167+
assertThat(databaseInfo.getDatabaseDescriptor().getCustomProperties())
168+
.containsEntry("key1", "new_value1")
169+
.containsEntry("key3", "val3");
170+
assertThat(databaseInfo.getDatabaseDescriptor().getCustomProperties()).hasSize(2);
171+
assertThat(databaseInfo.getModifiedTime()).isBetween(timesBeforeAlter, timestampAfterAlter);
150172
}
151173

152174
@Test

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,21 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean
230230
}
231231

232232
@Override
233-
public void alterDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean b)
233+
public void alterDatabase(
234+
String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfNotExists)
234235
throws DatabaseNotExistException, CatalogException {
235-
throw new UnsupportedOperationException();
236+
try {
237+
admin.alterDatabase(databaseName, toFlussDatabase(catalogDatabase), ignoreIfNotExists);
238+
} catch (Exception e) {
239+
Throwable t = ExceptionUtils.stripExecutionException(e);
240+
if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
241+
throw new DatabaseNotExistException(getName(), databaseName);
242+
} else {
243+
throw new CatalogException(
244+
String.format("Failed to alter database %s in %s", databaseName, getName()),
245+
t);
246+
}
247+
}
236248
}
237249

238250
@Override

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ void testCreateWithUnSupportDataType() {
511511

512512
@Test
513513
void testCreateDatabase() {
514+
// create database without anything
514515
tEnv.executeSql("create database test_db");
515516
List<Row> databases =
516517
CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect());
@@ -521,6 +522,16 @@ void testCreateDatabase() {
521522
tEnv.executeSql("drop database test_db");
522523
databases = CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect());
523524
assertThat(databases.toString()).isEqualTo(String.format("[+I[%s]]", DEFAULT_DB));
525+
526+
// create database with comment and custom properties
527+
tEnv.executeSql("create database test_db comment 'test_db' with('key1' = 'val1')");
528+
databases = CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect());
529+
530+
assertThat(databases.stream().map(Row::toString).collect(Collectors.toList()))
531+
.containsExactlyInAnyOrderElementsOf(
532+
Arrays.asList(String.format("+I[%s]", DEFAULT_DB), "+I[test_db]"));
533+
// alter database
534+
tEnv.executeSql("alter database test_db set('key1' = 'new_val1', 'key2' = 'val2')");
524535
}
525536

526537
@Test

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,17 @@ void testDatabase() throws Exception {
394394
assertThat(db2.getComment()).isEqualTo("test comment");
395395
assertThat(db2.getProperties())
396396
.isEqualTo(Collections.singletonMap(SCAN_STARTUP_MODE.key(), "earliest"));
397+
// test alter db2
398+
catalog.alterDatabase(
399+
"db2",
400+
new CatalogDatabaseImpl(
401+
Collections.singletonMap(SCAN_STARTUP_MODE.key(), "latest"),
402+
"alter comment"),
403+
false);
404+
db2 = catalog.getDatabase("db2");
405+
assertThat(db2.getComment()).isEqualTo("alter comment");
406+
assertThat(db2.getProperties())
407+
.isEqualTo(Collections.singletonMap(SCAN_STARTUP_MODE.key(), "latest"));
397408
// test create table in db1
398409
ObjectPath path1 = new ObjectPath("db1", "t1");
399410
CatalogTable table = this.newCatalogTable(new HashMap<>());
@@ -432,8 +443,6 @@ void testDatabase() throws Exception {
432443
assertThatThrownBy(() -> catalog.listTables("unknown"))
433444
.isInstanceOf(DatabaseNotExistException.class)
434445
.hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME);
435-
assertThatThrownBy(() -> catalog.alterDatabase("db2", null, false))
436-
.isInstanceOf(UnsupportedOperationException.class);
437446
assertThat(catalog.getDefaultDatabase()).isEqualTo(DEFAULT_DB);
438447

439448
// Test catalog with null default database

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.alibaba.fluss.rpc.gateway;
1919

20+
import com.alibaba.fluss.rpc.messages.AlterDatabaseRequest;
21+
import com.alibaba.fluss.rpc.messages.AlterDatabaseResponse;
2022
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
2123
import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
2224
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -56,6 +58,14 @@ public interface AdminGateway extends AdminReadOnlyGateway {
5658
@RPC(api = ApiKeys.DROP_DATABASE)
5759
CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest request);
5860

61+
/**
62+
* Alter a database.
63+
*
64+
* @param request Alter database request.
65+
*/
66+
@RPC(api = ApiKeys.ALTER_DATABASE)
67+
CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request);
68+
5969
/**
6070
* Creates a new table.
6171
*

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public enum ApiKeys {
7070
CREATE_ACLS(1039, 0, 0, PUBLIC),
7171
LIST_ACLS(1040, 0, 0, PUBLIC),
7272
DROP_ACLS(1041, 0, 0, PUBLIC),
73-
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
73+
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
74+
ALTER_DATABASE(1043, 0, 0, PUBLIC);
7475

7576
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
7677
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ message CreateDatabaseRequest {
6363
message CreateDatabaseResponse {
6464
}
6565

66+
// alter database request and response
67+
message AlterDatabaseRequest {
68+
required string database_name = 1;
69+
required bool ignore_id_not_exists = 2;
70+
required bytes database_json = 3;
71+
}
72+
73+
message AlterDatabaseResponse {
74+
}
75+
6676
// get table request and response
6777
message GetDatabaseInfoRequest {
6878
required string database_name = 1;

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
4343
import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
4444
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
45+
import com.alibaba.fluss.rpc.messages.AlterDatabaseRequest;
46+
import com.alibaba.fluss.rpc.messages.AlterDatabaseResponse;
4547
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
4648
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
4749
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -220,6 +222,23 @@ public CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest
220222
return CompletableFuture.completedFuture(response);
221223
}
222224

225+
@Override
226+
public CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request) {
227+
if (authorizer != null) {
228+
authorizer.authorize(
229+
currentSession(),
230+
OperationType.ALTER,
231+
Resource.database(request.getDatabaseName()));
232+
}
233+
234+
AlterDatabaseResponse response = new AlterDatabaseResponse();
235+
DatabaseDescriptor databaseDescriptor =
236+
DatabaseDescriptor.fromJsonBytes(request.getDatabaseJson());
237+
metadataManager.alterDatabase(
238+
request.getDatabaseName(), databaseDescriptor, request.isIgnoreIdNotExists());
239+
return CompletableFuture.completedFuture(response);
240+
}
241+
223242
@Override
224243
public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) {
225244
TablePath tablePath = toTablePath(request.getTablePath());

0 commit comments

Comments
 (0)