3131import com .alibaba .fluss .connector .flink .source .split .LogSplit ;
3232import com .alibaba .fluss .connector .flink .source .split .SnapshotSplit ;
3333import com .alibaba .fluss .connector .flink .source .split .SourceSplitBase ;
34+ import com .alibaba .fluss .exception .PartitionNotExistException ;
3435import com .alibaba .fluss .metadata .TableBucket ;
3536import com .alibaba .fluss .metadata .TablePath ;
3637import com .alibaba .fluss .types .RowType ;
3738import com .alibaba .fluss .utils .CloseableIterator ;
39+ import com .alibaba .fluss .utils .ExceptionUtils ;
3840
3941import org .apache .flink .connector .base .source .reader .RecordsWithSplitIds ;
4042import org .apache .flink .connector .base .source .reader .splitreader .SplitReader ;
@@ -103,6 +105,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
103105 private LakeSplitReaderGenerator lakeSplitReaderGenerator ;
104106
105107 private final Set <String > emptyLogSplits ;
108+ // track split IDs corresponding to removed partitions
109+ private final Set <String > removedSplits = new HashSet <>();
106110
107111 public FlinkSourceSplitReader (
108112 Configuration flussConf ,
@@ -128,6 +132,13 @@ public FlinkSourceSplitReader(
128132
129133 @ Override
130134 public RecordsWithSplitIds <RecordAndPos > fetch () throws IOException {
135+ if (!removedSplits .isEmpty ()) {
136+ FlinkRecordsWithSplitIds records =
137+ new FlinkRecordsWithSplitIds (
138+ new HashSet <>(removedSplits ), flinkSourceReaderMetrics );
139+ removedSplits .clear ();
140+ return records ;
141+ }
131142 checkSnapshotSplitOrStartNext ();
132143 if (currentBoundedSplitReader != null ) {
133144 CloseableIterator <RecordAndPos > recordIterator = currentBoundedSplitReader .readBatch ();
@@ -242,37 +253,78 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
242253 Long partitionId = tableBucket .getPartitionId ();
243254 int bucket = tableBucket .getBucket ();
244255 if (partitionId != null ) {
245- logScanner .subscribe (partitionId , bucket , startingOffset );
256+ // Try to subscribe using the partition id.
257+ try {
258+ logScanner .subscribe (partitionId , bucket , startingOffset );
259+ } catch (Exception e ) {
260+ // the PartitionNotExistException may still happen when the partition is removed
261+ // but the Flink source reader fails over before becoming aware of it.
262+ // Traverse the exception chain to check for PartitionNotExistException.
263+ boolean partitionNotExist =
264+ ExceptionUtils .findThrowable (e , PartitionNotExistException .class )
265+ .isPresent ();
266+ if (partitionNotExist ) {
267+ // mark the split of the non-existent partition for removal
268+ removedSplits .add (split .splitId ());
269+ LOG .warn (
270+ "Partition {} does not exist when subscribing to log for split {}. Skipping subscription." ,
271+ partitionId ,
272+ split .splitId ());
273+ return ;
274+ }
275+ }
246276 } else {
277+ // If no partition id, subscribe by bucket only.
247278 logScanner .subscribe (bucket , startingOffset );
248279 }
249280
250281 LOG .info (
251282 "Subscribe to read log for split {} from offset {}." ,
252283 split .splitId (),
253284 startingOffset );
254-
255- // Track the new bucket in metrics
285+ // Track the new bucket in metrics and internal state.
256286 flinkSourceReaderMetrics .registerTableBucket (tableBucket );
257287 subscribedBuckets .put (tableBucket , split .splitId ());
258288 }
259289 }
260290
261291 public Set <TableBucket > removePartitions (Map <Long , String > removedPartitions ) {
262- // todo, may consider to close the current snapshot reader if
263- // the current snapshot split is in the partition buckets
264-
265- // may remove from pending snapshot splits
292+ // Set to collect table buckets that are unsubscribed.
266293 Set <TableBucket > unsubscribedTableBuckets = new HashSet <>();
294+ // First, if the current active bounded split belongs to a removed partition,
295+ // finish it so it will not be restored.
296+ if (currentBoundedSplit != null ) {
297+ TableBucket currentBucket = currentBoundedSplit .getTableBucket ();
298+ if (removedPartitions .containsKey (currentBucket .getPartitionId ())) {
299+ try {
300+ // Mark the current split as finished.
301+ removedSplits .add (currentBoundedSplit .splitId ());
302+ closeCurrentBoundedSplit ();
303+ unsubscribedTableBuckets .add (currentBucket );
304+ LOG .info (
305+ "Mark current bounded split {} as finished for removed partition {}." ,
306+ currentBucket ,
307+ removedPartitions .get (currentBucket .getPartitionId ()));
308+ } catch (IOException e ) {
309+ LOG .warn (
310+ "Failed to close current bounded split for removed partition {}." ,
311+ removedPartitions .get (currentBucket .getPartitionId ()),
312+ e );
313+ }
314+ }
315+ }
316+
317+ // Remove pending snapshot splits whose table buckets belong to removed partitions.
267318 Iterator <SourceSplitBase > snapshotSplitIterator = boundedSplits .iterator ();
268319 while (snapshotSplitIterator .hasNext ()) {
269320 SourceSplitBase split = snapshotSplitIterator .next ();
270321 TableBucket tableBucket = split .getTableBucket ();
271322 if (removedPartitions .containsKey (tableBucket .getPartitionId ())) {
323+ removedSplits .add (split .splitId ());
272324 snapshotSplitIterator .remove ();
273325 unsubscribedTableBuckets .add (tableBucket );
274326 LOG .info (
275- "Cancel to read snapshot split {} for non-existed partition {}." ,
327+ "Cancel reading snapshot split {} for removed partition {}." ,
276328 split .splitId (),
277329 removedPartitions .get (tableBucket .getPartitionId ()));
278330 }
@@ -289,6 +341,7 @@ public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
289341 logScanner .unsubscribe (
290342 checkNotNull (tableBucket .getPartitionId (), "partition id must be not null" ),
291343 tableBucket .getBucket ());
344+ removedSplits .add (tableBucketAndSplit .getValue ());
292345 subscribeTableBucketIterator .remove ();
293346 unsubscribedTableBuckets .add (tableBucket );
294347 LOG .info (
@@ -438,7 +491,7 @@ private long getStoppingOffset(TableBucket tableBucket) {
438491 return stoppingOffsets .getOrDefault (tableBucket , Long .MAX_VALUE );
439492 }
440493
441- public FlinkRecordsWithSplitIds finishCurrentBoundedSplit () throws IOException {
494+ private FlinkRecordsWithSplitIds finishCurrentBoundedSplit () throws IOException {
442495 Set <String > finishedSplits =
443496 currentBoundedSplit instanceof HybridSnapshotLogSplit
444497 // is hybrid split, not to finish this split
0 commit comments