Skip to content

Commit 97d6f34

Browse files
authored
[client] Fix the MetadataRequest sent by LogFetcher timeout exception when upgrading cluster (#1666)
1 parent e5f3ea7 commit 97d6f34

File tree

7 files changed

+92
-24
lines changed

7 files changed

+92
-24
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public void updatePhysicalTableMetadata(Set<PhysicalTablePath> physicalTablePath
254254
}
255255

256256
@VisibleForTesting
257-
protected void updateMetadata(
257+
public void updateMetadata(
258258
@Nullable Set<TablePath> tablePaths,
259259
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
260260
@Nullable Collection<Long> tablePartitionIds)

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,19 @@ public Map<TableBucket, List<ScanRecord>> collectFetch() {
162162
* Set up a fetch request for any node that we have assigned buckets for which doesn't already
163163
* have an in-flight fetch or pending fetch data.
164164
*/
165-
public synchronized void sendFetches() {
166-
Map<Integer, FetchLogRequest> fetchRequestMap = prepareFetchLogRequests();
167-
fetchRequestMap.forEach(
168-
(nodeId, fetchLogRequest) -> {
169-
LOG.debug("Adding pending request for node id {}", nodeId);
170-
nodesWithPendingFetchRequests.add(nodeId);
171-
sendFetchRequest(nodeId, fetchLogRequest);
172-
});
165+
public void sendFetches() {
166+
checkAndUpdateMetadata(fetchableBuckets());
167+
synchronized (this) {
168+
// NOTE: Don't perform heavy I/O operations or synchronous waits inside this lock to
169+
// avoid blocking the future complete of FetchLogResponse.
170+
Map<Integer, FetchLogRequest> fetchRequestMap = prepareFetchLogRequests();
171+
fetchRequestMap.forEach(
172+
(nodeId, fetchLogRequest) -> {
173+
LOG.debug("Adding pending request for node id {}", nodeId);
174+
nodesWithPendingFetchRequests.add(nodeId);
175+
sendFetchRequest(nodeId, fetchLogRequest);
176+
});
177+
}
173178
}
174179

175180
/**
@@ -190,6 +195,31 @@ public void wakeup() {
190195
logFetchBuffer.wakeup();
191196
}
192197

198+
private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
199+
// If the table is partitioned table, check if we need update partition metadata.
200+
List<Long> partitionIds = isPartitioned ? new ArrayList<>() : null;
201+
// If the table is none-partitioned table, check if we need update table metadata.
202+
boolean needUpdate = false;
203+
for (TableBucket tb : tableBuckets) {
204+
if (getTableBucketLeader(tb) != null) {
205+
continue;
206+
}
207+
208+
if (isPartitioned) {
209+
partitionIds.add(tb.getPartitionId());
210+
} else {
211+
needUpdate = true;
212+
break;
213+
}
214+
}
215+
216+
if (isPartitioned && !partitionIds.isEmpty()) {
217+
metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, partitionIds);
218+
} else if (needUpdate) {
219+
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
220+
}
221+
}
222+
193223
private void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
194224
TableOrPartitions tableOrPartitionsInFetchRequest =
195225
getTableOrPartitionsInFetchRequest(fetchLogRequest);
@@ -243,18 +273,21 @@ private TableOrPartitions getTableOrPartitionsInFetchRequest(FetchLogRequest fet
243273
return new TableOrPartitions(tableIdsInFetchRequest, tablePartitionsInFetchRequest);
244274
}
245275

246-
private static class TableOrPartitions {
276+
/** A helper class to hold table ids or table partitions. */
277+
@VisibleForTesting
278+
static class TableOrPartitions {
247279
private final @Nullable Set<Long> tableIds;
248280
private final @Nullable Set<TablePartition> tablePartitions;
249281

250-
private TableOrPartitions(
282+
TableOrPartitions(
251283
@Nullable Set<Long> tableIds, @Nullable Set<TablePartition> tablePartitions) {
252284
this.tableIds = tableIds;
253285
this.tablePartitions = tablePartitions;
254286
}
255287
}
256288

257-
private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
289+
@VisibleForTesting
290+
void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
258291
Set<PhysicalTablePath> physicalTablePaths =
259292
metadataUpdater.getPhysicalTablePathByIds(
260293
tableOrPartitions.tableIds, tableOrPartitions.tablePartitions);
@@ -404,9 +437,6 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
404437
LOG.trace(
405438
"Skipping fetch request for bucket {} because leader is not available.",
406439
tb);
407-
// try to get the latest metadata info of this table because the leader for this
408-
// bucket is unknown.
409-
metadataUpdater.updateTableOrPartitionMetadata(tablePath, tb.getPartitionId());
410440
} else if (nodesWithPendingFetchRequests.contains(leader)) {
411441
LOG.trace(
412442
"Skipping fetch request for bucket {} because previous request "
@@ -472,7 +502,6 @@ private List<TableBucket> fetchableBuckets() {
472502
}
473503

474504
private Integer getTableBucketLeader(TableBucket tableBucket) {
475-
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
476505
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
477506
BucketLocation bucketLocation = metadataUpdater.getBucketLocation(tableBucket).get();
478507
if (bucketLocation.getLeader() != null) {

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ private static Configuration initConfig() {
121121
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
122122
conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
123123
conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
124+
125+
conf.set(ConfigOptions.NETTY_CLIENT_NUM_NETWORK_THREADS, 1);
124126
return conf;
125127
}
126128

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.client.Connection;
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.client.admin.Admin;
23-
import org.apache.fluss.client.utils.MetadataUtils;
2423
import org.apache.fluss.cluster.Cluster;
2524
import org.apache.fluss.cluster.ServerNode;
2625
import org.apache.fluss.config.Configuration;
@@ -34,6 +33,7 @@
3433
import java.util.Collections;
3534
import java.util.List;
3635

36+
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
3737
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
3838
import static org.assertj.core.api.Assertions.assertThat;
3939

@@ -64,7 +64,7 @@ void testRebuildClusterNTimes() throws Exception {
6464
// any N levels UnmodifiableCollection
6565
for (int i = 0; i < 20000; i++) {
6666
cluster =
67-
MetadataUtils.sendMetadataRequestAndRebuildCluster(
67+
sendMetadataRequestAndRebuildCluster(
6868
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
6969
true,
7070
cluster,

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.Future;
45+
import java.util.concurrent.TimeUnit;
4246

4347
import static org.apache.fluss.record.TestData.DATA1;
4448
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -55,6 +59,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
5559
private long tableId;
5660
private final int bucketId0 = 0;
5761
private final int bucketId1 = 1;
62+
private LogScannerStatus logScannerStatus;
5863

5964
// TODO covert this test to UT as kafka.
6065

@@ -74,17 +79,16 @@ protected void setup() throws Exception {
7479
// add bucket 0 and bucket 1 to log scanner status.
7580
scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
7681
scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
77-
LogScannerStatus logScannerStatus = new LogScannerStatus();
82+
logScannerStatus = new LogScannerStatus();
7883
logScannerStatus.assignScanBuckets(scanBuckets);
79-
TestingScannerMetricGroup scannerMetricGroup = TestingScannerMetricGroup.newInstance();
8084
logFetcher =
8185
new LogFetcher(
8286
DATA1_TABLE_INFO,
8387
null,
8488
logScannerStatus,
8589
clientConf,
8690
metadataUpdater,
87-
scannerMetricGroup,
91+
TestingScannerMetricGroup.newInstance(),
8892
new RemoteFileDownloader(1));
8993
}
9094

@@ -183,6 +187,39 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
183187
assertThat(records.get(tb0).size()).isEqualTo(10);
184188
}
185189

190+
@Test
191+
void testFetchWithInvalidTableOrPartitions() throws Exception {
192+
MetadataUpdater metadataUpdater1 =
193+
new MetadataUpdater(clientConf, FLUSS_CLUSTER_EXTENSION.getRpcClient());
194+
logFetcher =
195+
new LogFetcher(
196+
DATA1_TABLE_INFO,
197+
null,
198+
logScannerStatus,
199+
clientConf,
200+
metadataUpdater1,
201+
TestingScannerMetricGroup.newInstance(),
202+
new RemoteFileDownloader(1));
203+
204+
ExecutorService executor = Executors.newSingleThreadExecutor();
205+
Future<?> future =
206+
executor.submit(
207+
() -> {
208+
// If this test blocked, please checking whether it was blocked with
209+
// the same reason as https://github.com/apache/fluss/pull/1666
210+
for (int i = 0; i < 1000; i++) {
211+
logFetcher.sendFetches();
212+
logFetcher.invalidTableOrPartitions(
213+
new LogFetcher.TableOrPartitions(
214+
Collections.singleton(tableId), null));
215+
}
216+
});
217+
218+
future.get(30, TimeUnit.SECONDS);
219+
assertThat(future.isDone()).isTrue();
220+
executor.shutdownNow();
221+
}
222+
186223
private void addRecordsToBucket(
187224
TableBucket tableBucket, MemoryLogRecords logRecords, long expectedBaseOffset)
188225
throws Exception {

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,10 +763,10 @@ public class ConfigOptions {
763763
public static final ConfigOption<Integer> NETTY_CLIENT_NUM_NETWORK_THREADS =
764764
key("netty.client.num-network-threads")
765765
.intType()
766-
.defaultValue(3)
766+
.defaultValue(4)
767767
.withDescription(
768768
"The number of threads that the client uses for sending requests to the "
769-
+ "network and receiving responses from network. The default value is 3");
769+
+ "network and receiving responses from network. The default value is 4");
770770

771771
// ------------------------------------------------------------------------
772772
// Client Settings

website/docs/maintenance/configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ during the Fluss cluster working.
9292
| netty.server.num-worker-threads | Integer | 8 | The number of threads that the server uses for processing requests, which may include disk and remote I/O. |
9393
| netty.server.max-queued-requests | Integer | 500 | The number of queued requests allowed for worker threads, before blocking the I/O threads. |
9494
| netty.connection.max-idle-time | Duration | 10min | Close idle connections after the given time specified by this config. |
95-
| netty.client.num-network-threads | Integer | 3 | The number of threads that the client uses for sending requests to the network and receiving responses from network. The default value is 3 |
95+
| netty.client.num-network-threads | Integer | 4 | The number of threads that the client uses for sending requests to the network and receiving responses from network. The default value is 4 |
9696

9797
## Log
9898

0 commit comments

Comments
 (0)