Skip to content

Commit 5442462

Browse files
committed
resolve conflict
1 parent 968f096 commit 5442462

File tree

16 files changed

+150
-140
lines changed

16 files changed

+150
-140
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.fluss.lake.committer;
1919

20-
import java.util.HashMap;
2120
import java.util.Map;
2221
import java.util.Objects;
2322

@@ -30,10 +29,11 @@ public class CommittedLakeSnapshot {
3029
private final long lakeSnapshotId;
3130
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
3231
// partition bucket
33-
private final Map<String, String> lakeSnapshotProperties = new HashMap<>();
32+
private final Map<String, String> lakeSnapshotProperties;
3433

3534
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> lakeSnapshotProperties) {
3635
this.lakeSnapshotId = lakeSnapshotId;
36+
this.lakeSnapshotProperties = lakeSnapshotProperties;
3737
}
3838

3939
public long getLakeSnapshotId() {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
9393
// get the prepare commit lake resp for table
9494
if (prepareCommitResp.hasError()) {
9595
throw new IOException(
96-
"Fail to prepare commit table lake snapshot.",
96+
"Fail to prepare commit table lake snapshot for %s to Fluss.",
9797
ApiError.fromErrorMessage(prepareCommitResp.getError()).exception());
9898
}
9999
return prepareCommitResp.getLakeTableSnapshotFilePath();
@@ -120,6 +120,9 @@ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRe
120120
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
121121
prepareCommitLakeTableSnapshotRequest.addTablesReq();
122122
pbLakeTableSnapshotInfo.setTableId(tableId);
123+
124+
// in prepare phase, we don't know the snapshot id,
125+
// set -1 since the field is required
123126
pbLakeTableSnapshotInfo.setSnapshotId(-1L);
124127
for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
125128
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.fluss.lake.writer.LakeTieringFactory;
3434
import org.apache.fluss.lake.writer.LakeWriter;
3535
import org.apache.fluss.metadata.TableBucket;
36-
import org.apache.fluss.metadata.TableInfo;
3736
import org.apache.fluss.metadata.TablePath;
3837
import org.apache.fluss.utils.ExceptionUtils;
3938

@@ -51,7 +50,6 @@
5150

5251
import java.util.ArrayList;
5352
import java.util.Collections;
54-
import java.util.Collections;
5553
import java.util.HashMap;
5654
import java.util.HashSet;
5755
import java.util.List;
@@ -202,7 +200,9 @@ private Committable commitWriteResults(
202200
}
203201
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
204202
lakeTieringFactory.createLakeCommitter(
205-
new TieringCommitterInitContext(tablePath, admin.getTableInfo(tablePath).get(),
203+
new TieringCommitterInitContext(
204+
tablePath,
205+
admin.getTableInfo(tablePath).get(),
206206
lakeTieringConfig))) {
207207
List<WriteResult> writeResults =
208208
committableWriteResults.stream()
@@ -215,13 +215,6 @@ private Committable commitWriteResults(
215215
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
216216
}
217217

218-
String lakeSnapshotFile =
219-
flussTableLakeSnapshotCommitter.prepareCommit(
220-
tableId, tablePath, logEndOffsets);
221-
222-
Map<String, String> logOffsetsProperty =
223-
Collections.singletonMap(
224-
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, lakeSnapshotFile);
225218
// to committable
226219
Committable committable = lakeCommitter.toCommittable(writeResults);
227220
// before commit to lake, check fluss not missing any lake snapshot committed by fluss
@@ -234,8 +227,21 @@ private Committable commitWriteResults(
234227
flussCurrentLakeSnapshot == null
235228
? null
236229
: flussCurrentLakeSnapshot.getSnapshotId());
237-
long committedSnapshotId = lakeCommitter.commit(committable, logOffsetsProperty);
238-
flussTableLakeSnapshotCommitter.commit(tableId, committedSnapshotId, lakeSnapshotFile);
230+
231+
// get the lake snapshot file storing the log end offsets
232+
String lakeSnapshotMetadataFile =
233+
flussTableLakeSnapshotCommitter.prepareCommit(
234+
tableId, tablePath, logEndOffsets);
235+
236+
// record the lake snapshot metadata file to snapshot property
237+
long committedSnapshotId =
238+
lakeCommitter.commit(
239+
committable,
240+
Collections.singletonMap(
241+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
242+
lakeSnapshotMetadataFile));
243+
flussTableLakeSnapshotCommitter.commit(
244+
tableId, committedSnapshotId, lakeSnapshotMetadataFile);
239245
return committable;
240246
}
241247
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.lake.writer.LakeTieringFactory;
2727
import org.apache.fluss.lake.writer.LakeWriter;
2828
import org.apache.fluss.lake.writer.WriterInitContext;
29+
import org.apache.fluss.metadata.TablePath;
2930
import org.apache.fluss.record.LogRecord;
3031

3132
import javax.annotation.Nullable;
@@ -52,7 +53,7 @@ public TestingLakeTieringFactory() {
5253
@Override
5354
public LakeWriter<TestingWriteResult> createLakeWriter(WriterInitContext writerInitContext)
5455
throws IOException {
55-
return new TestingLakeWriter();
56+
return new TestingLakeWriter(writerInitContext.tablePath());
5657
}
5758

5859
@Override
@@ -77,8 +78,13 @@ public SimpleVersionedSerializer<TestingCommittable> getCommittableSerializer()
7778

7879
private static final class TestingLakeWriter implements LakeWriter<TestingWriteResult> {
7980

81+
private final TablePath tablePath;
8082
private int writtenRecords;
8183

84+
private TestingLakeWriter(TablePath tablePath) {
85+
this.tablePath = tablePath;
86+
}
87+
8288
@Override
8389
public void write(LogRecord record) throws IOException {
8490
writtenRecords += 1;
@@ -99,16 +105,14 @@ public static final class TestingLakeCommitter
99105

100106
private long currentSnapshot;
101107

102-
@Nullable private final CommittedLakeSnapshot mockCommittedSnapshot;
108+
private @Nullable CommittedLakeSnapshot missingCommittedLakeSnapshot;
103109

104110
public TestingLakeCommitter() {
105111
this(null);
106112
}
107113

108-
public TestingLakeCommitter(@Nullable CommittedLakeSnapshot mockCommittedSnapshot) {
109-
this.mockCommittedSnapshot = mockCommittedSnapshot;
110-
this.currentSnapshot =
111-
mockCommittedSnapshot == null ? 0 : mockCommittedSnapshot.getLakeSnapshotId();
114+
public TestingLakeCommitter(CommittedLakeSnapshot missingCommittedLakeSnapshot) {
115+
this.missingCommittedLakeSnapshot = missingCommittedLakeSnapshot;
112116
}
113117

114118
@Override
@@ -135,11 +139,10 @@ public void abort(TestingCommittable committable) throws IOException {
135139
@Override
136140
public @Nullable CommittedLakeSnapshot getMissingLakeSnapshot(
137141
@Nullable Long knownSnapshotId) throws IOException {
138-
if (knownSnapshotId == null) {
139-
return mockCommittedSnapshot;
140-
} else {
141-
return null;
142+
if (missingCommittedLakeSnapshot != null && knownSnapshotId == null) {
143+
return missingCommittedLakeSnapshot;
142144
}
145+
return null;
143146
}
144147

145148
@Override

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,12 @@
4949

5050
import javax.annotation.Nullable;
5151

52-
import java.io.IOException;
5352
import java.util.ArrayList;
5453
import java.util.Collections;
5554
import java.util.HashMap;
5655
import java.util.List;
5756
import java.util.Map;
5857

59-
import static org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
60-
import static org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
6158
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
6259
import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
6360
import static org.assertj.core.api.Assertions.assertThat;
@@ -181,7 +178,6 @@ void testCommitPartitionedTable() throws Exception {
181178
Map<String, Long> partitionIdByNames =
182179
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
183180
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
184-
Map<TableBucket, Long> expectedMaxTimestamps = new HashMap<>();
185181
int numberOfWriteResults = 3 * partitionIdByNames.size();
186182
long offset = 0;
187183
long timestamp = System.currentTimeMillis();
@@ -201,7 +197,6 @@ void testCommitPartitionedTable() throws Exception {
201197
currentTimestamp,
202198
numberOfWriteResults));
203199
expectedLogEndOffsets.put(tableBucket, currentOffset);
204-
expectedMaxTimestamps.put(tableBucket, currentTimestamp);
205200
}
206201
if (bucket == 2) {
207202
verifyLakeSnapshot(tablePath, tableId, 1, expectedLogEndOffsets);
@@ -261,10 +256,17 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
261256
long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
262257
int numberOfWriteResults = 3;
263258

264-
CommittedLakeSnapshot mockCommittedSnapshot =
265-
mockCommittedLakeSnapshot(Collections.singletonList(null), tableId, 2);
259+
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
260+
for (int bucket = 0; bucket < 3; bucket++) {
261+
TableBucket tableBucket = new TableBucket(tableId, bucket);
262+
expectedLogEndOffsets.put(tableBucket, 3L);
263+
}
264+
265+
CommittedLakeSnapshot mockMissingCommittedLakeSnapshot =
266+
mockCommittedLakeSnapshot(tableId, tablePath, 0, expectedLogEndOffsets);
266267
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
267-
new TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
268+
new TestingLakeTieringFactory.TestingLakeCommitter(
269+
mockMissingCommittedLakeSnapshot);
268270
committerOperator =
269271
new TieringCommitOperator<>(
270272
parameters,
@@ -283,18 +285,18 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
283285
verifyLakeSnapshot(
284286
tablePath,
285287
tableId,
286-
2,
287-
getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
288+
0,
289+
expectedLogEndOffsets,
288290
String.format(
289291
"The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d},"
290292
+ " missing snapshot: %s.",
291293
null,
292-
mockCommittedSnapshot.getLakeSnapshotId(),
294+
mockMissingCommittedLakeSnapshot.getLakeSnapshotId(),
293295
tablePath,
294296
tableId,
295-
mockCommittedSnapshot));
297+
mockMissingCommittedLakeSnapshot));
296298

297-
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
299+
expectedLogEndOffsets = new HashMap<>();
298300
for (int bucket = 0; bucket < 3; bucket++) {
299301
TableBucket tableBucket = new TableBucket(tableId, bucket);
300302
long offset = bucket * bucket;
@@ -305,7 +307,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
305307
expectedLogEndOffsets.put(tableBucket, offset);
306308
}
307309

308-
verifyLakeSnapshot(tablePath, tableId, 3, expectedLogEndOffsets);
310+
verifyLakeSnapshot(tablePath, tableId, 1, expectedLogEndOffsets);
309311
}
310312

311313
@Test
@@ -318,10 +320,21 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
318320
Map<String, Long> partitionIdByNames =
319321
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
320322

321-
CommittedLakeSnapshot mockCommittedSnapshot =
322-
mockCommittedLakeSnapshot(Collections.singletonList(null), tableId, 3);
323+
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
324+
for (int bucket = 0; bucket < 3; bucket++) {
325+
for (String partitionName : partitionIdByNames.keySet()) {
326+
long partitionId = partitionIdByNames.get(partitionName);
327+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
328+
expectedLogEndOffsets.put(tableBucket, 3L);
329+
}
330+
}
331+
332+
CommittedLakeSnapshot mockMissingCommittedLakeSnapshot =
333+
mockCommittedLakeSnapshot(tableId, tablePath, 0, expectedLogEndOffsets);
334+
323335
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
324-
new TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
336+
new TestingLakeTieringFactory.TestingLakeCommitter(
337+
mockMissingCommittedLakeSnapshot);
325338
committerOperator =
326339
new TieringCommitOperator<>(
327340
parameters,
@@ -351,35 +364,15 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
351364
verifyLakeSnapshot(
352365
tablePath,
353366
tableId,
354-
3,
355-
getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
367+
0,
368+
expectedLogEndOffsets,
356369
String.format(
357370
"The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d}, missing snapshot: %s.",
358371
null,
359-
mockCommittedSnapshot.getLakeSnapshotId(),
372+
mockMissingCommittedLakeSnapshot.getLakeSnapshotId(),
360373
tablePath,
361374
tableId,
362-
mockCommittedSnapshot));
363-
}
364-
365-
private CommittedLakeSnapshot mockCommittedLakeSnapshot(
366-
List<Long> partitions, long tableId, int snapshotId) throws IOException {
367-
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
368-
for (Long partition : partitions) {
369-
for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
370-
logEndOffsets.put(new TableBucket(tableId, partition, bucket), bucket + 1L);
371-
}
372-
}
373-
return new CommittedLakeSnapshot(snapshotId, toBucketOffsetsProperty(logEndOffsets));
374-
}
375-
376-
private Map<TableBucket, Long> getExpectedLogEndOffsets(
377-
long tableId, CommittedLakeSnapshot committedLakeSnapshot) throws IOException {
378-
return fromLogOffsetProperty(
379-
tableId,
380-
committedLakeSnapshot
381-
.getSnapshotProperties()
382-
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
375+
mockMissingCommittedLakeSnapshot));
383376
}
384377

385378
private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
@@ -461,10 +454,25 @@ private void verifyLakeSnapshot(
461454
List<OperatorEvent> operatorEvents = mockOperatorEventGateway.getEventsSent();
462455
SourceEventWrapper sourceEventWrapper =
463456
(SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1);
464-
FailedTieringEvent finishTieringEvent =
457+
FailedTieringEvent failedTieringEvent =
465458
(FailedTieringEvent) sourceEventWrapper.getSourceEvent();
466-
assertThat(finishTieringEvent.getTableId()).isEqualTo(tableId);
467-
assertThat(finishTieringEvent.failReason()).contains(failedReason);
459+
assertThat(failedTieringEvent.getTableId()).isEqualTo(tableId);
460+
assertThat(failedTieringEvent.failReason()).contains(failedReason);
461+
}
462+
463+
private CommittedLakeSnapshot mockCommittedLakeSnapshot(
464+
long tableId, TablePath tablePath, int snapshotId, Map<TableBucket, Long> logEndOffsets)
465+
throws Exception {
466+
try (FlussTableLakeSnapshotCommitter lakeSnapshotCommitter =
467+
new FlussTableLakeSnapshotCommitter(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
468+
lakeSnapshotCommitter.open();
469+
String lakeSnapshotFile =
470+
lakeSnapshotCommitter.prepareCommit(tableId, tablePath, logEndOffsets);
471+
return new CommittedLakeSnapshot(
472+
snapshotId,
473+
Collections.singletonMap(
474+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, lakeSnapshotFile));
475+
}
468476
}
469477

470478
private static class MockOperatorEventDispatcher implements OperatorEventDispatcher {

0 commit comments

Comments
 (0)