Skip to content

Commit fcc15c0

Browse files
committed
[client] Add retry when getting an available tablet server node fails (#425)
1 parent 3bf243d commit fcc15c0

File tree

3 files changed

+70
-4
lines changed

3 files changed

+70
-4
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
139139
if (securityTokenManager == null) {
140140
// prepare security token manager
141141
// create the admin read only gateway
142-
// todo: may add retry logic when there is no available tablet server?
143142
AdminReadOnlyGateway gateway =
144143
GatewayClientProxy.createGatewayProxy(
145144
() ->
146145
getOneAvailableTabletServerNode(
147146
metadataUpdater.getCluster()),
148147
rpcClient,
149148
AdminReadOnlyGateway.class);
149+
150150
SecurityTokenProvider securityTokenProvider =
151151
new DefaultSecurityTokenProvider(gateway);
152152
securityTokenManager =

fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java

+29-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import com.alibaba.fluss.rpc.messages.PbTableMetadata;
3838
import com.alibaba.fluss.rpc.messages.PbTablePath;
3939

40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
4043
import javax.annotation.Nullable;
4144

4245
import java.util.ArrayList;
@@ -52,6 +55,9 @@
5255

5356
/** Utils for metadata for client. */
5457
public class MetadataUtils {
58+
private static final Logger LOG = LoggerFactory.getLogger(MetadataUtils.class);
59+
60+
private static final int MAX_RETRY_TIMES = 5;
5561

5662
private static final Random randOffset = new Random();
5763

@@ -255,10 +261,30 @@ public NewTableMetadata(
255261
}
256262

257263
public static ServerNode getOneAvailableTabletServerNode(Cluster cluster) {
258-
List<ServerNode> aliveTabletServers = cluster.getAliveTabletServerList();
259-
if (aliveTabletServers.isEmpty()) {
260-
throw new FlussRuntimeException("no alive tablet server in cluster");
264+
List<ServerNode> aliveTabletServers = null;
265+
for (int retryTimes = 0; retryTimes <= MAX_RETRY_TIMES; retryTimes++) {
266+
aliveTabletServers = cluster.getAliveTabletServerList();
267+
if (aliveTabletServers.isEmpty()) {
268+
LOG.error("Fluss create gateway proxy error, retry times = {}.", retryTimes);
269+
if (retryTimes >= MAX_RETRY_TIMES) {
270+
String exceptionMsg =
271+
String.format(
272+
"Execution of Fluss get one available tablet failed, no alive tablet server in cluster, retry times = %d.",
273+
retryTimes);
274+
throw new FlussRuntimeException(exceptionMsg);
275+
} else {
276+
try {
277+
Thread.sleep(1000L * retryTimes);
278+
} catch (InterruptedException interruptedException) {
279+
Thread.currentThread().interrupt();
280+
throw new RuntimeException(interruptedException);
281+
}
282+
}
283+
} else {
284+
break;
285+
}
261286
}
287+
262288
// just pick one random server node
263289
int offset = randOffset.nextInt(aliveTabletServers.size());
264290
return aliveTabletServers.get(offset);

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java

+40
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
package com.alibaba.fluss.client.table;
1818

19+
import com.alibaba.fluss.client.Connection;
20+
import com.alibaba.fluss.client.ConnectionFactory;
1921
import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
2022
import com.alibaba.fluss.client.table.scanner.ScanRecord;
2123
import com.alibaba.fluss.client.table.scanner.log.LogScanner;
2224
import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
2325
import com.alibaba.fluss.client.table.writer.AppendWriter;
2426
import com.alibaba.fluss.client.table.writer.UpsertWriter;
27+
import com.alibaba.fluss.cluster.ServerNode;
28+
import com.alibaba.fluss.cluster.ServerType;
29+
import com.alibaba.fluss.exception.FlussRuntimeException;
2530
import com.alibaba.fluss.row.GenericRow;
2631
import com.alibaba.fluss.row.InternalRow;
2732

@@ -31,6 +36,7 @@
3136
import java.time.Duration;
3237
import java.util.ArrayList;
3338
import java.util.List;
39+
import java.util.stream.Collectors;
3440

3541
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
3642
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -39,6 +45,7 @@
3945
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
4046
import static com.alibaba.fluss.testutils.DataTestUtils.row;
4147
import static com.alibaba.fluss.testutils.InternalRowListAssert.assertThatRows;
48+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4249

4350
/** IT case for {@link FlussTable} in the case where one tablet server fails. */
4451
class FlussFailServerTableITCase extends ClientToServerITCaseBase {
@@ -146,6 +153,39 @@ void testLogScan() throws Exception {
146153
}
147154
}
148155

156+
/**
 * Verifies that getting a table through a newly created connection fails once every tablet
 * server has been stopped, and that the failure's cause is a {@link FlussRuntimeException}
 * whose message reports 5 retries. The stopped tablet servers are started again afterwards.
 */
@Test
157+
void testRetryGetTabletServerNodes() throws Exception {
158+
createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
159+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
160+
// NOTE(review): the scanner is created but never used or closed in this test;
// presumably it only exercises the cluster while it is still healthy - confirm.
table.newScan().createLogScanner();
161+
162+
// keep only the nodes that the admin client reports as tablet servers
List<ServerNode> serverNodes =
163+
conn.getAdmin().getServerNodes().get().stream()
164+
.filter(
165+
serverNode ->
166+
serverNode.serverType() == ServerType.TABLET_SERVER)
167+
.collect(Collectors.toList());
168+
169+
// kill all tablet servers
170+
for (ServerNode serverNode : serverNodes) {
171+
FLUSS_CLUSTER_EXTENSION.stopTabletServer(serverNode.id());
172+
}
173+
174+
// A fresh connection is expected to fail on getTable; the expected retry count (5)
// presumably mirrors MAX_RETRY_TIMES in MetadataUtils - confirm.
try (Connection connNew = ConnectionFactory.createConnection(clientConf)) {
175+
assertThatThrownBy(() -> connNew.getTable(DATA1_TABLE_PATH))
176+
.cause()
177+
.isInstanceOf(FlussRuntimeException.class)
178+
.hasMessage(
179+
"Execution of Fluss get one available tablet failed, no alive tablet server in cluster, retry times = %d.",
180+
5);
181+
} finally {
182+
// start the stopped tablet servers again, whether or not the assertion passed
for (ServerNode serverNode : serverNodes) {
183+
FLUSS_CLUSTER_EXTENSION.startTabletServer(serverNode.id());
184+
}
185+
}
186+
}
187+
}
188+
149189
private List<InternalRow> toRows(ScanRecords scanRecords) {
150190
List<InternalRow> rows = new ArrayList<>();
151191
for (ScanRecord scanRecord : scanRecords) {

0 commit comments

Comments
 (0)