Skip to content

Commit 7364a8e

Browse files
committed
modified based on CR
1 parent e321693 commit 7364a8e

File tree

47 files changed

+541
-622
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+541
-622
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
24+
import org.apache.fluss.cluster.AlterConfig;
25+
import org.apache.fluss.cluster.ConfigEntry;
2426
import org.apache.fluss.cluster.ServerNode;
2527
import org.apache.fluss.config.ConfigOptions;
26-
import org.apache.fluss.config.dynamic.AlterConfigOp;
27-
import org.apache.fluss.config.dynamic.ConfigEntry;
2828
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2929
import org.apache.fluss.exception.DatabaseNotEmptyException;
3030
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -483,13 +483,13 @@ ListOffsetsResult listOffsets(
483483
*
484484
* @return A CompletableFuture containing the configs of the cluster.
485485
*/
486-
CompletableFuture<Collection<ConfigEntry>> describeConfigs();
486+
CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs();
487487

488488
/**
489489
* Alter the configs of the cluster.
490490
*
491491
* @param configs List of configs to alter.
492492
* @return A CompletableFuture indicating completion of the operation.
493493
*/
494-
CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
494+
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);
495495
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
25+
import org.apache.fluss.cluster.AlterConfig;
2526
import org.apache.fluss.cluster.Cluster;
27+
import org.apache.fluss.cluster.ConfigEntry;
2628
import org.apache.fluss.cluster.ServerNode;
27-
import org.apache.fluss.config.dynamic.AlterConfigOp;
28-
import org.apache.fluss.config.dynamic.ConfigEntry;
2929
import org.apache.fluss.exception.LeaderNotAvailableException;
3030
import org.apache.fluss.metadata.DatabaseDescriptor;
3131
import org.apache.fluss.metadata.DatabaseInfo;
@@ -44,14 +44,14 @@
4444
import org.apache.fluss.rpc.gateway.AdminGateway;
4545
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4646
import org.apache.fluss.rpc.gateway.TabletServerGateway;
47-
import org.apache.fluss.rpc.messages.AlterConfigsRequest;
47+
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
4848
import org.apache.fluss.rpc.messages.AlterTableRequest;
4949
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5050
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5151
import org.apache.fluss.rpc.messages.CreateTableRequest;
5252
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
5353
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
54-
import org.apache.fluss.rpc.messages.DescribeConfigsRequest;
54+
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
5555
import org.apache.fluss.rpc.messages.DropAclsRequest;
5656
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
5757
import org.apache.fluss.rpc.messages.DropTableRequest;
@@ -69,8 +69,6 @@
6969
import org.apache.fluss.rpc.messages.ListTablesRequest;
7070
import org.apache.fluss.rpc.messages.ListTablesResponse;
7171
import org.apache.fluss.rpc.messages.PbAlterConfig;
72-
import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
73-
import org.apache.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
7472
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7573
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7674
import org.apache.fluss.rpc.messages.PbTablePath;
@@ -96,6 +94,7 @@
9694
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9795
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
9896
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
97+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
9998
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
10099
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
101100
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
@@ -494,59 +493,43 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
494493
}
495494

496495
@Override
497-
public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
496+
public CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs() {
498497
CompletableFuture<Collection<ConfigEntry>> future = new CompletableFuture<>();
499-
DescribeConfigsRequest request = new DescribeConfigsRequest();
500-
gateway.describeConfigs(request)
498+
DescribeClusterConfigsRequest request = new DescribeClusterConfigsRequest();
499+
gateway.describeClusterConfigs(request)
501500
.whenComplete(
502501
(r, t) -> {
503502
if (t != null) {
504503
future.completeExceptionally(t);
504+
} else {
505+
future.complete(toConfigEntries(r.getConfigsList()));
505506
}
506-
507-
List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList();
508-
List<ConfigEntry> configEntries =
509-
responseInfos.stream()
510-
.map(
511-
responseInfo ->
512-
new ConfigEntry(
513-
responseInfo.getConfigKey(),
514-
responseInfo.hasConfigValue()
515-
? responseInfo
516-
.getConfigValue()
517-
: null,
518-
ConfigEntry.ConfigSource
519-
.valueOf(
520-
responseInfo
521-
.getConfigSource())))
522-
.collect(Collectors.toList());
523-
future.complete(configEntries);
524507
});
525508
return future;
526509
}
527510

528511
@Override
529-
public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs) {
512+
public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs) {
530513
CompletableFuture<Void> future = new CompletableFuture<>();
531514

532-
AlterConfigsRequest request = new AlterConfigsRequest();
533-
for (AlterConfigOp alterConfigOp : configs) {
534-
PbAlterConfigsRequestInfo requestInfo =
535-
request.addInfo()
536-
.setConfigKey(alterConfigOp.key())
537-
.setOpType(alterConfigOp.opType().id());
538-
if (alterConfigOp.value() != null) {
539-
requestInfo.setConfigValue(alterConfigOp.value());
515+
AlterClusterConfigsRequest request = new AlterClusterConfigsRequest();
516+
for (AlterConfig alterConfig : configs) {
517+
PbAlterConfig pBAlterConfig =
518+
request.addAlterConfig()
519+
.setConfigKey(alterConfig.key())
520+
.setOpType(alterConfig.opType().value);
521+
if (alterConfig.value() != null) {
522+
pBAlterConfig.setConfigValue(alterConfig.value());
540523
}
541524
}
542-
gateway.alterConfigs(request)
525+
gateway.alterClusterConfigs(request)
543526
.whenComplete(
544527
(r, t) -> {
545528
if (t != null) {
546529
future.completeExceptionally(t);
530+
} else {
531+
future.complete(null);
547532
}
548-
549-
future.complete(null);
550533
});
551534

552535
return future;

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.ConfigEntry;
2829
import org.apache.fluss.fs.FsPath;
2930
import org.apache.fluss.fs.FsPathAndFileName;
3031
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -46,6 +47,7 @@
4647
import org.apache.fluss.rpc.messages.LookupRequest;
4748
import org.apache.fluss.rpc.messages.MetadataRequest;
4849
import org.apache.fluss.rpc.messages.PbAlterConfig;
50+
import org.apache.fluss.rpc.messages.PbDescribeConfig;
4951
import org.apache.fluss.rpc.messages.PbKeyValue;
5052
import org.apache.fluss.rpc.messages.PbKvSnapshot;
5153
import org.apache.fluss.rpc.messages.PbLakeSnapshotForBucket;
@@ -369,4 +371,18 @@ public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {
369371
}
370372
return info;
371373
}
374+
375+
public static List<ConfigEntry> toConfigEntries(List<PbDescribeConfig> pbDescribeConfigs) {
376+
return pbDescribeConfigs.stream()
377+
.map(
378+
pbDescribeConfig ->
379+
new ConfigEntry(
380+
pbDescribeConfig.getConfigKey(),
381+
pbDescribeConfig.hasConfigValue()
382+
? pbDescribeConfig.getConfigValue()
383+
: null,
384+
ConfigEntry.ConfigSource.valueOf(
385+
pbDescribeConfig.getConfigSource())))
386+
.collect(Collectors.toList());
387+
}
372388
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import org.apache.fluss.client.metadata.KvSnapshots;
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
26+
import org.apache.fluss.cluster.AlterConfig;
27+
import org.apache.fluss.cluster.ConfigEntry;
2628
import org.apache.fluss.cluster.ServerNode;
2729
import org.apache.fluss.config.AutoPartitionTimeUnit;
2830
import org.apache.fluss.config.ConfigOptions;
2931
import org.apache.fluss.config.Configuration;
30-
import org.apache.fluss.config.dynamic.AlterConfigOp;
31-
import org.apache.fluss.config.dynamic.ConfigEntry;
3232
import org.apache.fluss.exception.DatabaseAlreadyExistException;
3333
import org.apache.fluss.exception.DatabaseNotEmptyException;
3434
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -47,6 +47,7 @@
4747
import org.apache.fluss.exception.TooManyPartitionsException;
4848
import org.apache.fluss.fs.FsPath;
4949
import org.apache.fluss.fs.FsPathAndFileName;
50+
import org.apache.fluss.metadata.AlterConfigOpType;
5051
import org.apache.fluss.metadata.DatabaseDescriptor;
5152
import org.apache.fluss.metadata.DatabaseInfo;
5253
import org.apache.fluss.metadata.KvFormat;
@@ -67,10 +68,13 @@
6768
import org.junit.jupiter.api.BeforeEach;
6869
import org.junit.jupiter.api.Test;
6970

71+
import javax.annotation.Nullable;
72+
7073
import java.time.Duration;
7174
import java.time.LocalDate;
7275
import java.util.ArrayList;
7376
import java.util.Arrays;
77+
import java.util.Collection;
7478
import java.util.Collections;
7579
import java.util.HashMap;
7680
import java.util.List;
@@ -951,42 +955,44 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException {
951955
.getDataLakeFormat())
952956
.isEqualTo(PAIMON);
953957

954-
admin.alterConfigs(
958+
admin.alterClusterConfigs(
955959
Collections.singletonList(
956-
new AlterConfigOp(
957-
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET)))
960+
new AlterConfig(
961+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
958962
.get();
959963
assertThat(
960964
FLUSS_CLUSTER_EXTENSION
961965
.getCoordinatorServer()
962966
.getCoordinatorService()
963967
.getDataLakeFormat())
964968
.isNull();
965-
assertThat(admin.describeConfigs().get())
966-
.contains(
967-
new ConfigEntry(
968-
DATALAKE_FORMAT.key(),
969-
null,
970-
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
969+
assertConfigEntry(
970+
DATALAKE_FORMAT.key(), null, ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
971971

972972
// Delete dynamic configs to use the initial value(from server.yaml)
973-
admin.alterConfigs(
973+
admin.alterClusterConfigs(
974974
Collections.singletonList(
975-
new AlterConfigOp(
976-
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE)))
975+
new AlterConfig(
976+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.DELETE)))
977977
.get();
978978
assertThat(
979979
FLUSS_CLUSTER_EXTENSION
980980
.getCoordinatorServer()
981981
.getCoordinatorService()
982982
.getDataLakeFormat())
983983
.isEqualTo(PAIMON);
984-
assertThat(admin.describeConfigs().get())
985-
.contains(
986-
new ConfigEntry(
987-
DATALAKE_FORMAT.key(),
988-
"paimon",
989-
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
984+
assertConfigEntry(
985+
DATALAKE_FORMAT.key(), "paimon", ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
986+
}
987+
988+
private void assertConfigEntry(
989+
String key, @Nullable String value, ConfigEntry.ConfigSource source)
990+
throws ExecutionException, InterruptedException {
991+
Collection<ConfigEntry> configEntries = admin.describeClusterConfigs().get();
992+
List<String> configKeys =
993+
configEntries.stream().map(ConfigEntry::key).collect(Collectors.toList());
994+
assertThat(configKeys).doesNotHaveDuplicates();
995+
assertThat(configEntries).contains(new ConfigEntry(key, value, source));
990996
}
991997

992998
private void assertNoBucketSnapshot(KvSnapshots snapshots, int expectBucketNum) {

0 commit comments

Comments
 (0)