Skip to content

Commit 321d91a

Browse files
luoyuxiawuchong
authored andcommitted
[lake] Introduce tiering assignment manager to CoordinatorServer (apache#780)
--------- Co-authored-by: Jark Wu <[email protected]>
1 parent 3b99021 commit 321d91a

File tree

19 files changed

+1025
-23
lines changed

19 files changed

+1025
-23
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,15 @@ public class ConfigOptions {
11901190
+ "The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. "
11911191
+ "If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster.");
11921192

1193+
public static final ConfigOption<Duration> TABLE_DATALAKE_FRESHNESS =
1194+
key("table.datalake.freshness")
1195+
.durationType()
1196+
.defaultValue(Duration.ofMinutes(3))
1197+
.withDescription(
1198+
"It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. "
1199+
+ "Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. "
1200+
+ "If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.");
1201+
11931202
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
11941203
key("table.merge-engine")
11951204
.enumType(MergeEngineType.class)

fluss-common/src/main/java/com/alibaba/fluss/config/TableConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.metadata.MergeEngineType;
2525
import com.alibaba.fluss.utils.AutoPartitionStrategy;
2626

27+
import java.time.Duration;
2728
import java.util.Optional;
2829

2930
/**
@@ -84,6 +85,14 @@ public Optional<DataLakeFormat> getDataLakeFormat() {
8485
return config.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT);
8586
}
8687

88+
/**
89+
* Gets the data lake freshness of the table. It defines the maximum amount of time that the
90+
* datalake table's content should lag behind updates to the Fluss table.
91+
*/
92+
public Duration getDataLakeFreshness() {
93+
return config.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
94+
}
95+
8796
/** Gets the optional merge engine type of the table. */
8897
public Optional<MergeEngineType> getMergeEngineType() {
8998
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
/** Exception thrown when the tiering epoch is invalid. */
20+
public class FencedTieringEpochException extends ApiException {
21+
22+
public FencedTieringEpochException(String message) {
23+
super(message);
24+
}
25+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alibaba.fluss.exception.DatabaseNotExistException;
2727
import com.alibaba.fluss.exception.DuplicateSequenceException;
2828
import com.alibaba.fluss.exception.FencedLeaderEpochException;
29+
import com.alibaba.fluss.exception.FencedTieringEpochException;
2930
import com.alibaba.fluss.exception.InvalidColumnProjectionException;
3031
import com.alibaba.fluss.exception.InvalidConfigException;
3132
import com.alibaba.fluss.exception.InvalidCoordinatorException;
@@ -199,7 +200,9 @@ public enum Errors {
199200
SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new),
200201
AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new),
201202
BUCKET_MAX_NUM_EXCEPTION(
202-
49, "Exceed the maximum number of buckets", TooManyBucketsException::new);
203+
49, "Exceed the maximum number of buckets", TooManyBucketsException::new),
204+
FENCED_TIERING_EPOCH_EXCEPTION(
205+
50, "The tiering epoch is invalid.", FencedTieringEpochException::new);
203206

204207
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
205208

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
124124
private final MetadataManager metadataManager;
125125
private final TableManager tableManager;
126126
private final AutoPartitionManager autoPartitionManager;
127+
private final LakeTableTieringManager lakeTableTieringManager;
127128
private final TableChangeWatcher tableChangeWatcher;
128129
private final CoordinatorChannelManager coordinatorChannelManager;
129130
private final TabletServerChangeWatcher tabletServerChangeWatcher;
@@ -151,6 +152,7 @@ public CoordinatorEventProcessor(
151152
ServerMetadataCache serverMetadataCache,
152153
CoordinatorChannelManager coordinatorChannelManager,
153154
AutoPartitionManager autoPartitionManager,
155+
LakeTableTieringManager lakeTableTieringManager,
154156
CoordinatorMetricGroup coordinatorMetricGroup,
155157
Configuration conf,
156158
ExecutorService ioExecutor) {
@@ -160,6 +162,7 @@ public CoordinatorEventProcessor(
160162
coordinatorChannelManager,
161163
new CoordinatorContext(),
162164
autoPartitionManager,
165+
lakeTableTieringManager,
163166
coordinatorMetricGroup,
164167
conf,
165168
ioExecutor);
@@ -171,6 +174,7 @@ public CoordinatorEventProcessor(
171174
CoordinatorChannelManager coordinatorChannelManager,
172175
CoordinatorContext coordinatorContext,
173176
AutoPartitionManager autoPartitionManager,
177+
LakeTableTieringManager lakeTableTieringManager,
174178
CoordinatorMetricGroup coordinatorMetricGroup,
175179
Configuration conf,
176180
ExecutorService ioExecutor) {
@@ -211,6 +215,7 @@ public CoordinatorEventProcessor(
211215
ioExecutor,
212216
zooKeeperClient);
213217
this.autoPartitionManager = autoPartitionManager;
218+
this.lakeTableTieringManager = lakeTableTieringManager;
214219
this.coordinatorMetricGroup = coordinatorMetricGroup;
215220
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
216221
registerMetrics();
@@ -326,13 +331,18 @@ private void initCoordinatorContext() throws Exception {
326331

327332
// load all tables
328333
List<TableInfo> autoPartitionTables = new ArrayList<>();
334+
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
329335
for (String database : metadataManager.listDatabases()) {
330336
for (String tableName : metadataManager.listTables(database)) {
331337
TablePath tablePath = TablePath.of(database, tableName);
332338
TableInfo tableInfo = metadataManager.getTable(tablePath);
333339
coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
334340
coordinatorContext.putTableInfo(tableInfo);
335-
341+
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
342+
// always set to current time,
343+
// todo: should get from the last lake snapshot
344+
lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
345+
}
336346
if (tableInfo.isPartitioned()) {
337347
Map<String, Long> partitions =
338348
zooKeeperClient.getPartitionNameAndIds(tablePath);
@@ -351,6 +361,7 @@ private void initCoordinatorContext() throws Exception {
351361
}
352362
}
353363
autoPartitionManager.initAutoPartitionTables(autoPartitionTables);
364+
lakeTableTieringManager.initWithLakeTables(lakeTables);
354365

355366
// load all assignment
356367
loadTableAssignment();
@@ -550,6 +561,9 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
550561
if (createTableEvent.isAutoPartitionTable()) {
551562
autoPartitionManager.addAutoPartitionTable(tableInfo, true);
552563
}
564+
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
565+
lakeTableTieringManager.addNewLakeTable(tableInfo);
566+
}
553567
}
554568

555569
private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
@@ -585,6 +599,9 @@ private void processDropTable(DropTableEvent dropTableEvent) {
585599
if (dropTableEvent.isAutoPartitionTable()) {
586600
autoPartitionManager.removeAutoPartitionTable(dropTableEvent.getTableId());
587601
}
602+
if (dropTableEvent.isDataLakeEnabled()) {
603+
lakeTableTieringManager.removeLakeTable(dropTableEvent.getTableId());
604+
}
588605
}
589606

590607
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ public class CoordinatorServer extends ServerBase {
127127
@GuardedBy("lock")
128128
private AutoPartitionManager autoPartitionManager;
129129

130+
@GuardedBy("lock")
131+
private LakeTableTieringManager lakeTableTieringManager;
132+
130133
@GuardedBy("lock")
131134
private ExecutorService ioExecutor;
132135

@@ -172,6 +175,8 @@ protected void startServices() throws Exception {
172175
authorizer.startup();
173176
}
174177

178+
this.lakeTableTieringManager = new LakeTableTieringManager();
179+
175180
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
176181
this.coordinatorService =
177182
new CoordinatorService(
@@ -221,6 +226,7 @@ protected void startServices() throws Exception {
221226
metadataCache,
222227
coordinatorChannelManager,
223228
autoPartitionManager,
229+
lakeTableTieringManager,
224230
serverMetricGroup,
225231
conf,
226232
ioExecutor);
@@ -366,6 +372,14 @@ CompletableFuture<Void> stopServices() {
366372
exception = ExceptionUtils.firstOrSuppressed(t, exception);
367373
}
368374

375+
try {
376+
if (lakeTableTieringManager != null) {
377+
lakeTableTieringManager.close();
378+
}
379+
} catch (Throwable t) {
380+
exception = ExceptionUtils.firstOrSuppressed(t, exception);
381+
}
382+
369383
try {
370384
if (zkClient != null) {
371385
zkClient.close();

0 commit comments

Comments
 (0)