Commit 1c38a02

[Client] Fix LogFetcher not updating bucket metadata when receiving NotLeaderOrFollowerException at the bucket level
1 parent e69e4f4 commit 1c38a02

File tree

5 files changed: +369, -204 lines

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 37 additions & 2 deletions
@@ -26,6 +26,7 @@
 import org.apache.fluss.cluster.BucketLocation;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.ApiException;
 import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.fs.FsPath;
@@ -47,6 +48,7 @@
 import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
+import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.Projection;
@@ -220,7 +222,8 @@ private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
         }
     }

-    private void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
+    @VisibleForTesting
+    void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
         TableOrPartitions tableOrPartitionsInFetchRequest =
                 getTableOrPartitionsInFetchRequest(fetchLogRequest);
         // TODO cache the tablet server gateway.
@@ -341,6 +344,14 @@ private synchronized void handleFetchLogResponse(
                             respForBucket.getBucketId());
             FetchLogResultForBucket fetchResultForBucket =
                     getFetchLogResultForBucket(tb, tablePath, respForBucket);
+
+            // if the error code is not NONE, the fetch log request failed for this
+            // bucket; clear the table bucket metadata on InvalidMetadataException.
+            if (fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
+                ApiError error = ApiError.fromErrorMessage(respForBucket);
+                handleFetchLogExceptionForBucket(tb, destination, error);
+            }
+
             Long fetchOffset = logScannerStatus.getBucketOffset(tb);
             // if the offset is null, it means the bucket has been unsubscribed,
             // we just set a Long.MAX_VALUE as the next fetch offset
@@ -383,6 +394,29 @@ private synchronized void handleFetchLogResponse(
         }
     }

+    private void handleFetchLogExceptionForBucket(TableBucket tb, int destination, ApiError error) {
+        ApiException exception = error.error().exception();
+        LOG.error("Failed to fetch log from node {} for bucket {}", destination, tb, exception);
+        if (exception instanceof InvalidMetadataException) {
+            LOG.warn(
+                    "Invalid metadata error in fetch log request. "
+                            + "Going to request metadata update.",
+                    exception);
+            long tableId = tb.getTableId();
+            TableOrPartitions tableOrPartitions;
+            if (tb.getPartitionId() == null) {
+                tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
+            } else {
+                tableOrPartitions =
+                        new TableOrPartitions(
+                                null,
+                                Collections.singleton(
+                                        new TablePartition(tableId, tb.getPartitionId())));
+            }
+            invalidTableOrPartitions(tableOrPartitions);
+        }
+    }
+
     private void pendRemoteFetches(
             RemoteLogFetchInfo remoteLogFetchInfo, long firstFetchOffset, long highWatermark) {
         checkNotNull(remoteLogFetchInfo);
@@ -413,7 +447,8 @@ private void pendRemoteFetches(
         }
     }

-    private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
+    @VisibleForTesting
+    Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
         Map<Integer, List<PbFetchLogReqForBucket>> fetchLogReqForBuckets = new HashMap<>();
         int readyForFetchCount = 0;
         Long tableId = null;
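
Note on the two hunks above: a fetch response can now succeed at the request level while individual buckets carry error codes, and each PbFetchLogRespForBucket is inspected on its own. Flattened into one block, the new per-bucket path looks roughly like this; all names come from the diff, and the one assumption not visible here is that NotLeaderOrFollowerException extends InvalidMetadataException, as in the Kafka-style error hierarchy this client follows:

    // Sketch: per-bucket error handling inside handleFetchLogResponse.
    if (fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
        ApiException exception =
                ApiError.fromErrorMessage(respForBucket).error().exception();
        // A moved leader lands here, assuming NotLeaderOrFollowerException
        // subtypes InvalidMetadataException.
        if (exception instanceof InvalidMetadataException) {
            long tableId = tb.getTableId();
            // Non-partitioned buckets invalidate the whole table; partitioned
            // buckets invalidate only their own (tableId, partitionId) pair.
            TableOrPartitions stale =
                    tb.getPartitionId() == null
                            ? new TableOrPartitions(Collections.singleton(tableId), null)
                            : new TableOrPartitions(
                                    null,
                                    Collections.singleton(
                                            new TablePartition(
                                                    tableId, tb.getPartitionId())));
            // Drop the cached locations so the next prepareFetchLogRequests()
            // resolves a fresh leader.
            invalidTableOrPartitions(stale);
        }
    }

Before this change, only request-level failures reached the invalidation path, so a bucket-level NotLeaderOrFollowerException left the client fetching from a stale leader.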

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 4 additions & 0 deletions
@@ -78,6 +78,10 @@ public void updateCluster(Cluster cluster) {
         this.cluster = cluster;
     }

+    public void setResponseLogicId(int serverId, int responseLogicId) {
+        tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
+    }
+
     @Override
     public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
         Set<TablePath> needUpdateTablePaths =
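
The new hook simply forwards to the per-server testing gateway; the semantics of responseLogicId are defined inside that gateway and are not part of this excerpt (two of the five changed files are not shown). A hypothetical usage sketch, with both argument values illustrative:

    // Hypothetical: switch server 1's testing gateway into a response mode that
    // answers fetches with a bucket-level error, then verify the fetcher reacts
    // by invalidating metadata rather than fetching from the stale leader.
    testingMetadataUpdater.setResponseLogicId(1, 1);
    logFetcher.sendFetches();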
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java (new file)

Lines changed: 248 additions & 0 deletions

@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.metrics.TestingScannerMetricGroup;
+import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.cluster.Cluster;
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket;
+import org.apache.fluss.rpc.messages.ProduceLogResponse;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
+import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT test for {@link LogFetcher}. */
+public class LogFetcherITCase extends ClientToServerITCaseBase {
+    private LogFetcher logFetcher;
+    private long tableId;
+    private final int bucketId0 = 0;
+    private final int bucketId1 = 1;
+    private LogScannerStatus logScannerStatus;
+
+    @BeforeEach
+    protected void setup() throws Exception {
+        super.setup();
+
+        // Create the table for DATA1_TABLE_PATH up front.
+        tableId = createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
+        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
+        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
+
+        Map<TableBucket, Long> scanBuckets = new HashMap<>();
+        // add bucket 0 and bucket 1 to the log scanner status.
+        scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
+        scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
+        logScannerStatus = new LogScannerStatus();
+        logScannerStatus.assignScanBuckets(scanBuckets);
+        logFetcher =
+                new LogFetcher(
+                        DATA1_TABLE_INFO,
+                        null,
+                        logScannerStatus,
+                        clientConf,
+                        metadataUpdater,
+                        TestingScannerMetricGroup.newInstance(),
+                        new RemoteFileDownloader(1));
+    }
+
+    @Test
+    void testFetch() throws Exception {
+        // add one batch of records to tb0.
+        TableBucket tb0 = new TableBucket(tableId, bucketId0);
+        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
+
+        // add one batch of records to tb1.
+        TableBucket tb1 = new TableBucket(tableId, bucketId1);
+        addRecordsToBucket(tb1, genMemoryLogRecordsByObject(DATA1), 0L);
+
+        assertThat(logFetcher.hasAvailableFetches()).isFalse();
+        // collectFetch will be empty while there is no available fetch.
+        assertThat(logFetcher.collectFetch()).isEmpty();
+
+        // send fetches to fetch data.
+        logFetcher.sendFetches();
+        // The fetcher fetches data asynchronously, so we need to wait for the
+        // result to be written to the logFetchBuffer.
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    assertThat(logFetcher.hasAvailableFetches()).isTrue();
+                    assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
+                });
+
+        Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
+        assertThat(records.size()).isEqualTo(2);
+        assertThat(records.get(tb0).size()).isEqualTo(10);
+        assertThat(records.get(tb1).size()).isEqualTo(10);
+
+        // after collectFetch, the fetcher is empty.
+        assertThat(logFetcher.hasAvailableFetches()).isFalse();
+        assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0);
+    }
+
+    @Test
+    void testFetchWhenDestinationIsNullInMetadata() throws Exception {
+        TableBucket tb0 = new TableBucket(tableId, bucketId0);
+        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
+
+        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
+        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
+        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
+
+        int leaderNode = metadataUpdater.leaderFor(tb0);
+
+        // now, remove the leader node so that the fetch destination
+        // server node is null.
+        Cluster oldCluster = metadataUpdater.getCluster();
+        Map<Integer, ServerNode> aliveTabletServersById =
+                new HashMap<>(oldCluster.getAliveTabletServers());
+        aliveTabletServersById.remove(leaderNode);
+        Cluster newCluster =
+                new Cluster(
+                        aliveTabletServersById,
+                        oldCluster.getCoordinatorServer(),
+                        oldCluster.getBucketLocationsByPath(),
+                        oldCluster.getTableIdByPath(),
+                        oldCluster.getPartitionIdByPath(),
+                        oldCluster.getTableInfoByPath());
+        metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
+
+        LogScannerStatus logScannerStatus = new LogScannerStatus();
+        logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));
+
+        LogFetcher logFetcher =
+                new LogFetcher(
+                        DATA1_TABLE_INFO,
+                        null,
+                        logScannerStatus,
+                        clientConf,
+                        metadataUpdater,
+                        TestingScannerMetricGroup.newInstance(),
+                        new RemoteFileDownloader(1));
+
+        // send fetches to fetch data; there should be no available fetch yet.
+        logFetcher.sendFetches();
+        assertThat(logFetcher.hasAvailableFetches()).isFalse();
+
+        // then fetch again; this time there should be an available fetch:
+        // the first send fetch only triggers the metadata update,
+        logFetcher.sendFetches();
+        // the second send fetch does the real data fetch.
+        logFetcher.sendFetches();
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    assertThat(logFetcher.hasAvailableFetches()).isTrue();
+                    assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
+                });
+        Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
+        assertThat(records.size()).isEqualTo(1);
+        assertThat(records.get(tb0).size()).isEqualTo(10);
+    }
+
+    @Test
+    void testFetchWithInvalidTableOrPartitions() throws Exception {
+        MetadataUpdater metadataUpdater1 =
+                new MetadataUpdater(clientConf, FLUSS_CLUSTER_EXTENSION.getRpcClient());
+        logFetcher =
+                new LogFetcher(
+                        DATA1_TABLE_INFO,
+                        null,
+                        logScannerStatus,
+                        clientConf,
+                        metadataUpdater1,
+                        TestingScannerMetricGroup.newInstance(),
+                        new RemoteFileDownloader(1));
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<?> future =
+                executor.submit(
+                        () -> {
+                            // If this test blocks, please check whether it is blocked
+                            // for the same reason as https://github.com/apache/fluss/pull/1666
+                            for (int i = 0; i < 1000; i++) {
+                                logFetcher.sendFetches();
+                                logFetcher.invalidTableOrPartitions(
+                                        new LogFetcher.TableOrPartitions(
+                                                Collections.singleton(tableId), null));
+                            }
+                        });
+
+        future.get(30, TimeUnit.SECONDS);
+        assertThat(future.isDone()).isTrue();
+        executor.shutdownNow();
+    }
+
+    private void addRecordsToBucket(
+            TableBucket tableBucket, MemoryLogRecords logRecords, long expectedBaseOffset)
+            throws Exception {
+        int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
+        TabletServerGateway leaderGateWay =
+                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
+        assertProduceLogResponse(
+                leaderGateWay
+                        .produceLog(
+                                newProduceLogRequest(
+                                        tableBucket.getTableId(),
+                                        tableBucket.getBucket(),
+                                        -1, // need ack, so we can make sure every batch is acked.
+                                        logRecords))
+                        .get(),
+                tableBucket.getBucket(),
+                expectedBaseOffset);
+    }
+
+    private static void assertProduceLogResponse(
+            ProduceLogResponse produceLogResponse, int bucketId, Long baseOffset) {
+        assertThat(produceLogResponse.getBucketsRespsCount()).isEqualTo(1);
+        PbProduceLogRespForBucket produceLogRespForBucket =
+                produceLogResponse.getBucketsRespsList().get(0);
+        assertThat(produceLogRespForBucket.getBucketId()).isEqualTo(bucketId);
+        assertThat(produceLogRespForBucket.getBaseOffset()).isEqualTo(baseOffset);
+    }
+}
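
testFetchWhenDestinationIsNullInMetadata pins down the retry contract of the fix: when the cached cluster no longer contains the bucket's leader, a sendFetches() round is spent refreshing metadata before any data flows. The diff does not show the corresponding branch in prepareFetchLogRequests(), so the following is an inference from the test's behavior, not the actual body; only the surrounding names come from this page:

    // Assumed shape of the destination-null handling in prepareFetchLogRequests():
    // with no usable leader, request a metadata refresh and skip the bucket this
    // round; the next sendFetches() sees the refreshed cluster and fetches data.
    Integer leader = metadataUpdater.leaderFor(tb); // hypothetical nullable lookup
    if (leader == null) {
        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));
    } else {
        fetchLogReqForBuckets
                .computeIfAbsent(leader, id -> new ArrayList<>())
                .add(fetchLogReqForBucket); // per-bucket request built elsewhere
    }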
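
testFetchWithInvalidTableOrPartitions, by contrast, is a regression guard rather than a data test: it interleaves sendFetches() with invalidTableOrPartitions() a thousand times on a single worker thread and relies on the 30-second future.get() timeout to fail the test if either call blocks, the failure mode referenced in https://github.com/apache/fluss/pull/1666.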
