Skip to content

Commit 128615c

Browse files
committed
remove unnecessary information in zk lake table node
1 parent 6eaa89d commit 128615c

File tree

12 files changed

+75
-355
lines changed

12 files changed

+75
-355
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,6 +1149,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
11491149

11501150
private void processNotifyLakeTableOffsetEvent(NotifyLakeTableOffsetEvent event) {
11511151
Map<Long, LakeTableSnapshot> lakeTableSnapshots = event.getLakeTableSnapshots();
1152+
Map<TableBucket, Long> tableBucketMaxTieredTimestamps =
1153+
event.getTableBucketMaxTieredTimestamps();
11521154
coordinatorRequestBatch.newBatch();
11531155
for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
11541156
lakeTableSnapshots.entrySet()) {
@@ -1164,7 +1166,8 @@ private void processNotifyLakeTableOffsetEvent(NotifyLakeTableOffsetEvent event)
11641166
.addNotifyLakeTableOffsetRequestForTableServers(
11651167
coordinatorContext.getAssignment(tb),
11661168
tb,
1167-
lakeTableSnapshot));
1169+
lakeTableSnapshot,
1170+
tableBucketMaxTieredTimestamps.get(tb)));
11681171
}
11691172
}
11701173
coordinatorRequestBatch.sendNotifyLakeTableOffsetRequest(
@@ -1271,7 +1274,10 @@ private void tryProcessCommitLakeTableSnapshot(
12711274

12721275
// send notify lakehouse data request to all replicas via coordinator event
12731276
coordinatorEventManager.put(
1274-
new NotifyLakeTableOffsetEvent(lakeTableSnapshots));
1277+
new NotifyLakeTableOffsetEvent(
1278+
lakeTableSnapshots,
1279+
commitLakeTableSnapshotData
1280+
.getTableBucketsMaxTieredTimestamp()));
12751281
callback.complete(response);
12761282
} catch (Exception e) {
12771283
callback.completeExceptionally(e);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,8 @@ public void addNotifyKvSnapshotOffsetRequestForTabletServers(
378378
public void addNotifyLakeTableOffsetRequestForTableServers(
379379
List<Integer> tabletServers,
380380
TableBucket tableBucket,
381-
LakeTableSnapshot lakeTableSnapshot) {
381+
LakeTableSnapshot lakeTableSnapshot,
382+
@Nullable Long maxTieredTimestamp) {
382383
tabletServers.stream()
383384
.filter(s -> s >= 0)
384385
.forEach(
@@ -390,7 +391,7 @@ public void addNotifyLakeTableOffsetRequestForTableServers(
390391
notifyLakeTableOffsetReqForBucketMap.put(
391392
tableBucket,
392393
makeNotifyLakeTableOffsetForBucket(
393-
tableBucket, lakeTableSnapshot));
394+
tableBucket, lakeTableSnapshot, maxTieredTimestamp));
394395
});
395396
}
396397

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyLakeTableOffsetEvent.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.coordinator.event;
1919

20+
import org.apache.fluss.metadata.TableBucket;
2021
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
2122

2223
import java.util.Map;
@@ -25,12 +26,20 @@
2526
public class NotifyLakeTableOffsetEvent implements CoordinatorEvent {
2627

2728
private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
29+
private final Map<TableBucket, Long> tableBucketMaxTieredTimestamps;
2830

29-
public NotifyLakeTableOffsetEvent(Map<Long, LakeTableSnapshot> lakeTableSnapshots) {
31+
public NotifyLakeTableOffsetEvent(
32+
Map<Long, LakeTableSnapshot> lakeTableSnapshots,
33+
Map<TableBucket, Long> tableBucketMaxTieredTimestamps) {
3034
this.lakeTableSnapshots = lakeTableSnapshots;
35+
this.tableBucketMaxTieredTimestamps = tableBucketMaxTieredTimestamps;
3136
}
3237

3338
public Map<Long, LakeTableSnapshot> getLakeTableSnapshots() {
3439
return lakeTableSnapshots;
3540
}
41+
42+
public Map<TableBucket, Long> getTableBucketMaxTieredTimestamps() {
43+
return tableBucketMaxTieredTimestamps;
44+
}
3645
}

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.entity;
1919

20+
import org.apache.fluss.metadata.TableBucket;
2021
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
2122
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
2223

@@ -27,34 +28,46 @@
2728
public class CommitLakeTableSnapshotData {
2829

2930
private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
31+
private final Map<TableBucket, Long> tableBucketsMaxTieredTimestamp;
3032

31-
public CommitLakeTableSnapshotData(Map<Long, LakeTableSnapshot> lakeTableSnapshots) {
33+
public CommitLakeTableSnapshotData(
34+
Map<Long, LakeTableSnapshot> lakeTableSnapshots,
35+
Map<TableBucket, Long> tableBucketsMaxTieredTimestamp) {
3236
this.lakeTableSnapshots = lakeTableSnapshots;
37+
this.tableBucketsMaxTieredTimestamp = tableBucketsMaxTieredTimestamp;
3338
}
3439

3540
public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
3641
return lakeTableSnapshots;
3742
}
3843

44+
public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
45+
return tableBucketsMaxTieredTimestamp;
46+
}
47+
3948
@Override
4049
public boolean equals(Object o) {
41-
if (this == o) {
42-
return true;
43-
}
44-
if (!(o instanceof CommitLakeTableSnapshotData)) {
50+
if (o == null || getClass() != o.getClass()) {
4551
return false;
4652
}
4753
CommitLakeTableSnapshotData that = (CommitLakeTableSnapshotData) o;
48-
return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots);
54+
return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots)
55+
&& Objects.equals(
56+
tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp);
4957
}
5058

5159
@Override
5260
public int hashCode() {
53-
return Objects.hashCode(lakeTableSnapshots);
61+
return Objects.hash(lakeTableSnapshots, tableBucketsMaxTieredTimestamp);
5462
}
5563

5664
@Override
5765
public String toString() {
58-
return "CommitLakeTableSnapshotData{" + "lakeTableInfos=" + lakeTableSnapshots + '}';
66+
return "CommitLakeTableSnapshotData{"
67+
+ "lakeTableSnapshots="
68+
+ lakeTableSnapshots
69+
+ ", tableBucketsMaxTieredTimestamp="
70+
+ tableBucketsMaxTieredTimestamp
71+
+ '}';
5972
}
6073
}

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -820,18 +820,9 @@ private void updateWithLakeTableSnapshot(Replica replica) throws Exception {
820820
LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
821821
long snapshotId = optLakeTableSnapshot.get().getSnapshotId();
822822
replica.getLogTablet().updateLakeTableSnapshotId(snapshotId);
823-
824-
lakeTableSnapshot
825-
.getLogStartOffset(tb)
826-
.ifPresent(replica.getLogTablet()::updateLakeLogStartOffset);
827-
828823
lakeTableSnapshot
829824
.getLogEndOffset(tb)
830825
.ifPresent(replica.getLogTablet()::updateLakeLogEndOffset);
831-
832-
lakeTableSnapshot
833-
.getMaxTimestamp(tb)
834-
.ifPresent(replica.getLogTablet()::updateLakeMaxTimestamp);
835826
}
836827
}
837828

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1545,13 +1545,11 @@ public static PbPartitionSpec makePbPartitionSpec(ResolvedPartitionSpec spec) {
15451545
public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15461546
CommitLakeTableSnapshotRequest request) {
15471547
Map<Long, LakeTableSnapshot> lakeTableInfoByTableId = new HashMap<>();
1548+
Map<TableBucket, Long> tableBucketsMaxTimestamp = new HashMap<>();
15481549
for (PbLakeTableSnapshotInfo pdLakeTableSnapshotInfo : request.getTablesReqsList()) {
15491550
long tableId = pdLakeTableSnapshotInfo.getTableId();
15501551
long snapshotId = pdLakeTableSnapshotInfo.getSnapshotId();
1551-
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
15521552
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
1553-
Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
1554-
Map<Long, String> partitionNameByPartitionId = new HashMap<>();
15551553

15561554
for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
15571555
pdLakeTableSnapshotInfo.getBucketsReqsList()) {
@@ -1562,42 +1560,27 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15621560
int bucketId = lakeTableOffsetForBucket.getBucketId();
15631561

15641562
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
1565-
Long logStartOffset =
1566-
lakeTableOffsetForBucket.hasLogStartOffset()
1567-
? lakeTableOffsetForBucket.getLogStartOffset()
1568-
: null;
15691563
Long logEndOffset =
15701564
lakeTableOffsetForBucket.hasLogEndOffset()
15711565
? lakeTableOffsetForBucket.getLogEndOffset()
15721566
: null;
1573-
Long logMaxTimestamp =
1574-
lakeTableOffsetForBucket.hasMaxTimestamp()
1575-
? lakeTableOffsetForBucket.getMaxTimestamp()
1576-
: null;
1577-
bucketLogStartOffset.put(tableBucket, logStartOffset);
15781567
bucketLogEndOffset.put(tableBucket, logEndOffset);
1579-
bucketMaxTimestamp.put(tableBucket, logMaxTimestamp);
15801568

1581-
if (lakeTableOffsetForBucket.hasPartitionName()) {
1582-
partitionNameByPartitionId.put(
1583-
partitionId, lakeTableOffsetForBucket.getPartitionName());
1569+
if (lakeTableOffsetForBucket.hasMaxTimestamp()) {
1570+
tableBucketsMaxTimestamp.put(
1571+
tableBucket, lakeTableOffsetForBucket.getMaxTimestamp());
15841572
}
15851573
}
15861574
lakeTableInfoByTableId.put(
1587-
tableId,
1588-
new LakeTableSnapshot(
1589-
snapshotId,
1590-
tableId,
1591-
bucketLogStartOffset,
1592-
bucketLogEndOffset,
1593-
bucketMaxTimestamp,
1594-
partitionNameByPartitionId));
1575+
tableId, new LakeTableSnapshot(snapshotId, tableId, bucketLogEndOffset));
15951576
}
1596-
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId);
1577+
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
15971578
}
15981579

15991580
public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(
1600-
TableBucket tableBucket, LakeTableSnapshot lakeTableSnapshot) {
1581+
TableBucket tableBucket,
1582+
LakeTableSnapshot lakeTableSnapshot,
1583+
@Nullable Long maxTimestamp) {
16011584
PbNotifyLakeTableOffsetReqForBucket reqForBucket =
16021585
new PbNotifyLakeTableOffsetReqForBucket();
16031586
if (tableBucket.getPartitionId() != null) {
@@ -1608,11 +1591,11 @@ public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBu
16081591
.setBucketId(tableBucket.getBucket())
16091592
.setSnapshotId(lakeTableSnapshot.getSnapshotId());
16101593

1611-
lakeTableSnapshot.getLogStartOffset(tableBucket).ifPresent(reqForBucket::setLogStartOffset);
1612-
16131594
lakeTableSnapshot.getLogEndOffset(tableBucket).ifPresent(reqForBucket::setLogEndOffset);
16141595

1615-
lakeTableSnapshot.getMaxTimestamp(tableBucket).ifPresent(reqForBucket::setMaxTimestamp);
1596+
if (maxTimestamp != null) {
1597+
reqForBucket.setMaxTimestamp(maxTimestamp);
1598+
}
16161599

16171600
return reqForBucket;
16181601
}
@@ -1662,13 +1645,6 @@ public static GetLatestLakeSnapshotResponse makeGetLatestLakeSnapshotResponse(
16621645
.setLogOffset(logEndLogOffsetEntry.getValue());
16631646
if (tableBucket.getPartitionId() != null) {
16641647
pbLakeSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
1665-
String partitionName =
1666-
lakeTableSnapshot
1667-
.getPartitionNameIdByPartitionId()
1668-
.get(tableBucket.getPartitionId());
1669-
if (partitionName != null) {
1670-
pbLakeSnapshotForBucket.setPartitionName(partitionName);
1671-
}
16721648
}
16731649
}
16741650

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,15 @@ private LakeTableSnapshot mergeLakeTable(
9393
// may not carry all buckets for the table. It typically only carries buckets
9494
// that were written after the previous commit.
9595

96-
// merge log startup offset, current will override the previous
97-
Map<TableBucket, Long> bucketLogStartOffset =
98-
new HashMap<>(previousLakeTableSnapshot.getBucketLogStartOffset());
99-
bucketLogStartOffset.putAll(newLakeTableSnapshot.getBucketLogStartOffset());
100-
10196
// merge log end offsets, current will override the previous
10297
Map<TableBucket, Long> bucketLogEndOffset =
10398
new HashMap<>(previousLakeTableSnapshot.getBucketLogEndOffset());
10499
bucketLogEndOffset.putAll(newLakeTableSnapshot.getBucketLogEndOffset());
105100

106-
// merge max timestamp, current will override the previous
107-
Map<TableBucket, Long> bucketMaxTimestamp =
108-
new HashMap<>(previousLakeTableSnapshot.getBucketMaxTimestamp());
109-
bucketMaxTimestamp.putAll(newLakeTableSnapshot.getBucketMaxTimestamp());
110-
111-
Map<Long, String> partitionNameById =
112-
new HashMap<>(previousLakeTableSnapshot.getPartitionNameIdByPartitionId());
113-
partitionNameById.putAll(newLakeTableSnapshot.getPartitionNameIdByPartitionId());
114-
115101
return new LakeTableSnapshot(
116102
newLakeTableSnapshot.getSnapshotId(),
117103
newLakeTableSnapshot.getTableId(),
118-
bucketLogStartOffset,
119-
bucketLogEndOffset,
120-
bucketMaxTimestamp,
121-
partitionNameById);
104+
bucketLogEndOffset);
122105
}
123106

124107
private FsPath storeLakeTableSnapshot(

0 commit comments

Comments
 (0)