Skip to content

Commit 4618fa7

Browse files
committed
[client] Fix errors that cause LookupSender to hang if the leader and bucket are not ready
1 parent 0bd16ef commit 4618fa7

File tree

4 files changed

+83
-34
lines changed

4 files changed

+83
-34
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,17 @@ private Map<Integer, List<Lookup>> groupByLeader(List<Lookup> lookups) {
116116
for (Lookup lookup : lookups) {
117117
// get the leader node
118118
TableBucket tb = lookup.tableBucket();
119-
int leader = metadataUpdater.leaderFor(tb);
119+
120+
int leader;
121+
try {
122+
// TODO this can be a re-triable operation. We should retry here instead of throwing
123+
// exception.
124+
leader = metadataUpdater.leaderFor(tb);
125+
} catch (Exception e) {
126+
lookup.future().completeExceptionally(e);
127+
continue;
128+
}
129+
120130
lookupBatchesByLeader.computeIfAbsent(leader, k -> new ArrayList<>()).add(lookup);
121131
}
122132
return lookupBatchesByLeader;
@@ -188,16 +198,27 @@ private void handleLookupResponse(
188198
? pbLookupRespForBucket.getPartitionId()
189199
: null,
190200
pbLookupRespForBucket.getBucketId());
191-
List<PbValue> pbValues = pbLookupRespForBucket.getValuesList();
192201
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
193-
lookupBatch.complete(pbValues);
202+
if (pbLookupRespForBucket.hasErrorCode()) {
203+
// TODO for re-triable error, we should retry here instead of throwing exception.
204+
LOG.warn(
205+
"Get error lookup response on table bucket {}, fail. Error: {}",
206+
tableBucket,
207+
pbLookupRespForBucket.getErrorMessage());
208+
lookupBatch.completeExceptionally(
209+
new FlussRuntimeException(pbLookupRespForBucket.getErrorMessage()));
210+
} else {
211+
List<PbValue> pbValues = pbLookupRespForBucket.getValuesList();
212+
lookupBatch.complete(pbValues);
213+
}
194214
}
195215
}
196216

197217
private void handleLookupRequestException(
198218
Throwable t, Map<TableBucket, LookupBatch> lookupsByBucket) {
199219
ApiError error = ApiError.fromThrowable(t);
200220
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
221+
// TODO for re-triable error, we should retry here instead of throwing exception.
201222
LOG.warn(
202223
"Get error lookup response on table bucket {}, fail. Error: {}",
203224
lookupBatch.tableBucket(),

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,14 @@ public static void verifyPartitionLogs(
211211
verifyRows(rowType, actualRows, expectPartitionsRows);
212212
}
213213

214+
public static void waitAllReplicasReady(long tableId, TableDescriptor tableDescriptor) {
215+
// retry until all replicas are ready.
216+
int expectBucketCount = tableDescriptor.getTableDistribution().get().getBucketCount().get();
217+
for (int i = 0; i < expectBucketCount; i++) {
218+
FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(new TableBucket(tableId, i));
219+
}
220+
}
221+
214222
protected static void verifyRows(
215223
RowType rowType,
216224
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,9 @@ void testUpsertWithSmallBuffer() throws Exception {
190190
void testPutAndLookup() throws Exception {
191191
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table");
192192
createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
193-
verifyPutAndLookup(tablePath, DATA1_SCHEMA_PK, new Object[] {1, "a"});
193+
194+
Table table = conn.getTable(tablePath);
195+
verifyPutAndLookup(table, DATA1_SCHEMA_PK, new Object[] {1, "a"});
194196

195197
// test put/lookup data for primary table with pk index is not 0
196198
Schema schema =
@@ -207,8 +209,23 @@ void testPutAndLookup() throws Exception {
207209
TablePath data1PkTablePath2 =
208210
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_2");
209211
createTable(data1PkTablePath2, tableDescriptor, true);
212+
210213
// now, check put/lookup data
211-
verifyPutAndLookup(data1PkTablePath2, schema, new Object[] {"a", 1});
214+
Table table2 = conn.getTable(data1PkTablePath2);
215+
verifyPutAndLookup(table2, schema, new Object[] {"a", 1});
216+
}
217+
218+
@Test
219+
void testLookupForNotReadyTable() throws Exception {
220+
TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");
221+
TableDescriptor descriptor =
222+
TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(10).build();
223+
long tableId = createTable(tablePath, descriptor, true);
224+
IndexedRow rowKey = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"});
225+
// retry until all replicas are ready. Otherwise, the lookup may fail.
226+
waitAllReplicasReady(tableId, descriptor);
227+
Table table = conn.getTable(tablePath);
228+
assertThat(lookupRow(table, rowKey)).isNull();
212229
}
213230

214231
@Test
@@ -232,8 +249,10 @@ void testLimitScanPrimaryTable() throws Exception {
232249
}
233250
}
234251
upsertWriter.flush();
252+
253+
TableBucket tb = new TableBucket(tableId, 0);
235254
List<InternalRow> actualRows =
236-
table.limitScan(new TableBucket(tableId, 0), limitSize, null).get().stream()
255+
table.limitScan(tb, limitSize, null).get().stream()
237256
.map(ScanRecord::getRow)
238257
.collect(Collectors.toList());
239258
assertThat(actualRows.size()).isEqualTo(limitSize);
@@ -248,8 +267,7 @@ void testLimitScanPrimaryTable() throws Exception {
248267
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
249268
}
250269
actualRows =
251-
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
252-
.stream()
270+
table.limitScan(tb, limitSize, projectedFields).get().stream()
253271
.map(ScanRecord::getRow)
254272
.collect(Collectors.toList());
255273
assertThat(actualRows.size()).isEqualTo(limitSize);
@@ -285,8 +303,10 @@ void testLimitScanLogTable() throws Exception {
285303
}
286304
}
287305
appendWriter.flush();
306+
307+
TableBucket tb = new TableBucket(tableId, 0);
288308
List<InternalRow> actualRows =
289-
table.limitScan(new TableBucket(tableId, 0), limitSize, null).get().stream()
309+
table.limitScan(tb, limitSize, null).get().stream()
290310
.map(ScanRecord::getRow)
291311
.collect(Collectors.toList());
292312
assertThat(actualRows.size()).isEqualTo(limitSize);
@@ -301,8 +321,7 @@ void testLimitScanLogTable() throws Exception {
301321
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
302322
}
303323
actualRows =
304-
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
305-
.stream()
324+
table.limitScan(tb, limitSize, projectedFields).get().stream()
306325
.map(ScanRecord::getRow)
307326
.collect(Collectors.toList());
308327
assertThat(actualRows.size()).isEqualTo(limitSize);
@@ -315,26 +334,21 @@ void testLimitScanLogTable() throws Exception {
315334
}
316335
}
317336

318-
void verifyPutAndLookup(TablePath tablePath, Schema tableSchema, Object[] fields)
319-
throws Exception {
337+
void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws Exception {
320338
// put data.
321339
InternalRow row = compactedRow(tableSchema.toRowType(), fields);
322-
try (Table table = conn.getTable(tablePath)) {
323-
UpsertWriter upsertWriter = table.getUpsertWriter();
324-
// put data.
325-
upsertWriter.upsert(row);
326-
upsertWriter.flush();
327-
}
340+
UpsertWriter upsertWriter = table.getUpsertWriter();
341+
// put data.
342+
upsertWriter.upsert(row);
343+
upsertWriter.flush();
328344
// lookup this key.
329345
IndexedRow keyRow = keyRow(tableSchema, fields);
330-
assertThat(lookupRow(tablePath, keyRow)).isEqualTo(row);
346+
assertThat(lookupRow(table, keyRow)).isEqualTo(row);
331347
}
332348

333-
private InternalRow lookupRow(TablePath tablePath, IndexedRow keyRow) throws Exception {
334-
try (Table table = conn.getTable(tablePath)) {
335-
// lookup this key.
336-
return table.lookup(keyRow).get().getRow();
337-
}
349+
private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception {
350+
// lookup this key.
351+
return table.lookup(keyRow).get().getRow();
338352
}
339353

340354
@Test
@@ -353,19 +367,19 @@ void testPartialPutAndDelete() throws Exception {
353367
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, true);
354368

355369
// test put a full row
356-
verifyPutAndLookup(DATA1_TABLE_PATH_PK, schema, new Object[] {1, "a", 1, true});
370+
Table table = conn.getTable(DATA1_TABLE_PATH_PK);
371+
verifyPutAndLookup(table, schema, new Object[] {1, "a", 1, true});
357372

358373
// partial update columns: a, b
359374
UpsertWrite partialUpdate = new UpsertWrite().withPartialUpdate(new int[] {0, 1});
360-
Table table = conn.getTable(DATA1_TABLE_PATH_PK);
361375
UpsertWriter upsertWriter = table.getUpsertWriter(partialUpdate);
362376
upsertWriter
363377
.upsert(compactedRow(schema.toRowType(), new Object[] {1, "aaa", null, null}))
364378
.get();
365379

366380
// check the row
367381
IndexedRow rowKey = row(pkRowType, new Object[] {1});
368-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
382+
assertThat(lookupRow(table, rowKey))
369383
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "aaa", 1, true}));
370384

371385
// partial update columns: a,b,c
@@ -376,14 +390,14 @@ void testPartialPutAndDelete() throws Exception {
376390
.get();
377391

378392
// lookup the row
379-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
393+
assertThat(lookupRow(table, rowKey))
380394
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, true}));
381395

382396
// test partial delete, target column is a,b,c
383397
upsertWriter
384398
.delete(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, null}))
385399
.get();
386-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
400+
assertThat(lookupRow(table, rowKey))
387401
.isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, null, null, true}));
388402

389403
// partial delete, target column is d
@@ -394,7 +408,7 @@ void testPartialPutAndDelete() throws Exception {
394408
.get();
395409

396410
// the row should be deleted, shouldn't get the row again
397-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey)).isNull();
411+
assertThat(lookupRow(table, rowKey)).isNull();
398412

399413
table.close();
400414
}
@@ -450,12 +464,12 @@ void testDelete() throws Exception {
450464

451465
// lookup this key.
452466
IndexedRow keyRow = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"});
453-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isEqualTo(row);
467+
assertThat(lookupRow(table, keyRow)).isEqualTo(row);
454468

455469
// delete this key.
456470
upsertWriter.delete(row).get();
457471
// lookup this key again, will return null.
458-
assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isNull();
472+
assertThat(lookupRow(table, keyRow)).isNull();
459473
}
460474
}
461475

fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,13 +459,19 @@ public void waitUtilAllReplicaReady(TableBucket tableBucket) {
459459
() -> {
460460
Optional<LeaderAndIsr> leaderAndIsrOpt = zkClient.getLeaderAndIsr(tableBucket);
461461
assertThat(leaderAndIsrOpt).isPresent();
462-
List<Integer> isr = leaderAndIsrOpt.get().isr();
462+
LeaderAndIsr leaderAndIsr = leaderAndIsrOpt.get();
463+
List<Integer> isr = leaderAndIsr.isr();
463464
for (int replicaId : isr) {
464465
ReplicaManager replicaManager =
465466
getTabletServerById(replicaId).getReplicaManager();
466467
assertThat(replicaManager.getReplica(tableBucket))
467468
.isInstanceOf(ReplicaManager.OnlineReplica.class);
468469
}
470+
471+
int leader = leaderAndIsr.leader();
472+
ReplicaManager replicaManager = getTabletServerById(leader).getReplicaManager();
473+
assertThat(replicaManager.getReplicaOrException(tableBucket).isLeader())
474+
.isTrue();
469475
});
470476
}
471477

0 commit comments

Comments
 (0)