Skip to content

Commit 533fa9d

Browse files
committed
pass custom table property to lake writer
1 parent 7f70265 commit 533fa9d

File tree

4 files changed

+28
-2
lines changed

4 files changed

+28
-2
lines changed

fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import java.util.Map;
28+
2729
/**
2830
* The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
2931
* methods to obtain the table path, table bucket, and an optional partition.
@@ -61,4 +63,11 @@ public interface WriterInitContext {
6163
* @return the table schema
6264
*/
6365
Schema schema();
66+
67+
/**
68+
* Returns the table custom properties.
69+
*
70+
* @return the table custom properties
71+
*/
72+
Map<String, String> properties();
6473
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
305305
currentTablePath,
306306
bucket,
307307
partitionName,
308-
currentTable.getTableInfo().getSchema()));
308+
currentTable.getTableInfo().getSchema(),
309+
currentTable.getTableInfo().getCustomProperties().toMap()));
309310
lakeWriters.put(bucket, lakeWriter);
310311
}
311312
return lakeWriter;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,28 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import java.util.Map;
28+
2729
/** The implementation of {@link WriterInitContext}. */
2830
public class TieringWriterInitContext implements WriterInitContext {
2931

3032
private final TablePath tablePath;
3133
private final TableBucket tableBucket;
3234
private final Schema schema;
3335
@Nullable private final String partition;
36+
private final Map<String, String> properties;
3437

3538
public TieringWriterInitContext(
3639
TablePath tablePath,
3740
TableBucket tableBucket,
3841
@Nullable String partition,
39-
Schema schema) {
42+
Schema schema,
43+
Map<String, String> properties) {
4044
this.tablePath = tablePath;
4145
this.tableBucket = tableBucket;
4246
this.partition = partition;
4347
this.schema = schema;
48+
this.properties = properties;
4449
}
4550

4651
@Override
@@ -63,4 +68,9 @@ public String partition() {
6368
public Schema schema() {
6469
return schema;
6570
}
71+
72+
@Override
73+
public Map<String, String> properties() {
74+
return properties;
75+
}
6676
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,12 @@ public String partition() {
692692
return partition;
693693
}
694694

695+
@Override
696+
public Map<String, String> properties() {
697+
// don't care about table custom properties for Paimon lake writer
698+
return new HashMap<>();
699+
}
700+
695701
@Override
696702
public com.alibaba.fluss.metadata.Schema schema() {
697703
throw new UnsupportedOperationException(

0 commit comments

Comments
 (0)