Skip to content

Commit f534087

Browse files
committed
POC: add dynamic config to enable lakehouse.
1 parent 1b516ad commit f534087

File tree

52 files changed

+2240
-72
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2240
-72
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.alibaba.fluss.client.metadata.LakeSnapshot;
2424
import com.alibaba.fluss.cluster.ServerNode;
2525
import com.alibaba.fluss.config.ConfigOptions;
26+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
27+
import com.alibaba.fluss.config.dynamic.ConfigResource;
2628
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2729
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2830
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -55,6 +57,7 @@
5557

5658
import java.util.Collection;
5759
import java.util.List;
60+
import java.util.Map;
5861
import java.util.concurrent.CompletableFuture;
5962

6063
/**
@@ -450,4 +453,9 @@ ListOffsetsResult listOffsets(
450453
* @return A CompletableFuture indicating completion of the operation.
451454
*/
452455
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
456+
457+
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources);
458+
459+
AlterConfigsResult incrementalAlterConfigs(
460+
Map<ConfigResource, Collection<AlterConfigOp>> configs);
453461
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.config.dynamic.ConfigResource;
21+
import com.alibaba.fluss.rpc.messages.PbCreateAclRespInfo;
22+
import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
23+
import com.alibaba.fluss.rpc.protocol.Errors;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.stream.Collectors;
29+
30+
/** The result of an alter configs operation. */
31+
public class AlterConfigsResult {
32+
private final Map<ConfigResource, CompletableFuture<Void>> futures;
33+
34+
AlterConfigsResult(Map<ConfigResource, CompletableFuture<Void>> futures) {
35+
this.futures = futures;
36+
}
37+
38+
/**
39+
* Return a map from resources to futures which can be used to check the status of the operation
40+
* on each resource.
41+
*/
42+
public Map<ConfigResource, CompletableFuture<Void>> values() {
43+
return futures;
44+
}
45+
46+
/** Return a future which succeeds only if all the alter configs operations succeed. */
47+
public CompletableFuture<Void> all() {
48+
return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
49+
}
50+
51+
public void completeExceptionally(Throwable t) {
52+
futures.values().forEach(future -> future.completeExceptionally(t));
53+
}
54+
55+
/**
56+
* Completes individual futures based on RPC response information.
57+
*
58+
* <p>For each {@link PbCreateAclRespInfo} in the collection, Completes the future with success
59+
* or failure based on the response's error code.
60+
*/
61+
public void complete(List<PbDescribeConfigsResponseInfo> requestInfos) {
62+
Map<ConfigResource, PbDescribeConfigsResponseInfo> configResMap =
63+
requestInfos.stream()
64+
.collect(
65+
Collectors.toMap(
66+
p ->
67+
new ConfigResource(
68+
ConfigResource.Type.forId(
69+
(byte) p.getResourceType()),
70+
p.getResourceName()),
71+
p -> p));
72+
73+
for (ConfigResource config : futures.keySet()) {
74+
CompletableFuture<Void> future = futures.get(config);
75+
76+
PbDescribeConfigsResponseInfo responseInfo = configResMap.get(config);
77+
if (responseInfo.hasErrorCode()) {
78+
future.completeExceptionally(
79+
Errors.forCode(responseInfo.getErrorCode())
80+
.exception(responseInfo.getErrorMessage()));
81+
continue;
82+
}
83+
84+
future.complete(null);
85+
}
86+
}
87+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.config.dynamic.Config;
21+
import com.alibaba.fluss.config.dynamic.ConfigEntry;
22+
import com.alibaba.fluss.config.dynamic.ConfigResource;
23+
import com.alibaba.fluss.exception.UnknownServerException;
24+
import com.alibaba.fluss.rpc.messages.PbCreateAclRespInfo;
25+
import com.alibaba.fluss.rpc.messages.PbDescribeConfig;
26+
import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
27+
import com.alibaba.fluss.rpc.protocol.Errors;
28+
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ExecutionException;
35+
import java.util.stream.Collectors;
36+
37+
/** DescribeConfigsResult is the result of a describeConfigs call. */
38+
public class DescribeConfigsResult {
39+
private final Map<ConfigResource, CompletableFuture<Config>> futures;
40+
41+
DescribeConfigsResult(Map<ConfigResource, CompletableFuture<Config>> futures) {
42+
this.futures = futures;
43+
}
44+
45+
/**
46+
* Return a map from resources to futures which can be used to check the status of the
47+
* configuration for each resource.
48+
*/
49+
public Map<ConfigResource, CompletableFuture<Config>> values() {
50+
return futures;
51+
}
52+
53+
/** Return a future which succeeds only if all the config descriptions succeed. */
54+
public CompletableFuture<Map<ConfigResource, Config>> all() {
55+
return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
56+
.thenApply(
57+
(v) -> {
58+
Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
59+
for (Map.Entry<ConfigResource, CompletableFuture<Config>> entry :
60+
futures.entrySet()) {
61+
try {
62+
configs.put(entry.getKey(), entry.getValue().get());
63+
} catch (InterruptedException | ExecutionException e) {
64+
// This should be unreachable, because allOf ensured that all
65+
// the futures
66+
// completed successfully.
67+
throw new RuntimeException(e);
68+
}
69+
}
70+
return configs;
71+
});
72+
}
73+
74+
public void completeExceptionally(Throwable t) {
75+
futures.values().forEach(future -> future.completeExceptionally(t));
76+
}
77+
78+
/**
79+
* Completes individual futures based on RPC response information.
80+
*
81+
* <p>For each {@link PbCreateAclRespInfo} in the collection, Completes the future with success
82+
* or failure based on the response's error code.
83+
*/
84+
public void complete(List<PbDescribeConfigsResponseInfo> pbDescribeConfigsInfos) {
85+
Map<ConfigResource, PbDescribeConfigsResponseInfo> configResMap =
86+
pbDescribeConfigsInfos.stream()
87+
.collect(
88+
Collectors.toMap(
89+
p ->
90+
new ConfigResource(
91+
ConfigResource.Type.forId(
92+
(byte) p.getResourceType()),
93+
p.getResourceName()),
94+
p -> p));
95+
for (ConfigResource config : futures.keySet()) {
96+
CompletableFuture<Config> future = futures.get(config);
97+
if (!configResMap.containsKey(config)) {
98+
future.completeExceptionally(
99+
new UnknownServerException(
100+
"Malformed broker response: missing config for" + config));
101+
continue;
102+
}
103+
104+
PbDescribeConfigsResponseInfo responseInfo = configResMap.get(config);
105+
if (configResMap.get(config).hasErrorCode()) {
106+
future.completeExceptionally(
107+
Errors.forCode(responseInfo.getErrorCode())
108+
.exception(responseInfo.getErrorMessage()));
109+
continue;
110+
}
111+
112+
ArrayList<ConfigEntry> configEntries = new ArrayList<>();
113+
for (PbDescribeConfig pbDescribeConfig : responseInfo.getConfigsList()) {
114+
configEntries.add(
115+
new ConfigEntry(
116+
pbDescribeConfig.getConfigName(),
117+
pbDescribeConfig.hasConfigValue()
118+
? pbDescribeConfig.getConfigValue()
119+
: null,
120+
// todo: 看看是否用code
121+
ConfigEntry.ConfigSource.valueOf(
122+
pbDescribeConfig.getConfigSource())));
123+
}
124+
125+
future.complete(new Config(configEntries));
126+
}
127+
}
128+
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2525
import com.alibaba.fluss.cluster.Cluster;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
28+
import com.alibaba.fluss.config.dynamic.Config;
29+
import com.alibaba.fluss.config.dynamic.ConfigResource;
2730
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2831
import com.alibaba.fluss.metadata.DatabaseInfo;
2932
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -39,11 +42,13 @@
3942
import com.alibaba.fluss.rpc.RpcClient;
4043
import com.alibaba.fluss.rpc.gateway.AdminGateway;
4144
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
45+
import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
4246
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
4347
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
4448
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
4549
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
4650
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
51+
import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
4752
import com.alibaba.fluss.rpc.messages.DropAclsRequest;
4853
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
4954
import com.alibaba.fluss.rpc.messages.DropTableRequest;
@@ -60,6 +65,9 @@
6065
import com.alibaba.fluss.rpc.messages.ListPartitionInfosRequest;
6166
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
6267
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
68+
import com.alibaba.fluss.rpc.messages.PbAlterConfigs;
69+
import com.alibaba.fluss.rpc.messages.PbAlterConfigsRequestInfo;
70+
import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
6371
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
6472
import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
6573
import com.alibaba.fluss.rpc.messages.PbTablePath;
@@ -451,6 +459,76 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
451459
return result;
452460
}
453461

462+
@Override
463+
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources) {
464+
final Map<ConfigResource, CompletableFuture<Config>> requestFutures =
465+
new HashMap<>(configResources.size());
466+
DescribeConfigsResult result = new DescribeConfigsResult(requestFutures);
467+
468+
DescribeConfigsRequest request = new DescribeConfigsRequest();
469+
for (ConfigResource configResource : configResources) {
470+
requestFutures.put(configResource, new CompletableFuture<>());
471+
request.addInfo()
472+
.setResourceName(configResource.name())
473+
.setResourceType(configResource.type().id());
474+
}
475+
gateway.describeConfigs(request)
476+
.whenComplete(
477+
(r, t) -> {
478+
if (t != null) {
479+
result.completeExceptionally(t);
480+
}
481+
482+
List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList();
483+
result.complete(responseInfos);
484+
});
485+
return result;
486+
}
487+
488+
@Override
489+
public AlterConfigsResult incrementalAlterConfigs(
490+
Map<ConfigResource, Collection<AlterConfigOp>> configs) {
491+
final Map<ConfigResource, CompletableFuture<Void>> requestFutures =
492+
new HashMap<>(configs.size());
493+
AlterConfigsResult result = new AlterConfigsResult(requestFutures);
494+
495+
AlterConfigsRequest request = new AlterConfigsRequest();
496+
for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> entry : configs.entrySet()) {
497+
ConfigResource configResource = entry.getKey();
498+
Collection<AlterConfigOp> alterConfigOps = entry.getValue();
499+
500+
requestFutures.put(configResource, new CompletableFuture<>());
501+
PbAlterConfigsRequestInfo pbAlterConfigsRequestInfo =
502+
request.addInfo()
503+
.setResourceName(configResource.name())
504+
.setResourceType(configResource.type().id());
505+
for (AlterConfigOp alterConfigOp : alterConfigOps) {
506+
PbAlterConfigs pbAlterConfigs =
507+
pbAlterConfigsRequestInfo
508+
.addConfig()
509+
.setConfigName(alterConfigOp.configEntry().name())
510+
// todo: 确定config source如何传递
511+
.setConfigSource(alterConfigOp.configEntry().source().name())
512+
.setConfigOperation(alterConfigOp.opType().id());
513+
if (alterConfigOp.configEntry().value() != null) {
514+
pbAlterConfigs.setConfigValue(alterConfigOp.configEntry().value());
515+
}
516+
}
517+
}
518+
gateway.alterConfigs(request)
519+
.whenComplete(
520+
(r, t) -> {
521+
if (t != null) {
522+
result.completeExceptionally(t);
523+
}
524+
525+
List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList();
526+
result.complete(responseInfos);
527+
});
528+
529+
return result;
530+
}
531+
454532
@Override
455533
public void close() {
456534
// nothing to do yet

0 commit comments

Comments
 (0)