Skip to content

Commit ec14e1b

Browse files
authored
[hotfix] Tiering Source Enumerator should clear pending split when failover (apache#1820)
1 parent e455223 commit ec14e1b

File tree

5 files changed

+75
-9
lines changed

5 files changed

+75
-9
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
2626
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2727
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
28-
import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
28+
import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
2929
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
3030
import org.apache.fluss.flink.tiering.source.TieringSource;
3131
import org.apache.fluss.lake.committer.BucketOffset;
@@ -129,10 +129,10 @@ public void setup(
129129
super.setup(containingTask, config, output);
130130
int attemptNumber = getRuntimeContext().getAttemptNumber();
131131
if (attemptNumber > 0) {
132-
LOG.info("Send TieringRestoreEvent");
132+
LOG.info("Send TieringFailoverEvent, current attempt number: {}", attemptNumber);
133133
// an attempt number greater than zero means the job has failed over
134134
operatorEventGateway.sendEventToCoordinator(
135-
new SourceEventWrapper(new TieringRestoreEvent()));
135+
new SourceEventWrapper(new TieringFailOverEvent()));
136136
}
137137
}
138138

@@ -175,6 +175,9 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
175175
new SourceEventWrapper(
176176
new FailedTieringEvent(
177177
tableId, ExceptionUtils.stringifyException(e))));
178+
LOG.warn(
179+
"Fail to commit tiering write result, will try to tier again in next round.",
180+
e);
178181
} finally {
179182
collectedTableBucketWriteResults.remove(tableId);
180183
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import org.apache.flink.api.connector.source.SourceEvent;
2222

23-
/** SourceEvent used to represent tiering is restoring. */
24-
public class TieringRestoreEvent implements SourceEvent {
23+
/** SourceEvent used to signal that the tiering job has failed over. */
24+
public class TieringFailOverEvent implements SourceEvent {
2525
private static final long serialVersionUID = 1L;
2626
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
2727
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2828
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
29-
import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
29+
import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
3030
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
3131
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
3232
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -218,13 +218,15 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
218218
}
219219
}
220220

221-
if (sourceEvent instanceof TieringRestoreEvent) {
221+
if (sourceEvent instanceof TieringFailOverEvent) {
222222
LOG.info(
223-
"Receiving tiering restore event, mark current tiering table epoch {} as failed.",
223+
"Receiving tiering failover event, mark current tiering table epoch {} as failed.",
224224
tieringTableEpochs);
225225
// we need to mark all currently tiering tables as failed
226226
failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
227227
tieringTableEpochs.clear();
228+
// also clear all pending splits, since every tiering table was just marked as failed
229+
pendingSplits.clear();
228230
}
229231

230232
if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2222
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
23+
import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
2324
import org.apache.fluss.flink.tiering.source.TieringTestBase;
2425
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
2526
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
@@ -644,6 +645,48 @@ void testHandleFailedTieringTableEvent() throws Throwable {
644645
}
645646
}
646647

648+
@Test
649+
void testHandleFailOverEvent() throws Throwable {
650+
TablePath tablePath1 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table1");
651+
createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
652+
653+
TablePath tablePath2 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table2");
654+
createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
655+
656+
int numSubtasks = 1;
657+
try (MockSplitEnumeratorContext<TieringSplit> context =
658+
new MockSplitEnumeratorContext<>(numSubtasks)) {
659+
TieringSourceEnumerator enumerator =
660+
new TieringSourceEnumerator(flussConf, context, 500);
661+
662+
enumerator.start();
663+
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
664+
665+
// register one reader
666+
int subtaskId = 0;
667+
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
668+
669+
// handle split request
670+
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
671+
672+
// should get one tiering split, and the split is for tablePath1
673+
verifyTieringSplitAssignment(context, 1, tablePath1);
674+
675+
// clean assignment
676+
context.getSplitsAssignmentSequence().clear();
677+
678+
// the enumerator handles a TieringFailOverEvent, which marks the currently tiering
679+
// tablePath1 as failed and clears all pending splits
680+
enumerator.handleSourceEvent(subtaskId, new TieringFailOverEvent());
681+
682+
// handle split request
683+
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
684+
// now we should get one more tiering split; it is for tablePath2 because all
685+
// pending splits for tablePath1 have been cleared
686+
verifyTieringSplitAssignment(context, 1, tablePath2);
687+
}
688+
}
689+
647690
private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest(
648691
long tableId,
649692
@Nullable Long partitionId,
@@ -697,4 +740,23 @@ private static List<TieringSplit> sortSplits(List<TieringSplit> splits) {
697740
.sorted(Comparator.comparing(Object::toString))
698741
.collect(Collectors.toList());
699742
}
743+
744+
private void verifyTieringSplitAssignment(
745+
MockSplitEnumeratorContext<TieringSplit> context,
746+
int expectedSplitSize,
747+
TablePath expectedTablePath)
748+
throws Throwable {
749+
waitUntilTieringTableSplitAssignmentReady(context, 1, 200);
750+
List<SplitsAssignment<TieringSplit>> actualAssignment =
751+
context.getSplitsAssignmentSequence();
752+
753+
List<TieringSplit> allTieringSplits =
754+
actualAssignment.stream()
755+
.flatMap(assignments -> assignments.assignment().values().stream())
756+
.flatMap(List::stream)
757+
.collect(Collectors.toList());
758+
assertThat(allTieringSplits).hasSize(expectedSplitSize);
759+
assertThat(allTieringSplits)
760+
.allMatch(tieringSplit -> tieringSplit.getTablePath().equals(expectedTablePath));
761+
}
700762
}

fluss-test-coverage/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,6 @@
395395
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
396396
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
397397
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
398-
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
399398
<exclude>
400399
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
401400
</exclude>

0 commit comments

Comments
 (0)