Skip to content

Commit 2f99ece

Browse files
committed
Fluss Flink Lookup function return null rather than PartitionNotExistException if partition not exists.
1 parent 97da41b commit 2f99ece

File tree

9 files changed

+50
-14
lines changed

9 files changed

+50
-14
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.bucketing.BucketingFunction;
2121
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2222
import com.alibaba.fluss.client.table.getter.PartitionGetter;
23+
import com.alibaba.fluss.exception.PartitionNotExistException;
2324
import com.alibaba.fluss.metadata.DataLakeFormat;
2425
import com.alibaba.fluss.metadata.TableBucket;
2526
import com.alibaba.fluss.metadata.TableInfo;
@@ -150,9 +151,18 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
150151

151152
Long partitionId = null;
152153
if (partitionGetter != null) {
153-
partitionId =
154-
getPartitionId(
155-
prefixKey, partitionGetter, tableInfo.getTablePath(), metadataUpdater);
154+
try {
155+
partitionId =
156+
getPartitionId(
157+
prefixKey,
158+
partitionGetter,
159+
tableInfo.getTablePath(),
160+
metadataUpdater);
161+
} catch (PartitionNotExistException e) {
162+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
163+
future.completeExceptionally(e);
164+
return future;
165+
}
156166
}
157167

158168
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.bucketing.BucketingFunction;
2121
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2222
import com.alibaba.fluss.client.table.getter.PartitionGetter;
23+
import com.alibaba.fluss.exception.PartitionNotExistException;
2324
import com.alibaba.fluss.metadata.DataLakeFormat;
2425
import com.alibaba.fluss.metadata.TableBucket;
2526
import com.alibaba.fluss.metadata.TableInfo;
@@ -107,14 +108,22 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
107108
bucketKeyEncoder == primaryKeyEncoder
108109
? pkBytes
109110
: bucketKeyEncoder.encodeKey(lookupKey);
110-
Long partitionId =
111-
partitionGetter == null
112-
? null
113-
: getPartitionId(
111+
Long partitionId = null;
112+
if (partitionGetter != null) {
113+
try {
114+
partitionId =
115+
getPartitionId(
114116
lookupKey,
115117
partitionGetter,
116118
tableInfo.getTablePath(),
117119
metadataUpdater);
120+
} catch (PartitionNotExistException e) {
121+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
122+
future.completeExceptionally(e);
123+
return future;
124+
}
125+
}
126+
118127
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
119128
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
120129
return lookupClient

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alibaba.fluss.config.ConfigOptions;
2727
import com.alibaba.fluss.config.Configuration;
2828
import com.alibaba.fluss.exception.FlussRuntimeException;
29+
import com.alibaba.fluss.exception.PartitionNotExistException;
2930
import com.alibaba.fluss.exception.RetriableException;
3031
import com.alibaba.fluss.metadata.PhysicalTablePath;
3132
import com.alibaba.fluss.metadata.TableBucket;
@@ -187,7 +188,8 @@ public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
187188
*
188189
* <p>and update partition metadata .
189190
*/
190-
public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePath) {
191+
public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePath)
192+
throws PartitionNotExistException {
191193
if (!cluster.getPartitionId(physicalTablePath).isPresent()) {
192194
updateMetadata(null, Collections.singleton(physicalTablePath), null);
193195
}
@@ -251,7 +253,8 @@ public void updatePhysicalTableMetadata(Set<PhysicalTablePath> physicalTablePath
251253
protected void updateMetadata(
252254
@Nullable Set<TablePath> tablePaths,
253255
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
254-
@Nullable Collection<Long> tablePartitionIds) {
256+
@Nullable Collection<Long> tablePartitionIds)
257+
throws PartitionNotExistException {
255258
try {
256259
synchronized (this) {
257260
cluster =
@@ -266,6 +269,9 @@ protected void updateMetadata(
266269
Throwable t = ExceptionUtils.stripExecutionException(e);
267270
if (t instanceof RetriableException || t instanceof TimeoutException) {
268271
LOG.warn("Failed to update metadata, but the exception is re-triable.", t);
272+
} else if (t instanceof PartitionNotExistException) {
273+
LOG.warn("Failed to update metadata because partition is not exist", t);
274+
throw (PartitionNotExistException) t;
269275
} else {
270276
throw new FlussRuntimeException("Failed to update metadata", t);
271277
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public static Long getPartitionId(
124124
InternalRow row,
125125
PartitionGetter partitionGetter,
126126
TablePath tablePath,
127-
MetadataUpdater metadataUpdater) {
127+
MetadataUpdater metadataUpdater)
128+
throws PartitionNotExistException {
128129
checkNotNull(partitionGetter, "partitionGetter shouldn't be null.");
129130
String partitionName = partitionGetter.getPartition(row);
130131
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);

fluss-client/src/main/java/com/alibaba/fluss/client/write/DynamicPartitionCreator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private boolean forceCheckPartitionExist(PhysicalTablePath physicalTablePath) {
108108
idExist = metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
109109
} catch (Exception e) {
110110
Throwable t = ExceptionUtils.stripExecutionException(e);
111-
if (t.getCause() instanceof PartitionNotExistException) {
111+
if (t instanceof PartitionNotExistException) {
112112
if (!dynamicPartitionEnabled) {
113113
throw new PartitionNotExistException(
114114
String.format(

fluss-client/src/test/java/com/alibaba/fluss/client/table/AutoPartitionedTableITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,6 @@ void testOperateNotExistPartitionShouldThrowException() throws Exception {
331331
// test scan a not exist partition's log
332332
LogScanner logScanner = table.newScan().createLogScanner();
333333
assertThatThrownBy(() -> logScanner.subscribe(100L, 0, 0))
334-
.cause()
335334
.isInstanceOf(PartitionNotExistException.class)
336335
.hasMessageContaining("Partition not exist for partition ids: [100]");
337336
logScanner.close();

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.client.lookup.Lookuper;
2525
import com.alibaba.fluss.client.table.Table;
2626
import com.alibaba.fluss.config.Configuration;
27+
import com.alibaba.fluss.exception.PartitionNotExistException;
2728
import com.alibaba.fluss.exception.TableNotExistException;
2829
import com.alibaba.fluss.flink.row.FlinkAsFlussRow;
2930
import com.alibaba.fluss.flink.source.lookup.LookupNormalizer.RemainingFilter;
@@ -163,7 +164,10 @@ private void handleLookupFailed(
163164
int currentRetry,
164165
InternalRow keyRow,
165166
@Nullable RemainingFilter remainingFilter) {
166-
if (throwable instanceof TableNotExistException) {
167+
if (throwable instanceof PartitionNotExistException) {
168+
LOG.debug("Partition is not exist. Ignore and return null. ", throwable);
169+
resultFuture.complete(Collections.emptyList());
170+
} else if (throwable instanceof TableNotExistException) {
167171
LOG.error("Table '{}' not found ", tablePath, throwable);
168172
resultFuture.completeExceptionally(
169173
new RuntimeException("Fluss table '" + tablePath + "' not found.", throwable));

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/lookup/FlinkLookupFunction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.client.lookup.Lookuper;
2525
import com.alibaba.fluss.client.table.Table;
2626
import com.alibaba.fluss.config.Configuration;
27+
import com.alibaba.fluss.exception.PartitionNotExistException;
2728
import com.alibaba.fluss.flink.row.FlinkAsFlussRow;
2829
import com.alibaba.fluss.flink.utils.FlinkConversions;
2930
import com.alibaba.fluss.flink.utils.FlinkUtils;
@@ -141,6 +142,11 @@ public Collection<RowData> lookup(RowData keyRow) {
141142
}
142143
}
143144
return projectedRows;
145+
} catch (PartitionNotExistException partitionNotExistException) {
146+
LOG.debug(
147+
"Partition is not exist. Ignore and return null. ",
148+
partitionNotExistException);
149+
return Collections.emptyList();
144150
} catch (Exception e) {
145151
LOG.error(String.format("Fluss lookup error, retry times = %d", retry), e);
146152
if (retry >= maxRetryTimes) {

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,8 @@ private String prepareDimTableAndSourceTable(
12551255
Row.of(1, "name1", 11, partition1),
12561256
Row.of(2, "name2", 2, partition1),
12571257
Row.of(3, "name33", 33, partition2),
1258-
Row.of(10, "name0", 44, partition2));
1258+
Row.of(10, "name0", 44, partition2),
1259+
Row.of(1, "name1", 11, "empty-partition"));
12591260
Schema.Builder builder =
12601261
Schema.newBuilder()
12611262
.column("a", DataTypes.INT())

0 commit comments

Comments
 (0)