Skip to content

Commit 37c8016

Browse files
committed
minor fix
1 parent c2a824f commit 37c8016

File tree

10 files changed

+47
-73
lines changed

10 files changed

+47
-73
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
112112
if (prepareCommitResp.hasErrorCode()) {
113113
throw ApiError.fromErrorMessage(prepareCommitResp).exception();
114114
} else {
115-
return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
115+
return checkNotNull(prepareCommitResp).getLakeTableBucketOffsetsPath();
116116
}
117117
} catch (Exception e) {
118118
throw new IOException(
@@ -126,7 +126,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
126126
void commit(
127127
long tableId,
128128
long lakeSnapshotId,
129-
String lakeSnapshotPath,
129+
String lakeBucketOffsetsPath,
130130
Map<TableBucket, Long> logEndOffsets,
131131
Map<TableBucket, Long> logMaxTieredTimestamps)
132132
throws IOException {
@@ -135,7 +135,7 @@ void commit(
135135
toCommitLakeTableSnapshotRequest(
136136
tableId,
137137
lakeSnapshotId,
138-
lakeSnapshotPath,
138+
lakeBucketOffsetsPath,
139139
logEndOffsets,
140140
logMaxTieredTimestamps);
141141
List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
@@ -200,15 +200,15 @@ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRe
200200
*
201201
* @param tableId the table ID
202202
* @param snapshotId the lake snapshot ID
203-
* @param lakeSnapshotPath the file path where the snapshot metadata is stored
203+
* @param bucketOffsetsPath the file path where the bucket offsets is stored
204204
* @param logEndOffsets the log end offsets for each bucket
205205
* @param logMaxTieredTimestamps the max tiered timestamps for each bucket
206206
* @return the commit request
207207
*/
208208
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
209209
long tableId,
210210
long snapshotId,
211-
String lakeSnapshotPath,
211+
String bucketOffsetsPath,
212212
Map<TableBucket, Long> logEndOffsets,
213213
Map<TableBucket, Long> logMaxTieredTimestamps) {
214214
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
@@ -220,8 +220,8 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
220220
pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
221221
pbLakeTableSnapshotMetadata.setTableId(tableId);
222222
// tiered snapshot file path is equal to readable snapshot currently
223-
pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
224-
pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
223+
pbLakeTableSnapshotMetadata.setTieredBucketOffsetsFilePath(bucketOffsetsPath);
224+
pbLakeTableSnapshotMetadata.setReadableBucketOffsetsFilePath(bucketOffsetsPath);
225225

226226
// Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
227227
// synchronized log end offsets and max timestamps)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,22 +230,21 @@ private Committable commitWriteResults(
230230
? null
231231
: flussCurrentLakeSnapshot.getSnapshotId());
232232

233-
// get the lake snapshot file storing the log end offsets
234-
String lakeSnapshotMetadataFile =
233+
// get the lake bucket offsets file storing the log end offsets
234+
String lakeBucketOffsetsFile =
235235
flussTableLakeSnapshotCommitter.prepareCommit(
236236
tableId, tablePath, logEndOffsets);
237237

238-
// record the lake snapshot metadata file to snapshot property
238+
// record the lake snapshot bucket offsets file to snapshot property
239239
long committedSnapshotId =
240240
lakeCommitter.commit(
241241
committable,
242242
Collections.singletonMap(
243-
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
244-
lakeSnapshotMetadataFile));
243+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, lakeBucketOffsetsFile));
245244
flussTableLakeSnapshotCommitter.commit(
246245
tableId,
247246
committedSnapshotId,
248-
lakeSnapshotMetadataFile,
247+
lakeBucketOffsetsFile,
249248
logEndOffsets,
250249
logMaxTieredTimestamps);
251250
return committable;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void testPartitionsExpiredInFlussButExistInLake(
617617
new TableBucket(tableId, hybridPartitionId, 1), lakeEndOffset,
618618
new TableBucket(tableId, hybridPartitionId, 2), lakeEndOffset));
619619
LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
620-
lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
620+
lakeTableHelper.upsertLakeTable(tableId, lakeTableSnapshot);
621621

622622
// Create PartitionInfo for lake partitions
623623
List<PartitionInfo> lakePartitionInfos = new ArrayList<>();

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,6 @@ CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
211211
* <li>Returns the file path where the snapshot is stored
212212
* </ul>
213213
*
214-
* <p>The returned file path points to a file that stores the bucket log end offset information
215-
* for the table. This file path will be used in the subsequent commit phase to reference the
216-
* stored snapshot. If any error occurs during processing for a specific table, an error
217-
* response will be set for that table in the response, while other tables will continue to be
218-
* processed.
219-
*
220214
* @param request the request containing lake table snapshot information for one or more tables
221215
* @return a future that completes with a response containing the file paths where snapshots
222216
* (containing bucket log end offset information) are stored, or error information for

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,8 @@ message PbDescribeConfig {
958958
message PbLakeTableSnapshotMetadata {
959959
required int64 table_id = 1;
960960
required int64 snapshot_id = 2;
961-
required string tiered_snapshot_file_path = 3;
962-
optional string readable_snapshot_file_path = 4;
961+
required string tiered_bucket_offsets_file_path = 3;
962+
optional string readable_bucket_offsets_file_path = 4;
963963
}
964964

965965
message PbLakeTableSnapshotInfo {
@@ -982,7 +982,7 @@ message PbLakeTableOffsetForBucket {
982982
}
983983

984984
message PbPrepareCommitLakeTableRespForTable {
985-
optional string lake_table_snapshot_file_path = 1;
985+
optional string lake_table_bucket_offsets_path = 1;
986986
optional int32 error_code = 2;
987987
optional string error_message = 3;
988988
}

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLa
633633
FsPath fsPath =
634634
lakeTableHelper.storeLakeTableBucketOffsets(
635635
tablePath, tableBucketOffsets);
636-
pbPrepareCommitLakeTableRespForTable.setLakeTableSnapshotFilePath(
636+
pbPrepareCommitLakeTableRespForTable.setLakeTableBucketOffsetsPath(
637637
fsPath.toString());
638638
} catch (Exception e) {
639639
Errors error = ApiError.fromThrowable(e).error();

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1305,7 +1305,7 @@ private void handleCommitLakeTableSnapshotV1(
13051305
}
13061306

13071307
// this involves IO operation (ZK), so we do it in ioExecutor
1308-
lakeTableHelper.upsertLakeTableV1(
1308+
lakeTableHelper.upsertLakeTable(
13091309
tableId, lakeTableSnapshotEntry.getValue());
13101310
} catch (Exception e) {
13111311
ApiError error = ApiError.fromThrowable(e);

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@
173173
import org.apache.fluss.utils.json.DataTypeJsonSerde;
174174
import org.apache.fluss.utils.json.JsonSerdeUtils;
175175
import org.apache.fluss.utils.json.TableBucketOffsets;
176-
import org.apache.fluss.utils.types.Tuple2;
177176

178177
import javax.annotation.Nullable;
179178

@@ -190,7 +189,6 @@
190189
import java.util.Optional;
191190
import java.util.OptionalInt;
192191
import java.util.Set;
193-
import java.util.function.Consumer;
194192
import java.util.stream.Collectors;
195193
import java.util.stream.Stream;
196194

@@ -1556,17 +1554,28 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15561554
Map<TableBucket, Long> tableBucketsMaxTimestamp = new HashMap<>();
15571555
for (PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo : request.getTablesReqsList()) {
15581556
long tableId = pbLakeTableSnapshotInfo.getTableId();
1559-
Consumer<Tuple2<TableBucket, PbLakeTableOffsetForBucket>>
1560-
pbLakeTableOffsetForBucketConsumer =
1561-
(tableBucketAndOffset -> {
1562-
if (tableBucketAndOffset.f1.hasMaxTimestamp()) {
1563-
tableBucketsMaxTimestamp.put(
1564-
tableBucketAndOffset.f0,
1565-
tableBucketAndOffset.f1.getMaxTimestamp());
1566-
}
1567-
});
1557+
long snapshotId = pbLakeTableSnapshotInfo.getSnapshotId();
1558+
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
1559+
for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
1560+
pbLakeTableSnapshotInfo.getBucketsReqsList()) {
1561+
Long partitionId =
1562+
lakeTableOffsetForBucket.hasPartitionId()
1563+
? lakeTableOffsetForBucket.getPartitionId()
1564+
: null;
1565+
int bucketId = lakeTableOffsetForBucket.getBucketId();
1566+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
1567+
Long logEndOffset =
1568+
lakeTableOffsetForBucket.hasLogEndOffset()
1569+
? lakeTableOffsetForBucket.getLogEndOffset()
1570+
: null;
1571+
if (lakeTableOffsetForBucket.hasMaxTimestamp()) {
1572+
tableBucketsMaxTimestamp.put(
1573+
tableBucket, lakeTableOffsetForBucket.getMaxTimestamp());
1574+
}
1575+
bucketLogEndOffset.put(tableBucket, logEndOffset);
1576+
}
15681577
LakeTableSnapshot lakeTableSnapshot =
1569-
toLakeSnapshot(pbLakeTableSnapshotInfo, pbLakeTableOffsetForBucketConsumer);
1578+
new LakeTableSnapshot(snapshotId, bucketLogEndOffset);
15701579
lakeTableInfoByTableId.put(tableId, lakeTableSnapshot);
15711580
}
15721581

@@ -1578,11 +1587,12 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15781587
pbLakeTableSnapshotMetadata.getTableId(),
15791588
new LakeTable.LakeSnapshotMetadata(
15801589
pbLakeTableSnapshotMetadata.getSnapshotId(),
1581-
new FsPath(pbLakeTableSnapshotMetadata.getTieredSnapshotFilePath()),
1582-
pbLakeTableSnapshotMetadata.hasReadableSnapshotFilePath()
1590+
new FsPath(
1591+
pbLakeTableSnapshotMetadata.getTieredBucketOffsetsFilePath()),
1592+
pbLakeTableSnapshotMetadata.hasReadableBucketOffsetsFilePath()
15831593
? new FsPath(
15841594
pbLakeTableSnapshotMetadata
1585-
.getReadableSnapshotFilePath())
1595+
.getReadableBucketOffsetsFilePath())
15861596
: null));
15871597
}
15881598
return new CommitLakeTableSnapshotData(
@@ -1606,36 +1616,6 @@ public static TableBucketOffsets toTableBucketOffsets(
16061616
return new TableBucketOffsets(tableId, bucketOffsets);
16071617
}
16081618

1609-
private static LakeTableSnapshot toLakeSnapshot(
1610-
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo,
1611-
@Nullable
1612-
Consumer<Tuple2<TableBucket, PbLakeTableOffsetForBucket>>
1613-
pbLakeTableOffsetForBucketConsumer) {
1614-
long tableId = pbLakeTableSnapshotInfo.getTableId();
1615-
long snapshotId = pbLakeTableSnapshotInfo.getSnapshotId();
1616-
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
1617-
for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
1618-
pbLakeTableSnapshotInfo.getBucketsReqsList()) {
1619-
Long partitionId =
1620-
lakeTableOffsetForBucket.hasPartitionId()
1621-
? lakeTableOffsetForBucket.getPartitionId()
1622-
: null;
1623-
int bucketId = lakeTableOffsetForBucket.getBucketId();
1624-
1625-
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
1626-
Long logEndOffset =
1627-
lakeTableOffsetForBucket.hasLogEndOffset()
1628-
? lakeTableOffsetForBucket.getLogEndOffset()
1629-
: null;
1630-
if (pbLakeTableOffsetForBucketConsumer != null) {
1631-
pbLakeTableOffsetForBucketConsumer.accept(
1632-
Tuple2.of(tableBucket, lakeTableOffsetForBucket));
1633-
}
1634-
bucketLogEndOffset.put(tableBucket, logEndOffset);
1635-
}
1636-
return new LakeTableSnapshot(snapshotId, bucketLogEndOffset);
1637-
}
1638-
16391619
public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(
16401620
TableBucket tableBucket,
16411621
LakeTableSnapshot lakeTableSnapshot,

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
4646
}
4747

4848
/**
49-
* Upserts a lake table snapshot for the given table, stored in v1 format.
49+
* Upserts a lake table snapshot for the given table, stored in v1 format. Note: this method is
50+
* just for back compatibility.
5051
*
5152
* @param tableId the table ID
5253
* @param lakeTableSnapshot the new snapshot to upsert
5354
* @throws Exception if the operation fails
5455
*/
55-
public void upsertLakeTableV1(long tableId, LakeTableSnapshot lakeTableSnapshot)
56+
public void upsertLakeTable(long tableId, LakeTableSnapshot lakeTableSnapshot)
5657
throws Exception {
5758
Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
5859
// Merge with previous snapshot if exists

fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception {
111111
LakeTableSnapshot lakeTableSnapshot =
112112
new LakeTableSnapshot(snapshotId, bucketLogEndOffset);
113113
// Write version 1 format data(simulating old system behavior)
114-
lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
114+
lakeTableHelper.upsertLakeTable(tableId, lakeTableSnapshot);
115115

116116
// Verify version 1 data can be read
117117
Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId);

0 commit comments

Comments
 (0)