Skip to content

Commit 32c066f

Browse files
authored
[lake] Pass Fluss schema to lake writer (#1192)
1 parent 94c8073 commit 32c066f

File tree

4 files changed

+31
-6
lines changed

4 files changed

+31
-6
lines changed

fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.lake.writer;
1919

2020
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.metadata.Schema;
2122
import com.alibaba.fluss.metadata.TableBucket;
2223
import com.alibaba.fluss.metadata.TablePath;
2324

@@ -53,4 +54,11 @@ public interface WriterInitContext {
5354
*/
5455
@Nullable
5556
String partition();
57+
58+
/**
59+
* Returns the table schema.
60+
*
61+
* @return the table schema
62+
*/
63+
Schema schema();
5664
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,11 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
301301
if (lakeWriter == null) {
302302
lakeWriter =
303303
lakeTieringFactory.createLakeWriter(
304-
new TieringWriterInitContext(currentTablePath, bucket, partitionName));
304+
new TieringWriterInitContext(
305+
currentTablePath,
306+
bucket,
307+
partitionName,
308+
currentTable.getTableInfo().getSchema()));
305309
lakeWriters.put(bucket, lakeWriter);
306310
}
307311
return lakeWriter;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.flink.tiering.source;
1919

2020
import com.alibaba.fluss.lake.writer.WriterInitContext;
21+
import com.alibaba.fluss.metadata.Schema;
2122
import com.alibaba.fluss.metadata.TableBucket;
2223
import com.alibaba.fluss.metadata.TablePath;
2324

@@ -28,17 +29,18 @@ public class TieringWriterInitContext implements WriterInitContext {
2829

2930
private final TablePath tablePath;
3031
private final TableBucket tableBucket;
32+
private final Schema schema;
3133
@Nullable private final String partition;
3234

33-
public TieringWriterInitContext(TablePath tablePath, TableBucket tableBucket) {
34-
this(tablePath, tableBucket, null);
35-
}
36-
3735
public TieringWriterInitContext(
38-
TablePath tablePath, TableBucket tableBucket, @Nullable String partition) {
36+
TablePath tablePath,
37+
TableBucket tableBucket,
38+
@Nullable String partition,
39+
Schema schema) {
3940
this.tablePath = tablePath;
4041
this.tableBucket = tableBucket;
4142
this.partition = partition;
43+
this.schema = schema;
4244
}
4345

4446
@Override
@@ -56,4 +58,9 @@ public TableBucket tableBucket() {
5658
public String partition() {
5759
return partition;
5860
}
61+
62+
@Override
63+
public Schema schema() {
64+
return schema;
65+
}
5966
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,12 @@ public TableBucket tableBucket() {
658658
public String partition() {
659659
return partition;
660660
}
661+
662+
@Override
663+
public com.alibaba.fluss.metadata.Schema schema() {
664+
throw new UnsupportedOperationException(
665+
"The lake writer in Paimon currently uses paimonCatalog to determine the schema.");
666+
}
661667
});
662668
}
663669

0 commit comments

Comments
 (0)