Skip to content

Commit 608d566

Browse files
committed
minor fix from Jark
1 parent 495b82d commit 608d566

File tree

8 files changed

+39
-25
lines changed

8 files changed

+39
-25
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ private void handleLookupResponse(
316316
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
317317
if (pbLookupRespForBucket.hasErrorCode()) {
318318
ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket);
319-
handleLookupError(tableBucket, destination, error, lookupBatch.lookups(), "");
319+
handleLookupError(tableBucket, destination, error, lookupBatch.lookups(), "lookup");
320320
} else {
321321
List<byte[]> byteValues =
322322
pbLookupRespForBucket.getValuesList().stream()
@@ -353,7 +353,11 @@ private void handlePrefixLookupResponse(
353353
if (pbRespForBucket.hasErrorCode()) {
354354
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
355355
handleLookupError(
356-
tableBucket, destination, error, prefixLookupBatch.lookups(), "prefix ");
356+
tableBucket,
357+
destination,
358+
error,
359+
prefixLookupBatch.lookups(),
360+
"prefix lookup");
357361
} else {
358362
List<List<byte[]>> result = new ArrayList<>(pbRespForBucket.getValueListsCount());
359363
for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) {
@@ -374,7 +378,7 @@ private void handleLookupRequestException(
374378
ApiError error = ApiError.fromThrowable(t);
375379
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
376380
handleLookupError(
377-
lookupBatch.tableBucket(), destination, error, lookupBatch.lookups(), "");
381+
lookupBatch.tableBucket(), destination, error, lookupBatch.lookups(), "lookup");
378382
}
379383
}
380384

@@ -387,7 +391,7 @@ private void handlePrefixLookupException(
387391
destination,
388392
error,
389393
lookupBatch.lookups(),
390-
"prefix ");
394+
"prefix lookup");
391395
}
392396
}
393397

@@ -446,15 +450,15 @@ private void handleLookupError(
446450
for (AbstractLookupQuery<?> lookup : lookups) {
447451
if (canRetry(lookup, error.exception())) {
448452
LOG.warn(
449-
"Get error {}lookup response on table bucket {}, retrying ({} attempts left). Error: {}",
453+
"Get error {} response on table bucket {}, retrying ({} attempts left). Error: {}",
450454
lookupType,
451455
tableBucket,
452456
maxRetries - lookup.retries(),
453457
error.formatErrMsg());
454458
reEnqueueLookup(lookup);
455459
} else {
456460
LOG.warn(
457-
"Get error {}lookup response on table bucket {}, fail. Error: {}",
461+
"Get error {} response on table bucket {}, fail. Error: {}",
458462
lookupType,
459463
tableBucket,
460464
error.formatErrMsg());

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,21 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.row.InternalRow;
2222

23+
import javax.annotation.concurrent.NotThreadSafe;
24+
2325
import java.util.concurrent.CompletableFuture;
2426

2527
/**
26-
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key.
28+
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
29+
* lookuper has retriable ability to handle transient errors during lookup operations which is
30+
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
31+
*
32+
* <p>Note: Lookuper instances are not thread-safe.
2733
*
2834
* @since 0.6
2935
*/
3036
@PublicEvolving
37+
@NotThreadSafe
3138
public interface Lookuper {
3239

3340
/**

fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.junit.jupiter.api.BeforeEach;
4444
import org.junit.jupiter.api.Test;
4545

46+
import java.time.Duration;
4647
import java.util.HashMap;
4748
import java.util.List;
4849
import java.util.Map;
@@ -54,6 +55,7 @@
5455
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
5556
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
5657
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
58+
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
5759
import static org.assertj.core.api.Assertions.assertThat;
5860
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5961

@@ -269,6 +271,12 @@ void testRetryStopsIfFutureCompleted() throws Exception {
269271
// first attempt fails
270272
return createFailedResponse(request, new TimeoutException("timeout"));
271273
} else {
274+
try {
275+
// Avoid attempting again too quickly
276+
Thread.sleep(100);
277+
} catch (InterruptedException e) {
278+
throw new RuntimeException(e);
279+
}
272280
// subsequent attempts should not happen if we complete the future
273281
throw new AssertionError(
274282
"Should not retry after future is completed externally");
@@ -281,15 +289,15 @@ void testRetryStopsIfFutureCompleted() throws Exception {
281289
lookupQueue.appendLookup(query);
282290

283291
// complete the future externally before retry happens
284-
Thread.sleep(100); // wait for first attempt to fail
292+
waitUntil(() -> attemptCount.get() >= 1, Duration.ofSeconds(5), "first attempt to be made");
285293
query.future().complete("external".getBytes());
286294

287295
// verify: completed externally
288296
byte[] result = query.future().get(1, TimeUnit.SECONDS);
289297
assertThat(result).isEqualTo("external".getBytes());
290-
// retries is 1 because we incremented it when re-enqueuing, but didn't send again
291-
assertThat(query.retries()).isLessThanOrEqualTo(1);
292-
assertThat(attemptCount.get()).isEqualTo(1); // only first attempt
298+
// retries is less than 3, because we stop the query so it won't send again.
299+
assertThat(query.retries()).isGreaterThanOrEqualTo(0).isLessThan(3);
300+
assertThat(attemptCount.get()).isGreaterThanOrEqualTo(1).isLessThan(4);
293301
}
294302

295303
@Test

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
148148
partitionKeyIndexes,
149149
isStreamingMode,
150150
startupOptions,
151-
tableOptions.get(LookupOptions.MAX_RETRIES),
152151
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
153152
cache,
154153
partitionDiscoveryIntervalMs,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ public class FlinkTableSource
127127
private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
128128

129129
// options for lookup source
130-
private final int lookupMaxRetryTimes;
131130
private final boolean lookupAsync;
132131
@Nullable private final LookupCache cache;
133132

@@ -166,7 +165,6 @@ public FlinkTableSource(
166165
int[] partitionKeyIndexes,
167166
boolean streaming,
168167
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
169-
int lookupMaxRetryTimes,
170168
boolean lookupAsync,
171169
@Nullable LookupCache cache,
172170
long scanPartitionDiscoveryIntervalMs,
@@ -183,7 +181,6 @@ public FlinkTableSource(
183181
this.streaming = streaming;
184182
this.startupOptions = checkNotNull(startupOptions, "startupOptions must not be null");
185183

186-
this.lookupMaxRetryTimes = lookupMaxRetryTimes;
187184
this.lookupAsync = lookupAsync;
188185
this.cache = cache;
189186

@@ -253,7 +250,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
253250
flussConfig,
254251
tableOutputType,
255252
primaryKeyIndexes,
256-
lookupMaxRetryTimes,
257253
projectedFields);
258254
} else if (limit > 0) {
259255
results =
@@ -447,7 +443,6 @@ public DynamicTableSource copy() {
447443
partitionKeyIndexes,
448444
streaming,
449445
startupOptions,
450-
lookupMaxRetryTimes,
451446
lookupAsync,
452447
cache,
453448
scanPartitionDiscoveryIntervalMs,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
120120
RemainingFilter remainingFilter = lookupNormalizer.createRemainingFilter(keyRow);
121121
InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow);
122122

123-
// the retry mechanism is now handled by the underlying LookupClient layer
123+
// the retry mechanism is now handled by the underlying LookupClient layer,
124+
// we can't call lookuper.lookup() in whenComplete callback as lookuper is not thread-safe.
124125
CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
125126
lookuper.lookup(flussKeyRow)
126127
.whenComplete(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public static Collection<RowData> querySingleRow(
262262
Configuration flussConfig,
263263
RowType sourceOutputType,
264264
int[] primaryKeyIndexes,
265-
int lookupMaxRetryTimes,
266265
@Nullable int[] projectedFields) {
267266
LookupNormalizer lookupNormalizer =
268267
createPrimaryKeyLookupNormalizer(primaryKeyIndexes, sourceOutputType);

website/docs/engine-flink/options.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,17 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties)
109109
| Option | Type | Default | Description |
110110
|------------------------------------------|------------|---------|-----------------------------------------------------------------------------------------------------------------------------|
111111
| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. |
112-
| lookup.cache | Enum | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | |
113-
| lookup.max-retries | Integer | 3 | The maximum allowed retries if a lookup operation fails. | |
114-
| lookup.partial-cache.expire-after-access | Duration | (None) | Duration to expire an entry in the cache after accessing. | |
115-
| lookup.partial-cache.expire-after-write | Duration | (None) | Duration to expire an entry in the cache after writing. | |
116-
| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | |
117-
| lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. | |
112+
| lookup.cache | Enum | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. |
113+
| lookup.max-retries | Integer | 3 | The maximum allowed retries if a lookup operation fails. Setting this value will override option 'client.lookup.max-retries'.|
114+
| lookup.partial-cache.expire-after-access | Duration | (None) | Duration to expire an entry in the cache after accessing. |
115+
| lookup.partial-cache.expire-after-write | Duration | (None) | Duration to expire an entry in the cache after writing. |
116+
| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. |
117+
| lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. |
118118
| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. |
119119
| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. |
120120
| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. |
121121
| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. |
122+
| client.lookup.max-retries | Integer | 3 | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. |
122123

123124

124125
## Write Options

0 commit comments

Comments
 (0)