Skip to content

Commit d96b62b

Browse files
authored
[lake/flink] Introduce committer operator
This closes #990.
1 parent 60e0490 commit d96b62b

32 files changed

+1355
-183
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.committer;
18+
19+
import java.io.Serializable;
20+
21+
/** A class wrapping {@link Committable} to commit to lake. */
22+
public class CommittableMessage<Committable> implements Serializable {
23+
private static final long serialVersionUID = 1L;
24+
25+
private final Committable committable;
26+
27+
public CommittableMessage(Committable committable) {
28+
this.committable = committable;
29+
}
30+
31+
public Committable committable() {
32+
return committable;
33+
}
34+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.committer;
18+
19+
import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
20+
import com.alibaba.fluss.lakehouse.serializer.SimpleVersionedSerializer;
21+
22+
import org.apache.flink.api.common.ExecutionConfig;
23+
import org.apache.flink.api.common.typeinfo.TypeInformation;
24+
import org.apache.flink.api.common.typeutils.TypeSerializer;
25+
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
26+
import org.apache.flink.util.function.SerializableSupplier;
27+
28+
import java.io.IOException;
29+
30+
/** A {@link TypeInformation} for {@link CommittableMessage}. */
31+
public class CommittableMessageTypeInfo<Committable>
32+
extends TypeInformation<CommittableMessage<Committable>> {
33+
34+
private final SerializableSupplier<SimpleVersionedSerializer<Committable>>
35+
committableSerializerFactory;
36+
37+
private CommittableMessageTypeInfo(
38+
SerializableSupplier<SimpleVersionedSerializer<Committable>>
39+
committableSerializerFactory) {
40+
this.committableSerializerFactory = committableSerializerFactory;
41+
}
42+
43+
public static <Committable> TypeInformation<CommittableMessage<Committable>> of(
44+
SerializableSupplier<SimpleVersionedSerializer<Committable>>
45+
committableSerializerFactory) {
46+
return new CommittableMessageTypeInfo<>(committableSerializerFactory);
47+
}
48+
49+
@Override
50+
public boolean isBasicType() {
51+
return false;
52+
}
53+
54+
@Override
55+
public boolean isTupleType() {
56+
return false;
57+
}
58+
59+
@Override
60+
public int getArity() {
61+
return 1;
62+
}
63+
64+
@Override
65+
public int getTotalFields() {
66+
return 1;
67+
}
68+
69+
@SuppressWarnings({"unchecked", "rawtypes"})
70+
@Override
71+
public Class<CommittableMessage<Committable>> getTypeClass() {
72+
return (Class) TableBucketWriteResult.class;
73+
}
74+
75+
@Override
76+
public boolean isKeyType() {
77+
return false;
78+
}
79+
80+
@Override
81+
public TypeSerializer<CommittableMessage<Committable>> createSerializer(
82+
ExecutionConfig executionConfig) {
83+
// no copy, so that data from writer is directly going into upstream operator while chaining
84+
SimpleVersionedSerializer<Committable> committableSerializer =
85+
committableSerializerFactory.get();
86+
return new SimpleVersionedSerializerTypeSerializerProxy<CommittableMessage<Committable>>(
87+
() ->
88+
new org.apache.flink.core.io.SimpleVersionedSerializer<
89+
CommittableMessage<Committable>>() {
90+
@Override
91+
public int getVersion() {
92+
return committableSerializer.getVersion();
93+
}
94+
95+
@Override
96+
public byte[] serialize(
97+
CommittableMessage<Committable> committableCommittableMessage)
98+
throws IOException {
99+
return committableSerializer.serialize(
100+
committableCommittableMessage.committable());
101+
}
102+
103+
@Override
104+
public CommittableMessage<Committable> deserialize(
105+
int version, byte[] serialized) throws IOException {
106+
return new CommittableMessage<>(
107+
committableSerializer.deserialize(version, serialized));
108+
}
109+
}) {
110+
// nothing
111+
@Override
112+
public CommittableMessage<Committable> copy(CommittableMessage<Committable> from) {
113+
return from;
114+
}
115+
116+
@Override
117+
public CommittableMessage<Committable> copy(
118+
CommittableMessage<Committable> from, CommittableMessage<Committable> reuse) {
119+
return from;
120+
}
121+
};
122+
}
123+
124+
@Override
125+
public String toString() {
126+
return "LakeCommittableTypeInfo";
127+
}
128+
129+
@Override
130+
public boolean equals(Object obj) {
131+
return obj instanceof CommittableMessageTypeInfo;
132+
}
133+
134+
@Override
135+
public int hashCode() {
136+
return getClass().hashCode();
137+
}
138+
139+
@Override
140+
public boolean canEqual(Object obj) {
141+
return obj instanceof CommittableMessageTypeInfo;
142+
}
143+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.committer;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
21+
import java.util.Map;
22+
23+
/** A lake snapshot for a Fluss table. */
24+
public class FlussTableLakeSnapshot {
25+
26+
private final long tableId;
27+
28+
private final long lakeSnapshotId;
29+
30+
private final Map<TableBucket, Long> logEndOffsets;
31+
32+
public FlussTableLakeSnapshot(
33+
long tableId, long lakeSnapshotId, Map<TableBucket, Long> logEndOffsets) {
34+
this.tableId = tableId;
35+
this.lakeSnapshotId = lakeSnapshotId;
36+
this.logEndOffsets = logEndOffsets;
37+
}
38+
39+
public long tableId() {
40+
return tableId;
41+
}
42+
43+
public long lakeSnapshotId() {
44+
return lakeSnapshotId;
45+
}
46+
47+
public Map<TableBucket, Long> logEndOffsets() {
48+
return logEndOffsets;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "FlussTableLakeSnapshot{"
54+
+ "tableId="
55+
+ tableId
56+
+ ", lakeSnapshotId="
57+
+ lakeSnapshotId
58+
+ ", logEndOffsets="
59+
+ logEndOffsets
60+
+ '}';
61+
}
62+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.committer;
18+
19+
import com.alibaba.fluss.client.metadata.MetadataUpdater;
20+
import com.alibaba.fluss.config.ConfigOptions;
21+
import com.alibaba.fluss.config.Configuration;
22+
import com.alibaba.fluss.metadata.TableBucket;
23+
import com.alibaba.fluss.metrics.registry.MetricRegistry;
24+
import com.alibaba.fluss.rpc.GatewayClientProxy;
25+
import com.alibaba.fluss.rpc.RpcClient;
26+
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
27+
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
28+
import com.alibaba.fluss.rpc.messages.PbLakeTableOffsetForBucket;
29+
import com.alibaba.fluss.rpc.messages.PbLakeTableSnapshotInfo;
30+
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
31+
import com.alibaba.fluss.utils.ExceptionUtils;
32+
33+
import java.io.IOException;
34+
import java.util.Map;
35+
36+
/** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
37+
public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
38+
39+
private final Configuration flussConf;
40+
41+
private CoordinatorGateway coordinatorGateway;
42+
private RpcClient rpcClient;
43+
44+
public FlussTableLakeSnapshotCommitter(Configuration flussConf) {
45+
this.flussConf = flussConf;
46+
}
47+
48+
public void open() {
49+
// init coordinator gateway
50+
String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
51+
MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
52+
// don't care about metrics, but pass a ClientMetricGroup to make compiler happy
53+
rpcClient = RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId));
54+
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
55+
this.coordinatorGateway =
56+
GatewayClientProxy.createGatewayProxy(
57+
metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
58+
}
59+
60+
public void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
61+
try {
62+
CommitLakeTableSnapshotRequest request =
63+
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
64+
coordinatorGateway.commitLakeTableSnapshot(request).get();
65+
} catch (Exception e) {
66+
throw new IOException(
67+
String.format(
68+
"Fail to commit table lake snapshot %s to Fluss.",
69+
flussTableLakeSnapshot),
70+
ExceptionUtils.stripExecutionException(e));
71+
}
72+
}
73+
74+
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
75+
FlussTableLakeSnapshot flussTableLakeSnapshot) {
76+
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
77+
new CommitLakeTableSnapshotRequest();
78+
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
79+
commitLakeTableSnapshotRequest.addTablesReq();
80+
81+
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
82+
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
83+
for (Map.Entry<TableBucket, Long> bucketEndOffsetEntry :
84+
flussTableLakeSnapshot.logEndOffsets().entrySet()) {
85+
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
86+
pbLakeTableSnapshotInfo.addBucketsReq();
87+
TableBucket tableBucket = bucketEndOffsetEntry.getKey();
88+
long endOffset = bucketEndOffsetEntry.getValue();
89+
if (tableBucket.getPartitionId() != null) {
90+
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
91+
}
92+
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
93+
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
94+
}
95+
return commitLakeTableSnapshotRequest;
96+
}
97+
98+
@Override
99+
public void close() throws Exception {
100+
if (rpcClient != null) {
101+
rpcClient.close();
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)