Skip to content

Commit 8dfd695

Browse files
authored
[server] Support alter "table.datalake.enabled" option (#1753)
]
1 parent 1e68a26 commit 8dfd695

File tree

7 files changed

+346
-13
lines changed

7 files changed

+346
-13
lines changed

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,23 @@ public class FlussConfigUtils {
3636
public static final String CLIENT_PREFIX = "client.";
3737
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
3838

39-
public static final List<String> ALTERABLE_TABLE_CONFIG;
39+
public static final List<String> ALTERABLE_TABLE_OPTIONS;
4040

4141
static {
4242
TABLE_OPTIONS = extractConfigOptions("table.");
4343
CLIENT_OPTIONS = extractConfigOptions("client.");
44-
ALTERABLE_TABLE_CONFIG = Collections.emptyList();
44+
ALTERABLE_TABLE_OPTIONS =
45+
Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
4546
}
4647

4748
public static boolean isTableStorageConfig(String key) {
4849
return key.startsWith(TABLE_PREFIX);
4950
}
5051

52+
public static boolean isAlterableTableOption(String key) {
53+
return ALTERABLE_TABLE_OPTIONS.contains(key);
54+
}
55+
5156
@VisibleForTesting
5257
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
5358
Map<String, ConfigOption<?>> options = new HashMap<>();

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.exception.FlussRuntimeException;
2626
import org.apache.fluss.exception.InvalidTableException;
2727
import org.apache.fluss.metadata.Schema;
28+
import org.apache.fluss.metadata.TableChange;
2829
import org.apache.fluss.metadata.TableDescriptor;
2930
import org.apache.fluss.metadata.TablePath;
3031
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -48,7 +49,9 @@
4849

4950
import java.nio.file.Files;
5051
import java.util.Arrays;
52+
import java.util.Collections;
5153
import java.util.HashMap;
54+
import java.util.List;
5255
import java.util.Map;
5356
import java.util.stream.Stream;
5457

@@ -364,6 +367,70 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
364367
BUCKET_NUM);
365368
}
366369

370+
@Test
371+
void testAlterLakeEnabledLogTable() throws Exception {
372+
Map<String, String> customProperties = new HashMap<>();
373+
customProperties.put("k1", "v1");
374+
customProperties.put("paimon.file.format", "parquet");
375+
376+
// log table with lake disabled
377+
TableDescriptor logTable =
378+
TableDescriptor.builder()
379+
.schema(
380+
Schema.newBuilder()
381+
.column("log_c1", DataTypes.INT())
382+
.column("log_c2", DataTypes.STRING())
383+
.build())
384+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
385+
.customProperties(customProperties)
386+
.distributedBy(BUCKET_NUM, "log_c1", "log_c2")
387+
.build();
388+
TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
389+
admin.createTable(logTablePath, logTable, false).get();
390+
391+
assertThatThrownBy(
392+
() ->
393+
paimonCatalog.getTable(
394+
Identifier.create(DATABASE, logTablePath.getTableName())))
395+
.isInstanceOf(Catalog.TableNotExistException.class);
396+
397+
// enable lake
398+
TableChange.SetOption enableLake =
399+
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
400+
List<TableChange> changes = Collections.singletonList(enableLake);
401+
402+
admin.alterTable(logTablePath, changes, false).get();
403+
404+
Table enabledPaimonLogTable =
405+
paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));
406+
407+
Map<String, String> updatedProperties = new HashMap<>();
408+
updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
409+
TableDescriptor updatedLogTable = logTable.withProperties(updatedProperties);
410+
// check the gotten log table
411+
verifyPaimonTable(
412+
enabledPaimonLogTable,
413+
updatedLogTable,
414+
RowType.of(
415+
new DataType[] {
416+
org.apache.paimon.types.DataTypes.INT(),
417+
org.apache.paimon.types.DataTypes.STRING(),
418+
// for __bucket, __offset, __timestamp
419+
org.apache.paimon.types.DataTypes.INT(),
420+
org.apache.paimon.types.DataTypes.BIGINT(),
421+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
422+
},
423+
new String[] {
424+
"log_c1",
425+
"log_c2",
426+
BUCKET_COLUMN_NAME,
427+
OFFSET_COLUMN_NAME,
428+
TIMESTAMP_COLUMN_NAME
429+
}),
430+
"log_c1,log_c2",
431+
BUCKET_NUM);
432+
}
433+
367434
@Test
368435
void testThrowExceptionWhenConflictWithSystemColumn() {
369436
for (String systemColumn :

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,16 @@ protected long createLogTable(TablePath tablePath) throws Exception {
256256
}
257257

258258
protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
259-
return createLogTable(tablePath, bucketNum, false);
259+
return createLogTable(
260+
tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap());
260261
}
261262

262-
protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
263+
protected long createLogTable(
264+
TablePath tablePath,
265+
int bucketNum,
266+
boolean isPartitioned,
267+
Map<String, String> properties,
268+
Map<String, String> customProperties)
263269
throws Exception {
264270
Schema.Builder schemaBuilder =
265271
Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -277,6 +283,8 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
277283
tableBuilder.property(
278284
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
279285
}
286+
tableBuilder.properties(properties);
287+
tableBuilder.customProperties(customProperties);
280288
tableBuilder.schema(schemaBuilder.build());
281289
return createTable(tablePath, tableBuilder.build());
282290
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
2323
import org.apache.fluss.metadata.Schema;
2424
import org.apache.fluss.metadata.TableBucket;
25+
import org.apache.fluss.metadata.TableChange;
2526
import org.apache.fluss.metadata.TableDescriptor;
2627
import org.apache.fluss.metadata.TablePath;
2728
import org.apache.fluss.row.InternalRow;
@@ -175,10 +176,7 @@ void testTiering() throws Exception {
175176
{
176177
put(
177178
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
178-
"["
179-
+ "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
180-
+ "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
181-
+ "]");
179+
getPartitionOffsetStr(partitionNameByIds));
182180
}
183181
};
184182
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -187,6 +185,150 @@ void testTiering() throws Exception {
187185
}
188186
}
189187

188+
@Test
189+
void testTieringForAlterTable() throws Exception {
190+
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");
191+
Map<String, String> tableProperties = new HashMap<>();
192+
tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
193+
194+
long t1Id = createPkTable(t1, 1, tableProperties, Collections.emptyMap());
195+
196+
TableChange.SetOption setOption =
197+
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
198+
List<TableChange> changes = Collections.singletonList(setOption);
199+
admin.alterTable(t1, changes, false).get();
200+
201+
TableBucket t1Bucket = new TableBucket(t1Id, 0);
202+
203+
// write records
204+
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
205+
writeRows(t1, rows, false);
206+
waitUntilSnapshot(t1Id, 1, 0);
207+
208+
// then start tiering job
209+
JobClient jobClient = buildTieringJob(execEnv);
210+
211+
try {
212+
// check the status of replica after synced
213+
assertReplicaStatus(t1Bucket, 3);
214+
// check data in paimon
215+
checkDataInPaimonPrimaryKeyTable(t1, rows);
216+
// check snapshot property in paimon
217+
Map<String, String> properties =
218+
new HashMap<String, String>() {
219+
{
220+
put(
221+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
222+
"[{\"bucket_id\":0,\"log_offset\":3}]");
223+
}
224+
};
225+
checkSnapshotPropertyInPaimon(t1, properties);
226+
227+
// then, create another log table
228+
TablePath t2 = TablePath.of(DEFAULT_DB, "logTableAlter");
229+
230+
Map<String, String> logTableProperties = new HashMap<>();
231+
logTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
232+
long t2Id = createLogTable(t2, 1, false, logTableProperties, Collections.emptyMap());
233+
// enable lake
234+
admin.alterTable(t2, changes, false).get();
235+
236+
TableBucket t2Bucket = new TableBucket(t2Id, 0);
237+
List<InternalRow> flussRows = new ArrayList<>();
238+
// write records
239+
for (int i = 0; i < 10; i++) {
240+
rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
241+
flussRows.addAll(rows);
242+
// write records
243+
writeRows(t2, rows, true);
244+
}
245+
// check the status of replica after synced;
246+
// note: we can't update log start offset for unaware bucket mode log table
247+
assertReplicaStatus(t2Bucket, 30);
248+
249+
// check data in paimon
250+
checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
251+
252+
// then write data to the pk tables
253+
// write records
254+
rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
255+
// write records
256+
writeRows(t1, rows, false);
257+
258+
// check the status of replica of t2 after synced
259+
// not check start offset since we won't
260+
// update start log offset for primary key table
261+
assertReplicaStatus(t1Bucket, 9);
262+
263+
checkDataInPaimonPrimaryKeyTable(t1, rows);
264+
265+
// then create partitioned table and wait partitions are ready
266+
TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTableAlter");
267+
Map<String, String> partitionTableProperties = new HashMap<>();
268+
partitionTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
269+
270+
Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
271+
createPartitionedTable(
272+
partitionedTablePath, partitionTableProperties, Collections.emptyMap());
273+
274+
admin.alterTable(partitionedTablePath, changes, false).get();
275+
276+
Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
277+
278+
// now, write rows into partitioned table
279+
TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
280+
Map<String, List<InternalRow>> writtenRowsByPartition =
281+
writeRowsIntoPartitionedTable(
282+
partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
283+
long tableId = tableIdAndDescriptor.f0;
284+
285+
// wait until synced to paimon
286+
for (Long partitionId : partitionNameByIds.keySet()) {
287+
TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
288+
assertReplicaStatus(tableBucket, 3);
289+
}
290+
291+
// now, let's check data in paimon per partition
292+
// check data in paimon
293+
String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
294+
for (String partitionName : partitionNameByIds.values()) {
295+
checkDataInPaimonAppendOnlyPartitionedTable(
296+
partitionedTablePath,
297+
Collections.singletonMap(partitionCol, partitionName),
298+
writtenRowsByPartition.get(partitionName),
299+
0);
300+
}
301+
302+
properties =
303+
new HashMap<String, String>() {
304+
{
305+
put(
306+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
307+
getPartitionOffsetStr(partitionNameByIds));
308+
}
309+
};
310+
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
311+
} finally {
312+
jobClient.cancel().get();
313+
}
314+
}
315+
316+
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
317+
String raw =
318+
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
319+
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
320+
Collections.sort(partitionIds);
321+
List<String> partitionOffsetStrs = new ArrayList<>();
322+
323+
for (Long partitionId : partitionIds) {
324+
String partitionName = partitionNameByIds.get(partitionId);
325+
String partitionOffsetStr = String.format(raw, partitionId, partitionName);
326+
partitionOffsetStrs.add(partitionOffsetStr);
327+
}
328+
329+
return "[" + String.join(",", partitionOffsetStrs) + "]";
330+
}
331+
190332
@Test
191333
void testTieringToDvEnabledTable() throws Exception {
192334
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
@@ -214,6 +356,15 @@ void testTieringToDvEnabledTable() throws Exception {
214356

215357
private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
216358
throws Exception {
359+
return createPartitionedTable(
360+
partitionedTablePath, Collections.emptyMap(), Collections.emptyMap());
361+
}
362+
363+
private Tuple2<Long, TableDescriptor> createPartitionedTable(
364+
TablePath partitionedTablePath,
365+
Map<String, String> properties,
366+
Map<String, String> customProperties)
367+
throws Exception {
217368
TableDescriptor partitionedTableDescriptor =
218369
TableDescriptor.builder()
219370
.schema(
@@ -229,6 +380,8 @@ private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partition
229380
AutoPartitionTimeUnit.YEAR)
230381
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
231382
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
383+
.properties(properties)
384+
.customProperties(customProperties)
232385
.build();
233386
return Tuple2.of(
234387
createTable(partitionedTablePath, partitionedTableDescriptor),

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.fluss.server.coordinator.event.EventManager;
9191
import org.apache.fluss.server.entity.CommitKvSnapshotData;
9292
import org.apache.fluss.server.entity.LakeTieringTableInfo;
93+
import org.apache.fluss.server.entity.TablePropertyChanges;
9394
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
9495
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
9596
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
@@ -297,10 +298,16 @@ public CompletableFuture<AlterTablePropertiesResponse> alterTableProperties(
297298
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(tablePath));
298299
}
299300

301+
TablePropertyChanges tablePropertyChanges =
302+
toTablePropertyChanges(request.getConfigChangesList());
303+
300304
metadataManager.alterTableProperties(
301305
tablePath,
302-
toTablePropertyChanges(request.getConfigChangesList()),
303-
request.isIgnoreIfNotExists());
306+
tablePropertyChanges,
307+
request.isIgnoreIfNotExists(),
308+
lakeCatalog,
309+
dataLakeFormat,
310+
lakeTableTieringManager);
304311

305312
return CompletableFuture.completedFuture(new AlterTablePropertiesResponse());
306313
}

0 commit comments

Comments
 (0)