Skip to content

Commit 1c358ef

Browse files
authored
[lake] pass table custom table property to lake writer (#1521)
1 parent ac1569c commit 1c358ef

File tree

4 files changed

+28
-2
lines changed

4 files changed

+28
-2
lines changed

fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import java.util.Map;
28+
2729
/**
2830
* The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
2931
* methods to obtain the table path, table bucket, and an optional partition.
@@ -61,4 +63,11 @@ public interface WriterInitContext {
6163
* @return the table schema
6264
*/
6365
Schema schema();
66+
67+
/**
68+
* Returns the table custom properties.
69+
*
70+
* @return the table custom properties
71+
*/
72+
Map<String, String> customProperties();
6473
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
309309
currentTablePath,
310310
bucket,
311311
partitionName,
312-
currentTable.getTableInfo().getSchema()));
312+
currentTable.getTableInfo().getSchema(),
313+
currentTable.getTableInfo().getCustomProperties().toMap()));
313314
lakeWriters.put(bucket, lakeWriter);
314315
}
315316
return lakeWriter;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,28 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import java.util.Map;
28+
2729
/** The implementation of {@link WriterInitContext}. */
2830
public class TieringWriterInitContext implements WriterInitContext {
2931

3032
private final TablePath tablePath;
3133
private final TableBucket tableBucket;
3234
private final Schema schema;
3335
@Nullable private final String partition;
36+
private final Map<String, String> customProperties;
3437

3538
public TieringWriterInitContext(
3639
TablePath tablePath,
3740
TableBucket tableBucket,
3841
@Nullable String partition,
39-
Schema schema) {
42+
Schema schema,
43+
Map<String, String> customProperties) {
4044
this.tablePath = tablePath;
4145
this.tableBucket = tableBucket;
4246
this.partition = partition;
4347
this.schema = schema;
48+
this.customProperties = customProperties;
4449
}
4550

4651
@Override
@@ -63,4 +68,9 @@ public String partition() {
6368
public Schema schema() {
6469
return schema;
6570
}
71+
72+
@Override
73+
public Map<String, String> customProperties() {
74+
return customProperties;
75+
}
6676
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,12 @@ public String partition() {
716716
return partition;
717717
}
718718

719+
@Override
720+
public Map<String, String> customProperties() {
721+
// don't care about table custom properties for Paimon lake writer
722+
return new HashMap<>();
723+
}
724+
719725
@Override
720726
public com.alibaba.fluss.metadata.Schema schema() {
721727
throw new UnsupportedOperationException(

0 commit comments

Comments
 (0)