Skip to content

Commit 4d9104b

Browse files
authored
[lake/flink] Implement read fluss and write to lake in Flink source reader
This closes #904.
1 parent 478469d commit 4d9104b

30 files changed

+1656
-19
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/BoundedSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public BoundedSplitReader(BatchScanner splitScanner, final long toSkip) {
6262

6363
/** Read next batch of data. Return null when no data is available. */
6464
@Nullable
65-
CloseableIterator<RecordAndPos> readBatch() throws IOException {
65+
public CloseableIterator<RecordAndPos> readBatch() throws IOException {
6666
// pool a RecordAndPosBatch, pool size is 1, the underlying implementation does not allow
6767
// multiple batches to be read at the same time
6868
RecordAndPosBatch recordAndPosBatch = pollRecordAndPosBatch();
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.lakehouse.writer.LakeWriter;
20+
import com.alibaba.fluss.metadata.TableBucket;
21+
import com.alibaba.fluss.metadata.TablePath;
22+
23+
import javax.annotation.Nullable;
24+
25+
import java.io.Serializable;
26+
27+
/**
28+
* This class contains the {@link WriteResult} of {@link LakeWriter}, the table path and the bucket
29+
* that the write result is for, the end log offset of tiering. It'll be passed to downstream
30+
* operators to collect all the write results of a table and do commit.
31+
*/
32+
public class TableBucketWriteResult<WriteResult> implements Serializable {
33+
34+
private static final long serialVersionUID = 1L;
35+
36+
private final TablePath tablePath;
37+
38+
private final TableBucket tableBucket;
39+
40+
@Nullable private final WriteResult writeResult;
41+
42+
// the end offset of tiering, should be the last tiered record's offset + 1
43+
private final long logEndOffset;
44+
45+
public TableBucketWriteResult(
46+
TablePath tablePath,
47+
TableBucket tableBucket,
48+
@Nullable WriteResult writeResult,
49+
long logEndOffset) {
50+
this.tablePath = tablePath;
51+
this.tableBucket = tableBucket;
52+
this.writeResult = writeResult;
53+
this.logEndOffset = logEndOffset;
54+
}
55+
56+
public TablePath tablePath() {
57+
return tablePath;
58+
}
59+
60+
public TableBucket tableBucket() {
61+
return tableBucket;
62+
}
63+
64+
@Nullable
65+
public WriteResult writeResult() {
66+
return writeResult;
67+
}
68+
69+
public long logEndOffset() {
70+
return logEndOffset;
71+
}
72+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.flink.tiering.source.state.TieringSplitState;
20+
import com.alibaba.fluss.lakehouse.committer.LakeCommitter;
21+
22+
import org.apache.flink.api.connector.source.SourceOutput;
23+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
24+
25+
/** The emitter to emit {@link TableBucketWriteResult} to downstream {@link LakeCommitter}. */
26+
public class TableBucketWriteResultEmitter<WriteResult>
27+
implements RecordEmitter<
28+
TableBucketWriteResult<WriteResult>,
29+
TableBucketWriteResult<WriteResult>,
30+
TieringSplitState> {
31+
@Override
32+
public void emitRecord(
33+
TableBucketWriteResult<WriteResult> writeResult,
34+
SourceOutput<TableBucketWriteResult<WriteResult>> sourceOutput,
35+
TieringSplitState splitState) {
36+
sourceOutput.collect(writeResult);
37+
}
38+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
import com.alibaba.fluss.metadata.TablePath;
21+
22+
import org.apache.flink.core.io.SimpleVersionedSerializer;
23+
import org.apache.flink.core.memory.DataInputDeserializer;
24+
import org.apache.flink.core.memory.DataOutputSerializer;
25+
26+
import java.io.IOException;
27+
28+
/** The serializer for {@link TableBucketWriteResult}. */
29+
public class TableBucketWriteResultSerializer<WriteResult>
30+
implements SimpleVersionedSerializer<TableBucketWriteResult<WriteResult>> {
31+
32+
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
33+
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
34+
35+
private static final int CURRENT_VERSION = 1;
36+
37+
private final com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer<WriteResult>
38+
writeResultSerializer;
39+
40+
public TableBucketWriteResultSerializer(
41+
com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer<WriteResult>
42+
writeResultSerializer) {
43+
this.writeResultSerializer = writeResultSerializer;
44+
}
45+
46+
@Override
47+
public int getVersion() {
48+
return CURRENT_VERSION;
49+
}
50+
51+
@Override
52+
public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResult)
53+
throws IOException {
54+
final DataOutputSerializer out = SERIALIZER_CACHE.get();
55+
// serialize table path
56+
TablePath tablePath = tableBucketWriteResult.tablePath();
57+
out.writeUTF(tablePath.getDatabaseName());
58+
out.writeUTF(tablePath.getTableName());
59+
60+
// serialize bucket
61+
TableBucket tableBucket = tableBucketWriteResult.tableBucket();
62+
out.writeLong(tableBucket.getTableId());
63+
// write partition
64+
if (tableBucket.getPartitionId() != null) {
65+
out.writeBoolean(true);
66+
out.writeLong(tableBucket.getPartitionId());
67+
} else {
68+
out.writeBoolean(false);
69+
}
70+
out.writeInt(tableBucket.getBucket());
71+
72+
// serialize write result
73+
WriteResult writeResult = tableBucketWriteResult.writeResult();
74+
if (writeResult == null) {
75+
// write -1 to mark write result as null
76+
out.writeInt(-1);
77+
} else {
78+
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
79+
out.writeInt(serializeBytes.length);
80+
out.write(serializeBytes);
81+
}
82+
83+
// serialize log end offset
84+
out.writeLong(tableBucketWriteResult.logEndOffset());
85+
86+
final byte[] result = out.getCopyOfBuffer();
87+
out.clear();
88+
return result;
89+
}
90+
91+
@Override
92+
public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] serialized)
93+
throws IOException {
94+
if (version != CURRENT_VERSION) {
95+
throw new IOException("Unknown version " + version);
96+
}
97+
final DataInputDeserializer in = new DataInputDeserializer(serialized);
98+
// deserialize table path
99+
String databaseName = in.readUTF();
100+
String tableName = in.readUTF();
101+
TablePath tablePath = new TablePath(databaseName, tableName);
102+
103+
// deserialize bucket
104+
long tableId = in.readLong();
105+
Long partitionId = null;
106+
if (in.readBoolean()) {
107+
partitionId = in.readLong();
108+
}
109+
int bucketId = in.readInt();
110+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
111+
112+
// deserialize write result
113+
int writeResultLength = in.readInt();
114+
WriteResult writeResult;
115+
if (writeResultLength >= 0) {
116+
byte[] writeResultBytes = new byte[writeResultLength];
117+
in.readFully(writeResultBytes);
118+
writeResult = writeResultSerializer.deserialize(version, writeResultBytes);
119+
} else {
120+
writeResult = null;
121+
}
122+
123+
// deserialize log end offset
124+
long logEndOffset = in.readLong();
125+
return new TableBucketWriteResult<>(tablePath, tableBucket, writeResult, logEndOffset);
126+
}
127+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer;
20+
21+
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.api.common.typeutils.TypeSerializer;
24+
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
25+
import org.apache.flink.util.function.SerializableSupplier;
26+
27+
/** A {@link TypeInformation} for {@link TableBucketWriteResult} . */
28+
public class TableBucketWriteResultTypeInfo<WriteResult>
29+
extends TypeInformation<TableBucketWriteResult<WriteResult>> {
30+
31+
private final SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
32+
writeResultSerializerFactory;
33+
34+
private TableBucketWriteResultTypeInfo(
35+
SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
36+
writeResultSerializerFactory) {
37+
this.writeResultSerializerFactory = writeResultSerializerFactory;
38+
}
39+
40+
public static <WriteResult> TypeInformation<TableBucketWriteResult<WriteResult>> of(
41+
SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
42+
writeResultSerializerFactory) {
43+
return new TableBucketWriteResultTypeInfo<>(writeResultSerializerFactory);
44+
}
45+
46+
@Override
47+
public boolean isBasicType() {
48+
return false;
49+
}
50+
51+
@Override
52+
public boolean isTupleType() {
53+
return false;
54+
}
55+
56+
@Override
57+
public int getArity() {
58+
return 1;
59+
}
60+
61+
@Override
62+
public int getTotalFields() {
63+
return 1;
64+
}
65+
66+
@SuppressWarnings({"unchecked", "rawtypes"})
67+
@Override
68+
public Class<TableBucketWriteResult<WriteResult>> getTypeClass() {
69+
return (Class) TableBucketWriteResult.class;
70+
}
71+
72+
@Override
73+
public boolean isKeyType() {
74+
return false;
75+
}
76+
77+
@Override
78+
public TypeSerializer<TableBucketWriteResult<WriteResult>> createSerializer(
79+
ExecutionConfig executionConfig) {
80+
// no copy, so that data from lake writer is directly going into lake committer while
81+
// chaining
82+
return new SimpleVersionedSerializerTypeSerializerProxy<
83+
TableBucketWriteResult<WriteResult>>(
84+
() -> new TableBucketWriteResultSerializer<>(writeResultSerializerFactory.get())) {
85+
@Override
86+
public TableBucketWriteResult<WriteResult> copy(
87+
TableBucketWriteResult<WriteResult> from) {
88+
return from;
89+
}
90+
91+
@Override
92+
public TableBucketWriteResult<WriteResult> copy(
93+
TableBucketWriteResult<WriteResult> from,
94+
TableBucketWriteResult<WriteResult> reuse) {
95+
return from;
96+
}
97+
};
98+
}
99+
100+
@Override
101+
public String toString() {
102+
return "TableBucketWriteResultTypeInfo";
103+
}
104+
105+
@Override
106+
public boolean equals(Object obj) {
107+
return obj instanceof TableBucketWriteResultTypeInfo;
108+
}
109+
110+
@Override
111+
public int hashCode() {
112+
return getClass().hashCode();
113+
}
114+
115+
@Override
116+
public boolean canEqual(Object obj) {
117+
return obj instanceof TableBucketWriteResultTypeInfo;
118+
}
119+
}

0 commit comments

Comments
 (0)