Skip to content

Commit 8c4004c

Browse files
[FIP-12] Support dynamic cluster config to enable lakehouse (#1567)
1 parent a8f9574 commit 8c4004c

File tree

50 files changed

+1887
-128
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1887
-128
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.config.ConfigOptions;
26+
import org.apache.fluss.config.cluster.AlterConfig;
27+
import org.apache.fluss.config.cluster.ConfigEntry;
2628
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2729
import org.apache.fluss.exception.DatabaseNotEmptyException;
2830
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -475,4 +477,19 @@ ListOffsetsResult listOffsets(
475477
* @return A CompletableFuture indicating completion of the operation.
476478
*/
477479
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
480+
481+
/**
482+
* Describe the configs of the cluster.
483+
*
484+
* @return A CompletableFuture containing the configs of the cluster.
485+
*/
486+
CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs();
487+
488+
/**
489+
* Alter the configs of the cluster.
490+
*
491+
* @param configs List of configs to alter.
492+
* @return A CompletableFuture indicating completion of the operation.
493+
*/
494+
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);
478495
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.config.cluster.AlterConfig;
28+
import org.apache.fluss.config.cluster.ConfigEntry;
2729
import org.apache.fluss.exception.LeaderNotAvailableException;
2830
import org.apache.fluss.metadata.DatabaseDescriptor;
2931
import org.apache.fluss.metadata.DatabaseInfo;
@@ -42,12 +44,14 @@
4244
import org.apache.fluss.rpc.gateway.AdminGateway;
4345
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4446
import org.apache.fluss.rpc.gateway.TabletServerGateway;
47+
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
4548
import org.apache.fluss.rpc.messages.AlterTableRequest;
4649
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4750
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4851
import org.apache.fluss.rpc.messages.CreateTableRequest;
4952
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
5053
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
54+
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
5155
import org.apache.fluss.rpc.messages.DropAclsRequest;
5256
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
5357
import org.apache.fluss.rpc.messages.DropTableRequest;
@@ -90,6 +94,7 @@
9094
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9195
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
9296
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
97+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
9398
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
9499
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
95100
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
@@ -487,6 +492,49 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
487492
return result;
488493
}
489494

495+
@Override
496+
public CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs() {
497+
CompletableFuture<Collection<ConfigEntry>> future = new CompletableFuture<>();
498+
DescribeClusterConfigsRequest request = new DescribeClusterConfigsRequest();
499+
gateway.describeClusterConfigs(request)
500+
.whenComplete(
501+
(r, t) -> {
502+
if (t != null) {
503+
future.completeExceptionally(t);
504+
} else {
505+
future.complete(toConfigEntries(r.getConfigsList()));
506+
}
507+
});
508+
return future;
509+
}
510+
511+
@Override
512+
public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs) {
513+
CompletableFuture<Void> future = new CompletableFuture<>();
514+
515+
AlterClusterConfigsRequest request = new AlterClusterConfigsRequest();
516+
for (AlterConfig alterConfig : configs) {
517+
PbAlterConfig pBAlterConfig =
518+
request.addAlterConfig()
519+
.setConfigKey(alterConfig.key())
520+
.setOpType(alterConfig.opType().value);
521+
if (alterConfig.value() != null) {
522+
pBAlterConfig.setConfigValue(alterConfig.value());
523+
}
524+
}
525+
gateway.alterClusterConfigs(request)
526+
.whenComplete(
527+
(r, t) -> {
528+
if (t != null) {
529+
future.completeExceptionally(t);
530+
} else {
531+
future.complete(null);
532+
}
533+
});
534+
535+
return future;
536+
}
537+
490538
@Override
491539
public void close() {
492540
// nothing to do yet

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.config.cluster.AlterConfigOpType;
29+
import org.apache.fluss.config.cluster.ConfigEntry;
2830
import org.apache.fluss.fs.FsPath;
2931
import org.apache.fluss.fs.FsPathAndFileName;
3032
import org.apache.fluss.fs.token.ObtainedSecurityToken;
31-
import org.apache.fluss.metadata.AlterConfigOpType;
3233
import org.apache.fluss.metadata.PartitionInfo;
3334
import org.apache.fluss.metadata.PartitionSpec;
3435
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -46,6 +47,7 @@
4647
import org.apache.fluss.rpc.messages.LookupRequest;
4748
import org.apache.fluss.rpc.messages.MetadataRequest;
4849
import org.apache.fluss.rpc.messages.PbAlterConfig;
50+
import org.apache.fluss.rpc.messages.PbDescribeConfig;
4951
import org.apache.fluss.rpc.messages.PbKeyValue;
5052
import org.apache.fluss.rpc.messages.PbKvSnapshot;
5153
import org.apache.fluss.rpc.messages.PbLakeSnapshotForBucket;
@@ -369,4 +371,18 @@ public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {
369371
}
370372
return info;
371373
}
374+
375+
public static List<ConfigEntry> toConfigEntries(List<PbDescribeConfig> pbDescribeConfigs) {
376+
return pbDescribeConfigs.stream()
377+
.map(
378+
pbDescribeConfig ->
379+
new ConfigEntry(
380+
pbDescribeConfig.getConfigKey(),
381+
pbDescribeConfig.hasConfigValue()
382+
? pbDescribeConfig.getConfigValue()
383+
: null,
384+
ConfigEntry.ConfigSource.valueOf(
385+
pbDescribeConfig.getConfigSource())))
386+
.collect(Collectors.toList());
387+
}
372388
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.fluss.config.AutoPartitionTimeUnit;
2828
import org.apache.fluss.config.ConfigOptions;
2929
import org.apache.fluss.config.Configuration;
30+
import org.apache.fluss.config.cluster.AlterConfig;
31+
import org.apache.fluss.config.cluster.AlterConfigOpType;
32+
import org.apache.fluss.config.cluster.ConfigEntry;
3033
import org.apache.fluss.exception.DatabaseAlreadyExistException;
3134
import org.apache.fluss.exception.DatabaseNotEmptyException;
3235
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -45,7 +48,6 @@
4548
import org.apache.fluss.exception.TooManyPartitionsException;
4649
import org.apache.fluss.fs.FsPath;
4750
import org.apache.fluss.fs.FsPathAndFileName;
48-
import org.apache.fluss.metadata.DataLakeFormat;
4951
import org.apache.fluss.metadata.DatabaseDescriptor;
5052
import org.apache.fluss.metadata.DatabaseInfo;
5153
import org.apache.fluss.metadata.KvFormat;
@@ -66,10 +68,13 @@
6668
import org.junit.jupiter.api.BeforeEach;
6769
import org.junit.jupiter.api.Test;
6870

71+
import javax.annotation.Nullable;
72+
6973
import java.time.Duration;
7074
import java.time.LocalDate;
7175
import java.util.ArrayList;
7276
import java.util.Arrays;
77+
import java.util.Collection;
7378
import java.util.Collections;
7479
import java.util.HashMap;
7580
import java.util.List;
@@ -79,6 +84,8 @@
7984
import java.util.stream.Collectors;
8085
import java.util.stream.Stream;
8186

87+
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
88+
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
8289
import static org.apache.fluss.testutils.DataTestUtils.row;
8390
import static org.assertj.core.api.Assertions.assertThat;
8491
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -166,7 +173,7 @@ void testGetTableInfoAndSchema() throws Exception {
166173
.isEqualTo(
167174
DEFAULT_TABLE_DESCRIPTOR
168175
.withReplicationFactor(3)
169-
.withDataLakeFormat(DataLakeFormat.PAIMON));
176+
.withDataLakeFormat(PAIMON));
170177
assertThat(schemaInfo2).isEqualTo(schemaInfo);
171178
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
172179
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
@@ -191,7 +198,7 @@ void testGetTableInfoAndSchema() throws Exception {
191198
.isEqualTo(
192199
DEFAULT_TABLE_DESCRIPTOR
193200
.withReplicationFactor(3)
194-
.withDataLakeFormat(DataLakeFormat.PAIMON));
201+
.withDataLakeFormat(PAIMON));
195202
assertThat(schemaInfo2).isEqualTo(schemaInfo);
196203
// assert created time
197204
assertThat(tableInfo.getCreatedTime())
@@ -466,7 +473,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
466473
.isEqualTo(
467474
DEFAULT_TABLE_DESCRIPTOR
468475
.withReplicationFactor(3)
469-
.withDataLakeFormat(DataLakeFormat.PAIMON));
476+
.withDataLakeFormat(PAIMON));
470477
}
471478
}
472479

@@ -939,6 +946,55 @@ tablePath, newPartitionSpec("age", "11"), false)
939946
.isInstanceOf(TooManyPartitionsException.class);
940947
}
941948

949+
@Test
950+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
951+
assertThat(
952+
FLUSS_CLUSTER_EXTENSION
953+
.getCoordinatorServer()
954+
.getCoordinatorService()
955+
.getDataLakeFormat())
956+
.isEqualTo(PAIMON);
957+
958+
admin.alterClusterConfigs(
959+
Collections.singletonList(
960+
new AlterConfig(
961+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
962+
.get();
963+
assertThat(
964+
FLUSS_CLUSTER_EXTENSION
965+
.getCoordinatorServer()
966+
.getCoordinatorService()
967+
.getDataLakeFormat())
968+
.isNull();
969+
assertConfigEntry(
970+
DATALAKE_FORMAT.key(), null, ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
971+
972+
// Delete dynamic configs to use the initial value(from server.yaml)
973+
admin.alterClusterConfigs(
974+
Collections.singletonList(
975+
new AlterConfig(
976+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.DELETE)))
977+
.get();
978+
assertThat(
979+
FLUSS_CLUSTER_EXTENSION
980+
.getCoordinatorServer()
981+
.getCoordinatorService()
982+
.getDataLakeFormat())
983+
.isEqualTo(PAIMON);
984+
assertConfigEntry(
985+
DATALAKE_FORMAT.key(), "paimon", ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
986+
}
987+
988+
private void assertConfigEntry(
989+
String key, @Nullable String value, ConfigEntry.ConfigSource source)
990+
throws ExecutionException, InterruptedException {
991+
Collection<ConfigEntry> configEntries = admin.describeClusterConfigs().get();
992+
List<String> configKeys =
993+
configEntries.stream().map(ConfigEntry::key).collect(Collectors.toList());
994+
assertThat(configKeys).doesNotHaveDuplicates();
995+
assertThat(configEntries).contains(new ConfigEntry(key, value, source));
996+
}
997+
942998
private void assertNoBucketSnapshot(KvSnapshots snapshots, int expectBucketNum) {
943999
assertThat(snapshots.getBucketIds()).hasSize(expectBucketNum);
9441000
for (int i = 0; i < expectBucketNum; i++) {

fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import org.apache.fluss.config.ConfigOptions;
2929
import org.apache.fluss.config.Configuration;
3030
import org.apache.fluss.config.MemorySize;
31+
import org.apache.fluss.config.cluster.AlterConfig;
32+
import org.apache.fluss.config.cluster.AlterConfigOpType;
33+
import org.apache.fluss.config.cluster.ConfigEntry;
3134
import org.apache.fluss.exception.AuthorizationException;
3235
import org.apache.fluss.metadata.DataLakeFormat;
3336
import org.apache.fluss.metadata.DatabaseDescriptor;
@@ -69,6 +72,7 @@
6972
import java.util.List;
7073
import java.util.concurrent.ExecutionException;
7174

75+
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
7276
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
7377
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
7478
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
@@ -653,6 +657,81 @@ void testProduceAndConsumer() throws Exception {
653657
}
654658
}
655659

660+
@Test
661+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
662+
assertThatThrownBy(() -> guestAdmin.describeClusterConfigs().get())
663+
.rootCause()
664+
.hasMessageContaining(
665+
String.format(
666+
"Principal %s have no authorization to operate DESCRIBE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
667+
guestPrincipal));
668+
rootAdmin
669+
.createAcls(
670+
Collections.singletonList(
671+
new AclBinding(
672+
Resource.cluster(),
673+
new AccessControlEntry(
674+
guestPrincipal,
675+
"*",
676+
OperationType.DESCRIBE,
677+
PermissionType.ALLOW))))
678+
.all()
679+
.get();
680+
Collection<ConfigEntry> configToResourceConfigs = guestAdmin.describeClusterConfigs().get();
681+
assertThat(configToResourceConfigs)
682+
.contains(
683+
new ConfigEntry(
684+
DATALAKE_FORMAT.key(),
685+
"paimon",
686+
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
687+
688+
assertThatThrownBy(
689+
() ->
690+
guestAdmin
691+
.alterClusterConfigs(
692+
Collections.singletonList(
693+
new AlterConfig(
694+
DATALAKE_FORMAT.key(),
695+
null,
696+
AlterConfigOpType.SET)))
697+
.get())
698+
.rootCause()
699+
.hasMessageContaining(
700+
String.format(
701+
"Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}",
702+
guestPrincipal));
703+
704+
rootAdmin
705+
.createAcls(
706+
Collections.singletonList(
707+
new AclBinding(
708+
Resource.cluster(),
709+
new AccessControlEntry(
710+
guestPrincipal,
711+
"*",
712+
OperationType.ALTER,
713+
PermissionType.ALLOW))))
714+
.all()
715+
.get();
716+
guestAdmin
717+
.alterClusterConfigs(
718+
Collections.singletonList(
719+
new AlterConfig(
720+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
721+
.get();
722+
assertThat(guestAdmin.describeClusterConfigs().get())
723+
.contains(
724+
new ConfigEntry(
725+
DATALAKE_FORMAT.key(),
726+
null,
727+
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG))
728+
.doesNotContain(
729+
new ConfigEntry(
730+
DATALAKE_FORMAT.key(),
731+
"paimon",
732+
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
733+
}
734+
656735
private static Configuration initConfig() {
657736
Configuration conf = new Configuration();
658737
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
@@ -661,7 +740,7 @@ private static Configuration initConfig() {
661740
// set a shorter max lag time to make tests in FlussFailServerTableITCase faster
662741
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(10));
663742
// set default datalake format for the cluster and enable datalake tables
664-
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
743+
conf.set(DATALAKE_FORMAT, DataLakeFormat.PAIMON);
665744

666745
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
667746
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));

0 commit comments

Comments
 (0)