2525import org .apache .fluss .config .Configuration ;
2626import org .apache .fluss .config .MemorySize ;
2727import org .apache .fluss .exception .TimeoutException ;
28+ import org .apache .fluss .metadata .PhysicalTablePath ;
2829import org .apache .fluss .metadata .TableBucket ;
30+ import org .apache .fluss .metadata .TableInfo ;
31+ import org .apache .fluss .metadata .TablePath ;
2932import org .apache .fluss .record .MemoryLogRecords ;
33+ import org .apache .fluss .row .BinaryRow ;
3034import org .apache .fluss .row .GenericRow ;
35+ import org .apache .fluss .row .encode .CompactedKeyEncoder ;
3136import org .apache .fluss .rpc .entity .ProduceLogResultForBucket ;
37+ import org .apache .fluss .rpc .entity .PutKvResultForBucket ;
3238import org .apache .fluss .rpc .messages .ApiMessage ;
3339import org .apache .fluss .rpc .messages .ProduceLogRequest ;
3440import org .apache .fluss .rpc .messages .ProduceLogResponse ;
41+ import org .apache .fluss .rpc .messages .PutKvResponse ;
3542import org .apache .fluss .rpc .protocol .Errors ;
3643import org .apache .fluss .server .tablet .TestTabletServerGateway ;
3744import org .apache .fluss .utils .clock .SystemClock ;
5259
5360import static org .apache .fluss .record .LogRecordBatchFormat .NO_WRITER_ID ;
5461import static org .apache .fluss .record .TestData .DATA1_PHYSICAL_TABLE_PATH ;
62+ import static org .apache .fluss .record .TestData .DATA1_ROW_TYPE ;
63+ import static org .apache .fluss .record .TestData .DATA1_SCHEMA_PK ;
5564import static org .apache .fluss .record .TestData .DATA1_TABLE_ID ;
65+ import static org .apache .fluss .record .TestData .DATA1_TABLE_ID_PK ;
5666import static org .apache .fluss .record .TestData .DATA1_TABLE_INFO ;
67+ import static org .apache .fluss .record .TestData .DATA1_TABLE_INFO_PK ;
5768import static org .apache .fluss .record .TestData .DATA1_TABLE_PATH ;
69+ import static org .apache .fluss .record .TestData .DATA1_TABLE_PATH_PK ;
70+ import static org .apache .fluss .rpc .protocol .Errors .SCHEMA_NOT_EXIST ;
5871import static org .apache .fluss .server .utils .ServerRpcMessageUtils .getProduceLogData ;
5972import static org .apache .fluss .server .utils .ServerRpcMessageUtils .makeProduceLogResponse ;
73+ import static org .apache .fluss .server .utils .ServerRpcMessageUtils .makePutKvResponse ;
74+ import static org .apache .fluss .testutils .DataTestUtils .compactedRow ;
6075import static org .apache .fluss .testutils .DataTestUtils .row ;
6176import static org .apache .fluss .testutils .common .CommonTestUtils .retry ;
6277import static org .assertj .core .api .Assertions .assertThat ;
@@ -93,7 +108,7 @@ void testSimple() throws Exception {
93108 appendToAccumulator (tb1 , row (1 , "a" ), future ::complete );
94109 sender .runOnce ();
95110 assertThat (sender .numOfInFlightBatches (tb1 )).isEqualTo (1 );
96- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
111+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
97112
98113 sender .runOnce ();
99114 assertThat (sender .numOfInFlightBatches (tb1 )).isEqualTo (0 );
@@ -118,7 +133,7 @@ void testRetries() throws Exception {
118133 sender1 .runOnce ();
119134 assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (1 );
120135 long offset = 0 ;
121- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
136+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
122137
123138 sender1 .runOnce ();
124139 assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (0 );
@@ -131,13 +146,13 @@ void testRetries() throws Exception {
131146 assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (1 );
132147
133148 // timeout error can retry send.
134- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
149+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
135150 sender1 .runOnce ();
136151 assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (1 );
137152
138153 // Even if timeout error can retry send, but the retry number > maxRetries, which will
139154 // return error.
140- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
155+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
141156 sender1 .runOnce ();
142157 assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (0 );
143158 assertThat (future .get ())
@@ -168,12 +183,12 @@ void testCanRetryWithoutIdempotence() throws Exception {
168183 assertThat (firstRequest ).isInstanceOf (ProduceLogRequest .class );
169184 assertThat (hasIdempotentRecords (tb1 , (ProduceLogRequest ) firstRequest )).isFalse ();
170185 // first complete with retriable error.
171- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
186+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
172187 sender .runOnce ();
173188 assertThat (future .isDone ()).isFalse ();
174189
175190 // second retry complete.
176- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , 0L , 1L ));
191+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , 0L , 1L ));
177192 sender .runOnce ();
178193 assertThat (future .isDone ()).isTrue ();
179194 assertThat (future .get ()).isNull ();
@@ -691,17 +706,51 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
691706 sender .runOnce ();
692707 assertThat (sender .numOfInFlightBatches (tb1 )).isEqualTo (1 );
693708
694- finishProduceLogRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
709+ finishRequest (tb1 , 0 , createProduceLogResponse (tb1 , offset , 1 ));
695710
696711 // send again, should send nothing since no batch in queue
697712 sender .runOnce ();
698713 assertThat (sender .numOfInFlightBatches (tb1 )).isEqualTo (0 );
699714 assertThat (future .get ()).isNull ();
700715 }
701716
717+ @ Test
718+ void testRetryPutKeyWithSchemaNotExistException () throws Exception {
719+ TableBucket tableBucket = new TableBucket (DATA1_TABLE_ID_PK , 0 );
720+
721+ BinaryRow row = compactedRow (DATA1_ROW_TYPE , new Object [] {1 , "a" });
722+ int [] pkIndex = DATA1_SCHEMA_PK .getPrimaryKeyIndexes ();
723+ byte [] key = new CompactedKeyEncoder (DATA1_ROW_TYPE , pkIndex ).encodeKey (row );
724+ CompletableFuture <Exception > future = new CompletableFuture <>();
725+ accumulator .append (
726+ WriteRecord .forUpsert (
727+ DATA1_TABLE_INFO_PK ,
728+ PhysicalTablePath .of (DATA1_TABLE_PATH_PK ),
729+ row ,
730+ key ,
731+ key ,
732+ null ),
733+ future ::complete ,
734+ metadataUpdater .getCluster (),
735+ 0 ,
736+ false );
737+ sender .runOnce ();
738+ finishRequest (tableBucket , 0 , createPutKvResponse (tableBucket , SCHEMA_NOT_EXIST ));
739+ assertThat (sender .numOfInFlightBatches (tableBucket )).isEqualTo (0 );
740+
741+ // retry to put kv request again
742+ sender .runOnce ();
743+ assertThat (sender .numOfInFlightBatches (tableBucket )).isEqualTo (1 );
744+ finishRequest (tableBucket , 0 , createPutKvResponse (tableBucket , 1 ));
745+ assertThat (sender .numOfInFlightBatches (tableBucket )).isEqualTo (0 );
746+ assertThat (future .get ()).isNull ();
747+ }
748+
702749 private TestingMetadataUpdater initializeMetadataUpdater () {
703- return new TestingMetadataUpdater (
704- Collections .singletonMap (DATA1_TABLE_PATH , DATA1_TABLE_INFO ));
750+ Map <TablePath , TableInfo > tableInfos = new HashMap <>();
751+ tableInfos .put (DATA1_TABLE_PATH , DATA1_TABLE_INFO );
752+ tableInfos .put (DATA1_TABLE_PATH_PK , DATA1_TABLE_INFO_PK );
753+ return new TestingMetadataUpdater (tableInfos );
705754 }
706755
707756 private void appendToAccumulator (TableBucket tb , GenericRow row , WriteCallback writeCallback )
@@ -722,7 +771,7 @@ private ApiMessage getRequest(TableBucket tb, int index) {
722771 return gateway .getRequest (index );
723772 }
724773
725- private void finishProduceLogRequest (TableBucket tb , int index , ProduceLogResponse response ) {
774+ private void finishRequest (TableBucket tb , int index , ApiMessage response ) {
726775 TestTabletServerGateway gateway =
727776 (TestTabletServerGateway )
728777 metadataUpdater .newTabletServerClientForNode (
@@ -763,6 +812,16 @@ private ProduceLogResponse createProduceLogResponse(TableBucket tb, Errors error
763812 Collections .singletonList (new ProduceLogResultForBucket (tb , error .toApiError ())));
764813 }
765814
815+ private PutKvResponse createPutKvResponse (TableBucket tb , long endOffset ) {
816+ return makePutKvResponse (
817+ Collections .singletonList (new PutKvResultForBucket (tb , endOffset )));
818+ }
819+
820+ private PutKvResponse createPutKvResponse (TableBucket tb , Errors error ) {
821+ return makePutKvResponse (
822+ Collections .singletonList (new PutKvResultForBucket (tb , error .toApiError ())));
823+ }
824+
766825 private Sender setupWithIdempotenceState () {
767826 return setupWithIdempotenceState (createIdempotenceManager (false ));
768827 }
0 commit comments