@@ -25,13 +25,20 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.exception.TimeoutException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
import org.apache.fluss.rpc.entity.PutKvResultForBucket;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.ProduceLogResponse;
import org.apache.fluss.rpc.messages.PutKvResponse;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.tablet.TestTabletServerGateway;
import org.apache.fluss.utils.clock.SystemClock;
@@ -52,11 +59,19 @@

import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse;
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +108,7 @@ void testSimple() throws Exception {
appendToAccumulator(tb1, row(1, "a"), future::complete);
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));

sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -118,7 +133,7 @@ void testRetries() throws Exception {
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
long offset = 0;
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));

sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -131,13 +146,13 @@
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);

// a timeout error is retriable, so the batch is sent again.
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);

// A timeout error is retriable, but once the retry count exceeds maxRetries the
// batch is completed with the error.
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
assertThat(future.get())
@@ -168,12 +183,12 @@ void testCanRetryWithoutIdempotence() throws Exception {
assertThat(firstRequest).isInstanceOf(ProduceLogRequest.class);
assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest) firstRequest)).isFalse();
// first complete with retriable error.
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
sender.runOnce();
assertThat(future.isDone()).isFalse();

// the retried request then completes successfully.
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
finishRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
sender.runOnce();
assertThat(future.isDone()).isTrue();
assertThat(future.get()).isNull();
@@ -691,17 +706,51 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);

finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));

// send again, should send nothing since no batch in queue
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
assertThat(future.get()).isNull();
}

@Test
void testRetryPutKeyWithSchemaNotExistException() throws Exception {
TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 0);

BinaryRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
int[] pkIndex = DATA1_SCHEMA_PK.getPrimaryKeyIndexes();
byte[] key = new CompactedKeyEncoder(DATA1_ROW_TYPE, pkIndex).encodeKey(row);
CompletableFuture<Exception> future = new CompletableFuture<>();
accumulator.append(
WriteRecord.forUpsert(
DATA1_TABLE_INFO_PK,
PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
row,
key,
key,
null),
future::complete,
metadataUpdater.getCluster(),
0,
false);
sender.runOnce();
finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, SCHEMA_NOT_EXIST));
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);

// the put kv request should be retried on the next run
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(1);
finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 1));
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
assertThat(future.get()).isNull();
}

private TestingMetadataUpdater initializeMetadataUpdater() {
return new TestingMetadataUpdater(
Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO));
Map<TablePath, TableInfo> tableInfos = new HashMap<>();
tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
return new TestingMetadataUpdater(tableInfos);
}

private void appendToAccumulator(TableBucket tb, GenericRow row, WriteCallback writeCallback)
@@ -722,7 +771,7 @@ private ApiMessage getRequest(TableBucket tb, int index) {
return gateway.getRequest(index);
}

private void finishProduceLogRequest(TableBucket tb, int index, ProduceLogResponse response) {
private void finishRequest(TableBucket tb, int index, ApiMessage response) {
TestTabletServerGateway gateway =
(TestTabletServerGateway)
metadataUpdater.newTabletServerClientForNode(
@@ -763,6 +812,16 @@ private ProduceLogResponse createProduceLogResponse(TableBucket tb, Errors error
Collections.singletonList(new ProduceLogResultForBucket(tb, error.toApiError())));
}

private PutKvResponse createPutKvResponse(TableBucket tb, long endOffset) {
return makePutKvResponse(
Collections.singletonList(new PutKvResultForBucket(tb, endOffset)));
}

private PutKvResponse createPutKvResponse(TableBucket tb, Errors error) {
return makePutKvResponse(
Collections.singletonList(new PutKvResultForBucket(tb, error.toApiError())));
}

private Sender setupWithIdempotenceState() {
return setupWithIdempotenceState(createIdempotenceManager(false));
}
@@ -25,7 +25,7 @@
* @since 0.1
*/
@PublicEvolving
public class SchemaNotExistException extends ApiException {
public class SchemaNotExistException extends RetriableException {

public SchemaNotExistException(String message, Throwable cause) {
super(message, cause);
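Making SchemaNotExistException a subtype of RetriableException is what allows the writer to re-enqueue the batch instead of failing it permanently. As a rough illustration of the effect, client-side retry decisions typically come down to an instanceof check against RetriableException. The sketch below is hypothetical, not Fluss client code, and the package org.apache.fluss.exception for both classes is an assumption:

// Hypothetical sketch, not Fluss client code: retriable errors are recognized by type.
import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.exception.SchemaNotExistException;

public class RetrySketch {
    static boolean canRetry(Exception error) {
        // With this change, SchemaNotExistException now takes the retry branch.
        return error instanceof RetriableException;
    }

    public static void main(String[] args) {
        Exception e =
                new SchemaNotExistException("schema id 3 does not exist yet on the tablet server", null);
        System.out.println(canRetry(e)); // true once the exception extends RetriableException
    }
}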
@@ -283,7 +283,6 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
short latestSchemaId = (short) schemaInfo.getSchemaId();
short schemaIdOfNewData = kvRecords.schemaId();
if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
// TODO: we may need to support retriable exception here
throw new SchemaNotExistException(
"Invalid schema id: "
+ schemaIdOfNewData
@@ -196,7 +196,9 @@ public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) {

@Override
public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
return null;
CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
requests.add(Tuple2.of(request, response));
return response;
}

@Override
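The putKv stub now mirrors the existing produceLog behavior: instead of returning null, it parks the request together with an unresolved future so a test can later complete that future with a canned PutKvResponse, which the generalized finishRequest helper in the sender test builds on. A minimal, self-contained sketch of this queue-backed fake-gateway pattern, with hypothetical names that are not part of the Fluss API:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of a queue-backed fake gateway; not Fluss code.
final class QueuedFakeGateway<REQ, RESP> {
    private final Queue<CompletableFuture<RESP>> pending = new ArrayDeque<>();

    // Called by the code under test: the reply future is parked instead of answered immediately.
    CompletableFuture<RESP> send(REQ request) {
        CompletableFuture<RESP> response = new CompletableFuture<>();
        pending.add(response);
        return response;
    }

    // Called by the test: complete the oldest outstanding request with a canned response.
    void finish(RESP cannedResponse) {
        pending.remove().complete(cannedResponse);
    }
}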