Skip to content

Commit c5ca56b

Browse files
[bugfix] FlinkSourceSplitReader sends PartitionBucketsUnsubscribedEvent to FlinkSourceEnumerator when subscribing to a removed partition (#1220) (#1248)
Co-authored-by: ocean.wy <[email protected]>
1 parent 0a8622b commit c5ca56b

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
108108
private final Set<String> emptyLogSplits;
109109
// track split IDs corresponding to removed partitions
110110
private final Set<String> removedSplits = new HashSet<>();
111+
// Set to collect table buckets that are unsubscribed.
112+
private Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
111113

112114
public FlinkSourceSplitReader(
113115
Configuration flussConf,
@@ -267,6 +269,8 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
267269
if (partitionNotExist) {
268270
// mark the not exist partition to be removed
269271
removedSplits.add(split.splitId());
272+
// mark the table bucket to be unsubscribed
273+
unsubscribedTableBuckets.add(tableBucket);
270274
LOG.warn(
271275
"Partition {} does not exist when subscribing to log for split {}. Skipping subscription.",
272276
partitionId,
@@ -290,8 +294,6 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
290294
}
291295

292296
public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
293-
// Set to collect table buckets that are unsubscribed.
294-
Set<TableBucket> unsubscribedTableBuckets = new HashSet<>();
295297
// First, if the current active bounded split belongs to a removed partition,
296298
// finish it so it will not be restored.
297299
if (currentBoundedSplit != null) {
@@ -352,7 +354,9 @@ public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
352354
}
353355
}
354356

355-
return unsubscribedTableBuckets;
357+
Set<TableBucket> currentUnsubscribedTableBuckets = this.unsubscribedTableBuckets;
358+
this.unsubscribedTableBuckets = new HashSet<>();
359+
return currentUnsubscribedTableBuckets;
356360
}
357361

358362
private void checkSnapshotSplitOrStartNext() {

0 commit comments

Comments
 (0)