Skip to content

Commit 4c2ddc3

Browse files
[flink] Add restore test for streaming union read pk table (#1674)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent 409a955 commit 4c2ddc3

File tree

8 files changed

+271
-45
lines changed

8 files changed

+271
-45
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ public void emitRecord(
5757
}
5858

5959
ScanRecord scanRecord = recordAndPos.record();
60-
// todo: may need a state to mark snapshot phase is finished
61-
// just like what we do for HybridSnapshotLogSplitState
6260
if (scanRecord.logOffset() >= 0) {
6361
// record is with a valid offset, means it's in incremental phase,
6462
// update the log offset

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits
5555
if (split instanceof LakeSnapshotSplit) {
5656
boundedSplits.add(split);
5757
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
58-
boundedSplits.add(split);
58+
// lake split not finished, add to it
59+
if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished()) {
60+
boundedSplits.add(split);
61+
}
5962
} else {
6063
throw new UnsupportedOperationException(
6164
String.format("The split type of %s is not supported.", split.getClass()));

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IO
7878
.orElse(LogSplit.NO_STOPPING_OFFSET));
7979
out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
8080
out.writeInt(lakeSnapshotAndFlussLogSplit.getCurrentLakeSplitIndex());
81+
out.writeBoolean(lakeSnapshotAndFlussLogSplit.isLakeSplitFinished());
8182
} else {
8283
throw new UnsupportedOperationException(
8384
"Unsupported split type: " + split.getClass().getName());
@@ -115,14 +116,16 @@ public SourceSplitBase deserialize(
115116
long stoppingOffset = input.readLong();
116117
long recordsToSkip = input.readLong();
117118
int splitIndex = input.readInt();
119+
boolean isLakeSplitFinished = input.readBoolean();
118120
return new LakeSnapshotAndFlussLogSplit(
119121
tableBucket,
120122
partition,
121123
lakeSplits,
122124
startingOffset,
123125
stoppingOffset,
124126
recordsToSkip,
125-
splitIndex);
127+
splitIndex,
128+
isLakeSplitFinished);
126129
} else {
127130
throw new UnsupportedOperationException("Unsupported split kind: " + splitKind);
128131
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
4040
* lake split via this lake split index.
4141
*/
4242
private int currentLakeSplitIndex;
43-
/** The records to skip when reading a split. */
44-
private long recordOffset;
43+
/** The records to skip when reading a lake split. */
44+
private long recordToSkip;
45+
46+
/** Whether the lake split has been finished. */
47+
private boolean isLakeSplitFinished;
4548

4649
private long startingOffset;
4750
private final long stoppingOffset;
@@ -52,7 +55,16 @@ public LakeSnapshotAndFlussLogSplit(
5255
@Nullable List<LakeSplit> snapshotSplits,
5356
long startingOffset,
5457
long stoppingOffset) {
55-
this(tableBucket, partitionName, snapshotSplits, startingOffset, stoppingOffset, 0, 0);
58+
this(
59+
tableBucket,
60+
partitionName,
61+
snapshotSplits,
62+
startingOffset,
63+
stoppingOffset,
64+
0,
65+
0,
66+
// if lake splits is null, no lake splits, also means LakeSplitFinished
67+
snapshotSplits == null);
5668
}
5769

5870
public LakeSnapshotAndFlussLogSplit(
@@ -62,17 +74,19 @@ public LakeSnapshotAndFlussLogSplit(
6274
long startingOffset,
6375
long stoppingOffset,
6476
long recordsToSkip,
65-
int currentLakeSplitIndex) {
77+
int currentLakeSplitIndex,
78+
boolean isLakeSplitFinished) {
6679
super(tableBucket, partitionName);
6780
this.lakeSnapshotSplits = snapshotSplits;
6881
this.startingOffset = startingOffset;
6982
this.stoppingOffset = stoppingOffset;
70-
this.recordOffset = recordsToSkip;
83+
this.recordToSkip = recordsToSkip;
7184
this.currentLakeSplitIndex = currentLakeSplitIndex;
85+
this.isLakeSplitFinished = isLakeSplitFinished;
7286
}
7387

7488
public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long recordsToSkip) {
75-
this.recordOffset = recordsToSkip;
89+
this.recordToSkip = recordsToSkip;
7690
return this;
7791
}
7892

@@ -86,14 +100,23 @@ public LakeSnapshotAndFlussLogSplit updateWithStartingOffset(long startingOffset
86100
return this;
87101
}
88102

103+
public LakeSnapshotAndFlussLogSplit updateWithLakeSplitFinished(boolean isLakeSplitFinished) {
104+
this.isLakeSplitFinished = isLakeSplitFinished;
105+
return this;
106+
}
107+
89108
public long getRecordsToSkip() {
90-
return recordOffset;
109+
return recordToSkip;
91110
}
92111

93112
public long getStartingOffset() {
94113
return startingOffset;
95114
}
96115

116+
public boolean isLakeSplitFinished() {
117+
return isLakeSplitFinished;
118+
}
119+
97120
public Optional<Long> getStoppingOffset() {
98121
return stoppingOffset >= 0 ? Optional.of(stoppingOffset) : Optional.empty();
99122
}
@@ -130,8 +153,8 @@ public String toString() {
130153
return "LakeSnapshotAndFlussLogSplit{"
131154
+ "lakeSnapshotSplits="
132155
+ lakeSnapshotSplits
133-
+ ", recordOffset="
134-
+ recordOffset
156+
+ ", recordToSkip="
157+
+ recordToSkip
135158
+ ", currentLakeSplitIndex="
136159
+ currentLakeSplitIndex
137160
+ ", startingOffset="

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@ public class LakeSnapshotAndFlussLogSplitState extends SourceSplitState {
3030
private int currentLakeSplitIndex;
3131
private long nextLogOffset;
3232

33+
private boolean isLakeSplitFinished;
34+
3335
public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit split) {
3436
super(split);
3537
this.recordsToSkip = split.getRecordsToSkip();
3638
this.split = split;
3739
this.currentLakeSplitIndex = split.getCurrentLakeSplitIndex();
3840
this.nextLogOffset = split.getStartingOffset();
41+
this.isLakeSplitFinished = split.isLakeSplitFinished();
3942
}
4043

4144
public void setRecordsToSkip(long recordsToSkip) {
@@ -47,13 +50,16 @@ public void setCurrentLakeSplitIndex(int currentLakeSplitIndex) {
4750
}
4851

4952
public void setNextLogOffset(long nextOffset) {
53+
// if set offset, means lake splits is finished
54+
isLakeSplitFinished = true;
5055
this.nextLogOffset = nextOffset;
5156
}
5257

5358
@Override
5459
public SourceSplitBase toSourceSplit() {
5560
return split.updateWithCurrentLakeSplitIndex(currentLakeSplitIndex)
5661
.updateWithRecordsToSkip(recordsToSkip)
57-
.updateWithStartingOffset(nextLogOffset);
62+
.updateWithStartingOffset(nextLogOffset)
63+
.updateWithLakeSplitFinished(isLakeSplitFinished);
5864
}
5965
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@
4444

4545
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
4646
import org.apache.flink.runtime.source.event.SourceEventWrapper;
47-
import org.apache.flink.runtime.state.StateInitializationContext;
47+
import org.apache.flink.streaming.api.graph.StreamConfig;
4848
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
4949
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
50+
import org.apache.flink.streaming.api.operators.Output;
5051
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
5152
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
53+
import org.apache.flink.streaming.runtime.tasks.StreamTask;
5254

5355
import javax.annotation.Nullable;
5456

@@ -109,34 +111,38 @@ public TieringCommitOperator(
109111
this.flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(flussConf);
110112
this.collectedTableBucketWriteResults = new HashMap<>();
111113
this.flussConfig = flussConf;
114+
this.operatorEventGateway =
115+
parameters
116+
.getOperatorEventDispatcher()
117+
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
112118
this.setup(
113119
parameters.getContainingTask(),
114120
parameters.getStreamConfig(),
115121
parameters.getOutput());
116-
operatorEventGateway =
117-
parameters
118-
.getOperatorEventDispatcher()
119-
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
120-
}
121-
122-
@Override
123-
public void open() {
124-
flussTableLakeSnapshotCommitter.open();
125-
connection = ConnectionFactory.createConnection(flussConfig);
126-
admin = connection.getAdmin();
127122
}
128123

129124
@Override
130-
public void initializeState(StateInitializationContext context) throws Exception {
131-
super.initializeState(context);
125+
public void setup(
126+
StreamTask<?, ?> containingTask,
127+
StreamConfig config,
128+
Output<StreamRecord<CommittableMessage<Committable>>> output) {
129+
super.setup(containingTask, config, output);
132130
int attemptNumber = getRuntimeContext().getAttemptNumber();
133131
if (attemptNumber > 0) {
132+
LOG.info("Send TieringRestoreEvent");
134133
// attempt number is greater than zero, the job must failover
135134
operatorEventGateway.sendEventToCoordinator(
136135
new SourceEventWrapper(new TieringRestoreEvent()));
137136
}
138137
}
139138

139+
@Override
140+
public void open() {
141+
flussTableLakeSnapshotCommitter.open();
142+
connection = ConnectionFactory.createConnection(flussConfig);
143+
admin = connection.getAdmin();
144+
}
145+
140146
@Override
141147
public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
142148
throws Exception {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ void testSerializeAndDeserializeLakeSnapshotAndFlussLogSplit() throws IOExceptio
9090
"2025-08-18",
9191
Collections.singletonList(LAKE_SPLIT),
9292
EARLIEST_OFFSET,
93-
STOPPING_OFFSET);
93+
STOPPING_OFFSET,
94+
2,
95+
1,
96+
true);
9497

9598
DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
9699
serializer.serialize(output, originalSplit);
@@ -105,11 +108,14 @@ void testSerializeAndDeserializeLakeSnapshotAndFlussLogSplit() throws IOExceptio
105108
assertThat(deserializedSplit instanceof LakeSnapshotAndFlussLogSplit).isTrue();
106109
LakeSnapshotAndFlussLogSplit result = (LakeSnapshotAndFlussLogSplit) deserializedSplit;
107110

108-
assertThat(tableBucket).isEqualTo(result.getTableBucket());
109-
assertThat("2025-08-18").isEqualTo(result.getPartitionName());
110-
assertThat(Collections.singletonList(LAKE_SPLIT)).isEqualTo(result.getLakeSplits());
111-
assertThat(EARLIEST_OFFSET).isEqualTo(result.getStartingOffset());
112-
assertThat((long) STOPPING_OFFSET).isEqualTo(result.getStoppingOffset().get());
111+
assertThat(result.getTableBucket()).isEqualTo(tableBucket);
112+
assertThat(result.getPartitionName()).isEqualTo("2025-08-18");
113+
assertThat(result.getLakeSplits()).isEqualTo(Collections.singletonList(LAKE_SPLIT));
114+
assertThat(result.getStartingOffset()).isEqualTo(EARLIEST_OFFSET);
115+
assertThat(result.getStoppingOffset().get()).isEqualTo(STOPPING_OFFSET);
116+
assertThat(result.getCurrentLakeSplitIndex()).isEqualTo(1);
117+
assertThat(result.getRecordsToSkip()).isEqualTo(2);
118+
assertThat(result.isLakeSplitFinished()).isEqualTo(true);
113119
}
114120

115121
@Test
@@ -149,8 +155,8 @@ public int getVersion() {
149155

150156
private static class TestLakeSplit implements LakeSplit {
151157

152-
private int bucket;
153-
private List<String> partition;
158+
private final int bucket;
159+
private final List<String> partition;
154160

155161
public TestLakeSplit(int bucket, List<String> partition) {
156162
this.bucket = bucket;

0 commit comments

Comments (0)