[lake] Record a file path storing log offsets in lake snapshot property #2223
base: main
33969e9 to 5442462
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
5442462 to 68a5039
Pull request overview
Copilot reviewed 30 out of 30 changed files in this pull request and generated 12 comments.
...ink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
...ink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
.../src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java
06af001 to 3889069
a6c7f69 to 747e91b
747e91b to f31d6f5
Pull request overview
Copilot reviewed 42 out of 42 changed files in this pull request and generated 3 comments.
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java
.../src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java
Pull request overview
Copilot reviewed 43 out of 43 changed files in this pull request and generated 6 comments.
...ink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
...common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
fluss-common/src/test/java/org/apache/fluss/utils/json/TableBucketOffsetsJsonSerdeTest.java
37c8016 to f7a09fa
f7a09fa to f470b32
@wuchong Could you please help review this PR? The PR also handles backward compatibility when v2 is used to serialize the lake table snapshot.
f470b32 to 0451cbf
message CommitLakeTableSnapshotRequest {
  repeated PbLakeTableSnapshotInfo tables_req = 1;
message PrepareCommitLakeTableSnapshotRequest {
Suggest PrepareLakeTableSnapshotRequest: prepare and commit are two different phases.
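For illustration only, a minimal sketch of the two phases. It assumes the prepare phase persists the bucket log offsets to a file and returns its path, and the commit phase records only that path as a lake snapshot property (as the PR title describes). `TwoPhaseLakeCommitSketch`, `prepare`, `commit`, the file layout, and the property key `fluss-offsets-path` are hypothetical names, not the PR's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; none of these names come from the Fluss code base.
final class TwoPhaseLakeCommitSketch {
    // Assumed property key, for illustration only.
    static final String OFFSETS_PATH_KEY = "fluss-offsets-path";

    /** Phase 1 (prepare): persist bucket log offsets to a file and return its path. */
    static Path prepare(Path dir, long tableId, Map<Integer, Long> bucketOffsets)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        // A TreeMap copy gives a stable bucket order in the file.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(bucketOffsets).entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        Path file = dir.resolve("table-" + tableId + ".offsets");
        Files.write(file, sb.toString().getBytes(StandardCharsets.UTF_8));
        return file;
    }

    /** Phase 2 (commit): record only the file path in the snapshot properties. */
    static Map<String, String> commit(Path offsetsFile) {
        Map<String, String> props = new HashMap<>();
        props.put(OFFSETS_PATH_KEY, offsetsFile.toString());
        return props;
    }
}
```

Keeping the phases as separate calls is what motivates separate request names: a failed prepare leaves nothing recorded on the snapshot.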
message PrepareCommitLakeTableSnapshotResponse {
  repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
ditto
  optional int64 max_timestamp = 6;
}

message PbPrepareCommitLakeTableRespForTable {
Add a table_id field. Since the PrepareCommitLakeTableSnapshotRequest carries multiple table IDs, we need to distinguish which table each PbPrepareCommitLakeTableRespForTable belongs to.
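A minimal sketch of why the id helps, assuming each per-table response carries a table id plus either a payload or an error. `PrepareRespSketch`, `TableResp`, and the `offsetsPath` payload are hypothetical stand-ins, not the generated protobuf classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; TableResp only mimics a per-table response entry.
final class PrepareRespSketch {
    static final class TableResp {
        final long tableId;        // the field the review asks for
        final String offsetsPath;  // assumed payload; null when the prepare failed
        final String errorMessage; // null on success

        TableResp(long tableId, String offsetsPath, String errorMessage) {
            this.tableId = tableId;
            this.offsetsPath = offsetsPath;
            this.errorMessage = errorMessage;
        }
    }

    /** Index the responses by table id so the caller never relies on list order. */
    static Map<Long, TableResp> byTableId(List<TableResp> responses) {
        Map<Long, TableResp> result = new LinkedHashMap<>();
        for (TableResp r : responses) {
            result.put(r.tableId, r);
        }
        return result;
    }
}
```

Without the id, the caller would have to assume the response entries come back in request order.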
  optional int32 error_code = 2;
  optional string error_message = 3;
Let's follow a standard where error_code and error_message are the first 2 fields, because we will add more fields at the end.
  optional string error_message = 3;
}

message PbTableBucketOffsets {
Rename to PbTableOffsets? This makes it clearer that it is a set of offsets for a table.
 * @return the LakeTableSnapshot
 */
public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
public LakeTableSnapshot getLatestTableSnapshot() throws IOException {
Consider renaming this method to getOrReadLatestTableSnapshot to explicitly indicate that it may perform I/O operations (e.g., reading from remote storage) when the snapshot isn't already available in memory. This makes the method’s behavior clearer to callers and improves code readability.
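A minimal sketch of the behavior the suggested name describes, assuming the snapshot is cached in memory and read from storage only on first access. `LazySnapshotSketch` is hypothetical, a `String` stands in for `LakeTableSnapshot`, and a local file stands in for remote storage.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; a String stands in for LakeTableSnapshot.
final class LazySnapshotSketch {
    private final Path snapshotFile;
    private String cached; // null until the first read
    int reads = 0;         // counts actual I/O, exposed for illustration

    LazySnapshotSketch(Path snapshotFile) {
        this.snapshotFile = snapshotFile;
    }

    /** Returns the cached snapshot, reading it from storage on first use. */
    String getOrReadLatestTableSnapshot() throws IOException {
        if (cached == null) {
            cached = new String(Files.readAllBytes(snapshotFile), StandardCharsets.UTF_8);
            reads++;
        }
        return cached;
    }
}
```

The `IOException` in the signature and the `OrRead` in the name both tell callers the call may block on I/O.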
    // Version 1: ZK node contains full snapshot data, use LakeTableSnapshotJsonSerde
    LakeTableSnapshotJsonSerde.INSTANCE.serialize(
            lakeTable.getLatestTableSnapshot(), generator);
} else {
    generator.writeStartObject();
    generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);

    generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
    for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
It would be better to extract separate serializeV1() and serializeV2() methods to improve maintainability and readability. The same applies to deserialize.
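A minimal sketch of the suggested split, assuming one private method per format version behind a single dispatching entry point. `VersionedSerdeSketch` is hypothetical, the JSON keys are made up for illustration, and plain strings replace the Jackson `JsonGenerator` so the example stays self-contained (no escaping is done).

```java
// Hypothetical sketch; JSON keys and class name are illustrative only.
final class VersionedSerdeSketch {
    static final int VERSION_1 = 1;
    static final int VERSION_2 = 2;

    /** Single entry point that only dispatches on the format version. */
    static String serialize(int version, long snapshotId, String metadataPath) {
        switch (version) {
            case VERSION_1:
                return serializeV1(snapshotId);
            case VERSION_2:
                return serializeV2(snapshotId, metadataPath);
            default:
                throw new IllegalArgumentException("Unsupported version: " + version);
        }
    }

    // Version 1: the node holds the snapshot data itself.
    private static String serializeV1(long snapshotId) {
        return "{\"version\":1,\"snapshot_id\":" + snapshotId + "}";
    }

    // Version 2: the node holds a list of snapshot metadata entries.
    private static String serializeV2(long snapshotId, String metadataPath) {
        return "{\"version\":2,\"lake_snapshots\":[{\"snapshot_id\":" + snapshotId
                + ",\"path\":\"" + metadataPath + "\"}]}";
    }
}
```

A matching `deserialize` could read the version field first and dispatch to `deserializeV1` or `deserializeV2` in the same way.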
 * @see LakeTableJsonSerde for the current format (version 2) that uses this serde for legacy
 *     compatibility
 */
public class LakeTableSnapshotJsonSerde
Consider renaming this to LakeTableSnapshotLegacyJsonSerde (currently there are too many LakeXxxSerde classes).
}

@Test
void testBackwardCompatibility() {
We still need this for backward compatibility for the max_timestamp and log_start_offset fields.
}

@Override
public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
move this method before commitLakeTableSnapshot.
Purpose
Linked issue: close #2224
Brief change log
TieringCommitOperator first sends a prepare-commit of the log offsets to the Fluss cluster, which writes a file to store the log offsets.
Tests
Existing test
API and Format
Documentation