2121import com .alibaba .fluss .metadata .TableBucket ;
2222import com .alibaba .fluss .metadata .TableDescriptor ;
2323import com .alibaba .fluss .metadata .TableInfo ;
24+ import com .alibaba .fluss .metadata .TablePath ;
2425import com .alibaba .fluss .record .KvRecordBatch ;
2526import com .alibaba .fluss .record .LogRecords ;
2627import com .alibaba .fluss .rpc .entity .FetchLogResultForBucket ;
5960import static com .alibaba .fluss .record .TestData .DATA_1_WITH_KEY_AND_VALUE ;
6061import static com .alibaba .fluss .record .TestData .EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK ;
6162import static com .alibaba .fluss .server .testutils .KvTestUtils .assertLookupResponse ;
63+ import static com .alibaba .fluss .server .testutils .RpcMessageTestUtils .assertFetchLogResponseWithRowKind ;
64+ import static com .alibaba .fluss .server .testutils .RpcMessageTestUtils .createTable ;
65+ import static com .alibaba .fluss .server .testutils .RpcMessageTestUtils .newFetchLogRequest ;
6266import static com .alibaba .fluss .server .testutils .RpcMessageTestUtils .newLookupRequest ;
67+ import static com .alibaba .fluss .server .testutils .RpcMessageTestUtils .newPutKvRequest ;
6368import static com .alibaba .fluss .testutils .DataTestUtils .assertLogRecordsEquals ;
6469import static com .alibaba .fluss .testutils .DataTestUtils .assertLogRecordsEqualsWithRowKind ;
6570import static com .alibaba .fluss .testutils .DataTestUtils .genKvRecordBatch ;
@@ -103,7 +108,7 @@ void testProduceLogNeedAck() throws Exception {
103108 FLUSS_CLUSTER_EXTENSION .waitUtilAllGatewayHasSameMetadata ();
104109
105110 long tableId =
106- RpcMessageTestUtils . createTable (
111+ createTable (
107112 FLUSS_CLUSTER_EXTENSION ,
108113 DATA1_TABLE_PATH ,
109114 data1NonPkTableInfo .getTableDescriptor ());
@@ -131,9 +136,7 @@ void testProduceLogNeedAck() throws Exception {
131136
132137 // check leader log data.
133138 RpcMessageTestUtils .assertFetchLogResponse (
134- leaderGateWay
135- .fetchLog (RpcMessageTestUtils .newFetchLogRequest (-1 , tableId , bucketId , 0L ))
136- .get (),
139+ leaderGateWay .fetchLog (newFetchLogRequest (-1 , tableId , bucketId , 0L )).get (),
137140 tableId ,
138141 bucketId ,
139142 10L ,
@@ -178,17 +181,23 @@ void testProduceLogNeedAck() throws Exception {
178181 @ Test
179182 void testPutKvNeedAck () throws Exception {
180183 // set bucket count to 1 to easy for debug.
181- TableInfo data1PkTableInfo = createPkTable ();
184+ TablePath tp = TablePath .of ("test_db_1" , "test_pk_table_need_ack" );
185+ TableInfo data1PkTableInfo =
186+ new TableInfo (
187+ tp ,
188+ DATA1_TABLE_ID_PK ,
189+ TableDescriptor .builder ()
190+ .schema (DATA1_SCHEMA_PK )
191+ .distributedBy (1 , "a" )
192+ .build (),
193+ 1 );
182194
183195 // wait until all the gateway has same metadata because the follower fetcher manager need
184196 // to get the leader address from server metadata while make follower.
185197 FLUSS_CLUSTER_EXTENSION .waitUtilAllGatewayHasSameMetadata ();
186198
187199 long tableId =
188- RpcMessageTestUtils .createTable (
189- FLUSS_CLUSTER_EXTENSION ,
190- DATA1_TABLE_PATH_PK ,
191- data1PkTableInfo .getTableDescriptor ());
200+ createTable (FLUSS_CLUSTER_EXTENSION , tp , data1PkTableInfo .getTableDescriptor ());
192201 int bucketId = 0 ;
193202 TableBucket tb = new TableBucket (tableId , bucketId );
194203
@@ -202,7 +211,7 @@ void testPutKvNeedAck() throws Exception {
202211 assertPutKvResponse (
203212 leaderGateWay
204213 .putKv (
205- RpcMessageTestUtils . newPutKvRequest (
214+ newPutKvRequest (
206215 tableId ,
207216 bucketId ,
208217 -1 ,
@@ -211,10 +220,8 @@ void testPutKvNeedAck() throws Exception {
211220 bucketId );
212221
213222 // check leader log data.
214- RpcMessageTestUtils .assertFetchLogResponseWithRowKind (
215- leaderGateWay
216- .fetchLog (RpcMessageTestUtils .newFetchLogRequest (-1 , tableId , bucketId , 0L ))
217- .get (),
223+ assertFetchLogResponseWithRowKind (
224+ leaderGateWay .fetchLog (newFetchLogRequest (-1 , tableId , bucketId , 0L )).get (),
218225 tableId ,
219226 bucketId ,
220227 8L ,
@@ -263,7 +270,7 @@ void testFlushForPutKvNeedAck() throws Exception {
263270
264271 // create a table and wait all replica ready
265272 long tableId =
266- RpcMessageTestUtils . createTable (
273+ createTable (
267274 FLUSS_CLUSTER_EXTENSION ,
268275 DATA1_TABLE_PATH_PK ,
269276 data1PkTableInfo .getTableDescriptor ());
@@ -298,8 +305,7 @@ void testFlushForPutKvNeedAck() throws Exception {
298305 Tuple2 .of ("k2" , new Object [] {3 , "b1" }));
299306
300307 CompletableFuture <PutKvResponse > putResponse =
301- leaderGateWay .putKv (
302- RpcMessageTestUtils .newPutKvRequest (tableId , bucketId , -1 , kvRecords ));
308+ leaderGateWay .putKv (newPutKvRequest (tableId , bucketId , -1 , kvRecords ));
303309
304310 // wait util the log has been written
305311 Replica replica = FLUSS_CLUSTER_EXTENSION .waitAndGetLeaderReplica (tb );
0 commit comments