Skip to content

Commit 98ba457

Browse files
committed
[server] Support alter database comment and custom properties
1 parent ead97f9 commit 98ba457

File tree

17 files changed

+688
-5
lines changed

17 files changed

+688
-5
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.fluss.exception.TableNotPartitionedException;
4545
import org.apache.fluss.exception.TooManyBucketsException;
4646
import org.apache.fluss.exception.TooManyPartitionsException;
47+
import org.apache.fluss.metadata.DatabaseChange;
4748
import org.apache.fluss.metadata.DatabaseDescriptor;
4849
import org.apache.fluss.metadata.DatabaseInfo;
4950
import org.apache.fluss.metadata.PartitionInfo;
@@ -122,6 +123,24 @@ public interface Admin extends AutoCloseable {
122123
CompletableFuture<Void> createDatabase(
123124
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);
124125

126+
/**
127+
* Alter a database with the given {@code databaseChanges}.
128+
*
129+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
130+
*
131+
* <ul>
132+
* <li>{@link DatabaseNotExistException} when the database does not exist and {@code
133+
* ignoreIfNotExists} is false.
134+
* </ul>
135+
*
136+
* @param databaseName The name of the database.
137+
* @param databaseChanges The database changes.
138+
* @param ignoreIfNotExists if it is true, do nothing if database does not exist. If false,
139+
* throw a {@link DatabaseNotExistException}.
140+
*/
141+
CompletableFuture<Void> alterDatabase(
142+
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists);
143+
125144
/**
126145
* Get the database with the given database name asynchronously.
127146
*
@@ -246,8 +265,8 @@ CompletableFuture<Void> createTable(
246265
*
247266
* <ul>
248267
* <li>{@link DatabaseNotExistException} when the database does not exist.
249-
* <li>{@link TableNotExistException} when the table does not exist, and ignoreIfNotExists is
250-
* false.
268+
* <li>{@link TableNotExistException} when the table does not exist and {@code
269+
* ignoreIfNotExists} is false.
251270
* <li>{@link InvalidAlterTableException} if the alter operation is invalid, such as alter set
252271
* a table option which is not supported to modify currently.
253272
* </ul>

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.config.cluster.AlterConfig;
2828
import org.apache.fluss.config.cluster.ConfigEntry;
2929
import org.apache.fluss.exception.LeaderNotAvailableException;
30+
import org.apache.fluss.metadata.DatabaseChange;
3031
import org.apache.fluss.metadata.DatabaseDescriptor;
3132
import org.apache.fluss.metadata.DatabaseInfo;
3233
import org.apache.fluss.metadata.PartitionInfo;
@@ -45,6 +46,7 @@
4546
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4647
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4748
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
49+
import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
4850
import org.apache.fluss.rpc.messages.AlterTableRequest;
4951
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5052
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -263,6 +265,23 @@ public CompletableFuture<Void> alterTable(
263265
return gateway.alterTable(request).thenApply(r -> null);
264266
}
265267

268+
@Override
269+
public CompletableFuture<Void> alterDatabase(
270+
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
271+
TablePath.validateDatabaseName(databaseName);
272+
AlterDatabaseRequest request = new AlterDatabaseRequest();
273+
274+
List<PbAlterConfig> pbDatabaseChanges =
275+
databaseChanges.stream()
276+
.map(ClientRpcMessageUtils::toPbAlterConfigsForDatabase)
277+
.collect(Collectors.toList());
278+
279+
request.addAllConfigChanges(pbDatabaseChanges)
280+
.setDatabaseName(databaseName)
281+
.setIgnoreIfNotExists(ignoreIfNotExists);
282+
return gateway.alterDatabase(request).thenApply(r -> null);
283+
}
284+
266285
@Override
267286
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
268287
GetTableInfoRequest request = new GetTableInfoRequest();

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.fluss.fs.FsPath;
3131
import org.apache.fluss.fs.FsPathAndFileName;
3232
import org.apache.fluss.fs.token.ObtainedSecurityToken;
33+
import org.apache.fluss.metadata.DatabaseChange;
3334
import org.apache.fluss.metadata.PartitionInfo;
3435
import org.apache.fluss.metadata.PartitionSpec;
3536
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -72,6 +73,7 @@
7273
import java.util.Set;
7374
import java.util.stream.Collectors;
7475

76+
import static org.apache.fluss.config.FlussConfigUtils.COMMENT_PROP;
7577
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
7678
import static org.apache.fluss.utils.Preconditions.checkState;
7779

@@ -385,4 +387,27 @@ public static List<ConfigEntry> toConfigEntries(List<PbDescribeConfig> pbDescrib
385387
pbDescribeConfig.getConfigSource())))
386388
.collect(Collectors.toList());
387389
}
390+
391+
public static PbAlterConfig toPbAlterConfigsForDatabase(DatabaseChange databaseChange) {
392+
PbAlterConfig info = new PbAlterConfig();
393+
if (databaseChange instanceof DatabaseChange.SetOption) {
394+
DatabaseChange.SetOption setOption = (DatabaseChange.SetOption) databaseChange;
395+
info.setConfigKey(setOption.getKey());
396+
info.setConfigValue(setOption.getValue());
397+
info.setOpType(AlterConfigOpType.SET.value());
398+
} else if (databaseChange instanceof DatabaseChange.ResetOption) {
399+
DatabaseChange.ResetOption resetOption = (DatabaseChange.ResetOption) databaseChange;
400+
info.setConfigKey(resetOption.getKey());
401+
info.setOpType(AlterConfigOpType.DELETE.value());
402+
} else if (databaseChange instanceof DatabaseChange.SetComment) {
403+
DatabaseChange.SetComment setComment = (DatabaseChange.SetComment) databaseChange;
404+
info.setConfigKey(COMMENT_PROP);
405+
info.setConfigValue(setComment.getComment());
406+
info.setOpType(AlterConfigOpType.SET.value());
407+
} else {
408+
throw new IllegalArgumentException(
409+
"Unsupported database change: " + databaseChange.getClass());
410+
}
411+
return info;
412+
}
388413
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.fluss.exception.TooManyPartitionsException;
4949
import org.apache.fluss.fs.FsPath;
5050
import org.apache.fluss.fs.FsPathAndFileName;
51+
import org.apache.fluss.metadata.DatabaseChange;
5152
import org.apache.fluss.metadata.DatabaseDescriptor;
5253
import org.apache.fluss.metadata.DatabaseInfo;
5354
import org.apache.fluss.metadata.DeleteBehavior;
@@ -160,6 +161,75 @@ void testGetDatabaseInfo() throws Exception {
160161
.isBetween(timestampBeforeCreate, timestampAfterCreate);
161162
}
162163

164+
@Test
165+
void testAlterDatabase() throws Exception {
166+
// create database
167+
String dbName = "test_alter_db";
168+
admin.createDatabase(
169+
dbName,
170+
DatabaseDescriptor.builder()
171+
.comment("original comment")
172+
.customProperty("key1", "value1")
173+
.customProperty("key2", "value2")
174+
.build(),
175+
false)
176+
.get();
177+
178+
DatabaseInfo databaseInfo = admin.getDatabaseInfo(dbName).get();
179+
DatabaseDescriptor existingDescriptor = databaseInfo.getDatabaseDescriptor();
180+
181+
// Verify initial state
182+
assertThat(existingDescriptor.getComment().get()).isEqualTo("original comment");
183+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key1", "value1");
184+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key2", "value2");
185+
186+
// Alter database: add and modify custom properties
187+
List<DatabaseChange> databaseChanges = new ArrayList<>();
188+
databaseChanges.add(DatabaseChange.set("key3", "value3"));
189+
databaseChanges.add(DatabaseChange.set("key1", "updated_value1"));
190+
databaseChanges.add(DatabaseChange.setComment("updated comment"));
191+
admin.alterDatabase(dbName, databaseChanges, false).get();
192+
193+
// Verify alterations
194+
DatabaseInfo alteredDatabaseInfo = admin.getDatabaseInfo(dbName).get();
195+
DatabaseDescriptor alteredDescriptor = alteredDatabaseInfo.getDatabaseDescriptor();
196+
assertThat(alteredDescriptor.getComment().get()).isEqualTo("updated comment");
197+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
198+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key2", "value2");
199+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key3", "value3");
200+
assertThat(alteredDescriptor.getCustomProperties()).hasSize(3);
201+
202+
// Alter database: reset a property
203+
databaseChanges = new ArrayList<>();
204+
databaseChanges.add(DatabaseChange.reset("key2"));
205+
admin.alterDatabase(dbName, databaseChanges, false).get();
206+
207+
// Verify reset
208+
DatabaseInfo resetDatabaseInfo = admin.getDatabaseInfo(dbName).get();
209+
DatabaseDescriptor resetDescriptor = resetDatabaseInfo.getDatabaseDescriptor();
210+
assertThat(resetDescriptor.getComment().get()).isEqualTo("updated comment");
211+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
212+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key3", "value3");
213+
assertThat(resetDescriptor.getCustomProperties()).doesNotContainKey("key2");
214+
assertThat(resetDescriptor.getCustomProperties()).hasSize(2);
215+
216+
// throw exception if database not exist
217+
List<DatabaseChange> finalDatabaseChanges = databaseChanges;
218+
assertThatThrownBy(
219+
() ->
220+
admin.alterDatabase(
221+
"test_alter_db_not_exist",
222+
finalDatabaseChanges,
223+
false)
224+
.get())
225+
.cause()
226+
.isInstanceOf(DatabaseNotExistException.class)
227+
.hasMessage(String.format("Database %s not exists.", "test_alter_db_not_exist"));
228+
229+
// should success if ignore not exist
230+
admin.alterDatabase("test_alter_db_not_exist", databaseChanges, true).get();
231+
}
232+
163233
@Test
164234
void testGetTableInfoAndSchema() throws Exception {
165235
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class FlussConfigUtils {
3838

3939
public static final List<String> ALTERABLE_TABLE_OPTIONS;
4040

41+
// ======================= Constants ===============================
42+
43+
// constants for table and database
44+
public static final String COMMENT_PROP = "comment";
45+
4146
static {
4247
TABLE_OPTIONS = extractConfigOptions("table.");
4348
CLIENT_OPTIONS = extractConfigOptions("client.");

0 commit comments

Comments
 (0)