Skip to content

Commit 50b9ce9

Browse files
committed
[wip] pass TableInfo
1 parent 8385bc1 commit 50b9ce9

File tree

10 files changed

+56
-71
lines changed

10 files changed

+56
-71
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
package org.apache.fluss.lake.writer;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21-
import org.apache.fluss.metadata.Schema;
2221
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TableInfo;
2323
import org.apache.fluss.metadata.TablePath;
2424

2525
import javax.annotation.Nullable;
2626

27-
import java.util.Map;
28-
2927
/**
3028
* The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
3129
* methods to obtain the table path, table bucket, and an optional partition.
@@ -58,16 +56,9 @@ public interface WriterInitContext {
5856
String partition();
5957

6058
/**
61-
* Returns the table schema.
62-
*
63-
* @return the table schema
64-
*/
65-
Schema schema();
66-
67-
/**
68-
* Returns the table custom properties.
59+
* Returns the table info.
6960
*
70-
* @return the table custom properties
61+
* @return the table info
7162
*/
72-
Map<String, String> customProperties();
63+
TableInfo tableInfo();
7364
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,7 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
309309
currentTablePath,
310310
bucket,
311311
partitionName,
312-
currentTable.getTableInfo().getSchema(),
313-
currentTable.getTableInfo().getCustomProperties().toMap()));
312+
currentTable.getTableInfo()));
314313
lakeWriters.put(bucket, lakeWriter);
315314
}
316315
return lakeWriter;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,29 @@
1818
package org.apache.fluss.flink.tiering.source;
1919

2020
import org.apache.fluss.lake.writer.WriterInitContext;
21-
import org.apache.fluss.metadata.Schema;
2221
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TableInfo;
2323
import org.apache.fluss.metadata.TablePath;
2424

2525
import javax.annotation.Nullable;
2626

27-
import java.util.Map;
28-
2927
/** The implementation of {@link WriterInitContext}. */
3028
public class TieringWriterInitContext implements WriterInitContext {
3129

3230
private final TablePath tablePath;
3331
private final TableBucket tableBucket;
34-
private final Schema schema;
3532
@Nullable private final String partition;
36-
private final Map<String, String> customProperties;
33+
private final TableInfo tableInfo;
3734

3835
public TieringWriterInitContext(
3936
TablePath tablePath,
4037
TableBucket tableBucket,
4138
@Nullable String partition,
42-
Schema schema,
43-
Map<String, String> customProperties) {
39+
TableInfo tableInfo) {
4440
this.tablePath = tablePath;
4541
this.tableBucket = tableBucket;
4642
this.partition = partition;
47-
this.schema = schema;
48-
this.customProperties = customProperties;
43+
this.tableInfo = tableInfo;
4944
}
5045

5146
@Override
@@ -65,12 +60,7 @@ public String partition() {
6560
}
6661

6762
@Override
68-
public Schema schema() {
69-
return schema;
70-
}
71-
72-
@Override
73-
public Map<String, String> customProperties() {
74-
return customProperties;
63+
public TableInfo tableInfo() {
64+
return tableInfo;
7565
}
7666
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public AppendOnlyTaskWriter(
3535
super(
3636
taskWriter,
3737
icebergTable.schema(),
38-
writerInitContext.schema().getRowType(),
38+
writerInitContext.tableInfo().getRowType(),
3939
writerInitContext.tableBucket());
4040
}
4141

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public DeltaTaskWriter(
3535
super(
3636
taskWriter,
3737
icebergTable.schema(),
38-
writerInitContext.schema().getRowType(),
38+
writerInitContext.tableInfo().getRowType(),
3939
writerInitContext.tableBucket());
4040
}
4141

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.fluss.lake.iceberg.tiering;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.lake.committer.LakeCommitter;
2223
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
2324
import org.apache.fluss.lake.writer.LakeWriter;
2425
import org.apache.fluss.lake.writer.WriterInitContext;
2526
import org.apache.fluss.metadata.TableBucket;
27+
import org.apache.fluss.metadata.TableDescriptor;
28+
import org.apache.fluss.metadata.TableInfo;
2629
import org.apache.fluss.metadata.TablePath;
2730
import org.apache.fluss.record.ChangeType;
2831
import org.apache.fluss.record.GenericRecord;
@@ -119,6 +122,17 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
119122
isPartitionedTable ? "partitioned" : "unpartitioned"));
120123
createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);
121124

125+
TableDescriptor descriptor =
126+
TableDescriptor.builder()
127+
.schema(org.apache.fluss.metadata.Schema.newBuilder()
128+
.column("c1", DataTypes.INT())
129+
.column("c2", DataTypes.STRING())
130+
.column("c3", DataTypes.STRING())
131+
.build())
132+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
133+
.build();
134+
TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
135+
122136
Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
123137

124138
Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -144,7 +158,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
144158
for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
145159
String partition = entry.getValue();
146160
try (LakeWriter<IcebergWriteResult> writer =
147-
createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
161+
createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
148162
Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
149163
Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
150164
isPrimaryKeyTable
@@ -198,7 +212,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
198212
}
199213

200214
private LakeWriter<IcebergWriteResult> createLakeWriter(
201-
TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
215+
TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId, TableInfo tableInfo)
202216
throws IOException {
203217
return icebergLakeTieringFactory.createLakeWriter(
204218
new WriterInitContext() {
@@ -219,17 +233,8 @@ public String partition() {
219233
}
220234

221235
@Override
222-
public Map<String, String> customProperties() {
223-
return Collections.emptyMap();
224-
}
225-
226-
@Override
227-
public org.apache.fluss.metadata.Schema schema() {
228-
return org.apache.fluss.metadata.Schema.newBuilder()
229-
.column("c1", DataTypes.INT())
230-
.column("c2", DataTypes.STRING())
231-
.column("c3", DataTypes.STRING())
232-
.build();
236+
public TableInfo tableInfo() {
237+
return tableInfo;
233238
}
234239
});
235240
}

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
4545
LanceConfig config =
4646
LanceConfig.from(
4747
options.toMap(),
48-
writerInitContext.customProperties(),
48+
writerInitContext.tableInfo().getCustomProperties().toMap(),
4949
writerInitContext.tablePath().getDatabaseName(),
5050
writerInitContext.tablePath().getTableName());
5151
int batchSize = LanceConfig.getBatchSize(config);
@@ -56,7 +56,9 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
5656

5757
this.arrowWriter =
5858
LanceDatasetAdapter.getArrowWriter(
59-
schema.get(), batchSize, writerInitContext.schema().getRowType());
59+
schema.get(),
60+
batchSize,
61+
writerInitContext.tableInfo().getRowType());
6062

6163
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
6264
Callable<List<FragmentMetadata>> fragmentCreator =

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.lake.lance.tiering;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
2223
import org.apache.fluss.lake.committer.LakeCommitter;
@@ -28,6 +29,8 @@
2829
import org.apache.fluss.lake.writer.WriterInitContext;
2930
import org.apache.fluss.metadata.Schema;
3031
import org.apache.fluss.metadata.TableBucket;
32+
import org.apache.fluss.metadata.TableDescriptor;
33+
import org.apache.fluss.metadata.TableInfo;
3134
import org.apache.fluss.metadata.TablePath;
3235
import org.apache.fluss.record.ChangeType;
3336
import org.apache.fluss.record.GenericRecord;
@@ -96,6 +99,14 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
9699
tablePath.getTableName());
97100
Schema schema = createTable(config);
98101

102+
TableDescriptor descriptor =
103+
TableDescriptor.builder()
104+
.schema(schema)
105+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
106+
.customProperties(customProperties)
107+
.build();
108+
TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
109+
99110
List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
100111
SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
101112
lanceLakeTieringFactory.getWriteResultSerializer();
@@ -126,7 +137,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
126137
for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
127138
String partition = entry.getValue();
128139
try (LakeWriter<LanceWriteResult> lakeWriter =
129-
createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
140+
createLakeWriter(tablePath, bucket, partition, tableInfo)) {
130141
Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
131142
Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
132143
genLogTableRecords(partition, bucket, 10);
@@ -242,8 +253,7 @@ private LakeWriter<LanceWriteResult> createLakeWriter(
242253
TablePath tablePath,
243254
int bucket,
244255
@Nullable String partition,
245-
Schema schema,
246-
Map<String, String> customProperties)
256+
TableInfo tableInfo)
247257
throws IOException {
248258
return lanceLakeTieringFactory.createLakeWriter(
249259
new WriterInitContext() {
@@ -265,13 +275,8 @@ public String partition() {
265275
}
266276

267277
@Override
268-
public org.apache.fluss.metadata.Schema schema() {
269-
return schema;
270-
}
271-
272-
@Override
273-
public Map<String, String> customProperties() {
274-
return customProperties;
278+
public TableInfo tableInfo() {
279+
return tableInfo;
275280
}
276281
});
277282
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
6262
private final Catalog paimonCatalog;
6363

6464
// for fluss config
65-
public static final String FLUSS_CONF_PREFIX = "fluss.";
65+
private static final String FLUSS_CONF_PREFIX = "fluss.";
6666
// for paimon config
6767
private static final String PAIMON_CONF_PREFIX = "paimon.";
6868

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.lake.paimon.tiering.mergetree.MergeTreeWriter;
2222
import org.apache.fluss.lake.writer.LakeWriter;
2323
import org.apache.fluss.lake.writer.WriterInitContext;
24+
import org.apache.fluss.metadata.TableInfo;
2425
import org.apache.fluss.metadata.TablePath;
2526
import org.apache.fluss.record.LogRecord;
2627

@@ -33,10 +34,8 @@
3334
import java.util.Collections;
3435
import java.util.List;
3536
import java.util.Map;
36-
import java.util.Objects;
3737

3838
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION;
39-
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.FLUSS_CONF_PREFIX;
4039
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4140

4241
/** Implementation of {@link LakeWriter} for Paimon. */
@@ -49,7 +48,8 @@ public PaimonLakeWriter(
4948
PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext)
5049
throws IOException {
5150
this.paimonCatalog = paimonCatalogProvider.get();
52-
FileStoreTable fileStoreTable = getTable(writerInitContext.tablePath());
51+
FileStoreTable fileStoreTable =
52+
getTable(writerInitContext.tablePath(), writerInitContext.tableInfo());
5353

5454
List<String> partitionKeys = fileStoreTable.partitionKeys();
5555

@@ -101,20 +101,13 @@ public void close() throws IOException {
101101
}
102102
}
103103

104-
private FileStoreTable getTable(TablePath tablePath) throws IOException {
104+
private FileStoreTable getTable(TablePath tablePath, TableInfo tableInfo) throws IOException {
105105
try {
106106
FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
107107
Map<String, String> compactionOptions =
108108
Collections.singletonMap(
109109
CoreOptions.WRITE_ONLY.key(),
110-
Objects.equals(
111-
table.schema()
112-
.options()
113-
.get(
114-
FLUSS_CONF_PREFIX
115-
+ TABLE_DATALAKE_AUTO_COMPACTION
116-
.key()),
117-
"true")
110+
tableInfo.getProperties().getBoolean(TABLE_DATALAKE_AUTO_COMPACTION)
118111
? Boolean.FALSE.toString()
119112
: Boolean.TRUE.toString());
120113
return table.copy(compactionOptions);

0 commit comments

Comments (0)