
Commit cda12ef

implement the server side functionality

Parent: ecc47b6

File tree: 8 files changed (+337 −4 lines)

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 1 addition & 3 deletions

@@ -246,9 +246,7 @@ private List<InternalRow> decodeFullScanResponses(
         for (CompletableFuture<FullScanResponse> responseFuture : responseFutures) {
             FullScanResponse response = responseFuture.join();
 
-            if (response.hasErrorCode()
-                    && response.getErrorCode()
-                            != org.apache.fluss.rpc.protocol.Errors.NONE.code()) {
+            if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) {
                 Errors err = Errors.forCode(response.getErrorCode());
                 throw err.exception(
                         response.hasErrorMessage() ? response.getErrorMessage() : err.message());
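The tidied condition folds the fully-qualified Errors reference (now imported) onto one line. The surrounding decode loop follows a join-then-fail-fast pattern: join each response future in turn and surface the first server-reported error as a typed exception. A self-contained sketch of that pattern, where the Response holder and NONE constant are hypothetical stand-ins for the generated FullScanResponse and Errors.NONE.code():

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DecodeResponsesSketch {
    // Hypothetical stand-in for the generated FullScanResponse message.
    static final class Response {
        final Integer errorCode;   // null means "field not set"
        final String errorMessage; // may be null even when errorCode is set
        final byte[] records;

        Response(Integer errorCode, String errorMessage, byte[] records) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.records = records;
        }
    }

    static final int NONE = 0; // mirrors Errors.NONE.code()

    // Join every future; fail fast on the first error response.
    static List<byte[]> decode(List<CompletableFuture<Response>> futures) {
        List<byte[]> results = new ArrayList<>();
        for (CompletableFuture<Response> f : futures) {
            Response r = f.join();
            if (r.errorCode != null && r.errorCode != NONE) {
                // The real code maps the code via Errors.forCode(...).exception(...).
                String msg = r.errorMessage != null ? r.errorMessage : "error " + r.errorCode;
                throw new RuntimeException(msg);
            }
            results.add(r.records);
        }
        return results;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Response>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(new Response(NONE, null, new byte[] {1, 2})));
        futures.add(CompletableFuture.completedFuture(new Response(NONE, null, new byte[] {3})));
        System.out.println(decode(futures).size()); // 2
    }
}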
PrimaryKeyLookuperTest.java (new test file)

Lines changed: 99 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.client.lookup;

import org.apache.fluss.client.metadata.TestingMetadataUpdater;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.messages.FullScanRequest;
import org.apache.fluss.rpc.messages.FullScanResponse;
import org.apache.fluss.server.tablet.TestTabletServerGateway;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Consolidated tests for PrimaryKeyLookuper covering snapshotAll() and snapshotAllPartition(). */
class PrimaryKeyLookuperTest {

    private TableInfo nonPartitionedPk;
    private TestingMetadataUpdater metadataUpdater;

    @BeforeEach
    void setUp() {
        nonPartitionedPk = DATA1_TABLE_INFO_PK;
        Map<TablePath, TableInfo> tableInfos = new HashMap<>();
        tableInfos.put(DATA1_TABLE_PATH_PK, nonPartitionedPk);
        metadataUpdater = new TestingMetadataUpdater(tableInfos);
    }

    @Test
    void testSnapshotAll() {
        LookupClient lookupClient = new LookupClient(new Configuration(), metadataUpdater);
        PrimaryKeyLookuper lookuper =
                new PrimaryKeyLookuper(nonPartitionedPk, metadataUpdater, lookupClient);

        // Kick off snapshotAll which will enqueue FullScan requests to leaders (1,2,3)
        CompletableFuture<List<InternalRow>> future = lookuper.snapshotAll();

        int leader = metadataUpdater.leaderFor(new TableBucket(DATA1_TABLE_ID_PK, 0));
        System.out.println(leader);
        TestTabletServerGateway gateway =
                (TestTabletServerGateway) metadataUpdater.newTabletServerClientForNode(leader);
        FullScanRequest request = new FullScanRequest().setTableId(nonPartitionedPk.getTableId());
        System.out.println(request);
        CompletableFuture<FullScanResponse> fullScanResponseCompletableFuture =
                gateway.fullScan(request);
        fullScanResponseCompletableFuture.join();

        // Prepare 10 records split across 3 leaders: 4 + 3 + 3
        // List<Object[]> all = DATA1; // already contains 10 rows matching the schema (a INT, b STRING)
        // List<Object[]> part1 = all.subList(0, 4);
        // List<Object[]> part2 = all.subList(4, 7);
        // List<Object[]> part3 = all.subList(7, 10);
        //
        // respondWithRecords(1, part1);
        // respondWithRecords(2, part2);
        // respondWithRecords(3, part3);
        //
        // List<InternalRow> rows = future.join();
        // assertThat(rows).hasSize(10);
    }

    @Test
    void snapshotAllPartition_throwsOnNonPartitionedTable() {
        LookupClient lookupClient = new LookupClient(new Configuration(), metadataUpdater);
        PrimaryKeyLookuper lookuper =
                new PrimaryKeyLookuper(nonPartitionedPk, metadataUpdater, lookupClient);
        assertThatThrownBy(() -> lookuper.snapshotAllPartition("p1").join())
                .hasMessageContaining("Table is not partitioned");
    }
}
FullScanProtocolSerdeTest.java (new test file)

Lines changed: 68 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.rpc.messages;

import org.apache.fluss.rpc.protocol.Errors;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Serde tests for FULL_SCAN request/response. */
class FullScanProtocolSerdeTest {

    @Test
    void testRequestRoundTrip() {
        FullScanRequest req = new FullScanRequest().setTableId(123L).setPartitionId(456L);
        byte[] bytes = req.toByteArray();
        FullScanRequest parsed = new FullScanRequest();
        parsed.parseFrom(bytes);
        assertThat(parsed.hasTableId()).isTrue();
        assertThat(parsed.getTableId()).isEqualTo(123L);
        assertThat(parsed.hasPartitionId()).isTrue();
        assertThat(parsed.getPartitionId()).isEqualTo(456L);

        FullScanRequest copy = new FullScanRequest().copyFrom(parsed);
        assertThat(copy.getTableId()).isEqualTo(123L);
        assertThat(copy.getPartitionId()).isEqualTo(456L);
    }

    @Test
    void testResponseRoundTrip() {
        byte[] records = new byte[] {1, 2, 3, 4};
        FullScanResponse resp =
                new FullScanResponse()
                        .setErrorCode(Errors.NONE.code())
                        .setIsLogTable(false)
                        .setRecords(records);

        byte[] bytes = resp.toByteArray();
        FullScanResponse parsed = new FullScanResponse();
        parsed.parseFrom(bytes);

        assertThat(parsed.hasErrorCode()).isTrue();
        assertThat(parsed.getErrorCode()).isEqualTo(Errors.NONE.code());
        assertThat(parsed.hasIsLogTable()).isTrue();
        assertThat(parsed.isIsLogTable()).isFalse();
        assertThat(parsed.hasRecords()).isTrue();
        assertThat(parsed.getRecordsSize()).isEqualTo(records.length);

        FullScanResponse copy = new FullScanResponse().copyFrom(parsed);
        assertThat(copy.getRecords()).containsExactly(records);
    }
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 9 additions & 0 deletions

@@ -492,6 +492,15 @@ public List<byte[]> limitScan(int limit) throws IOException {
                 });
     }
 
+    public List<byte[]> fullScan() {
+        return inReadLock(
+                kvLock,
+                () -> {
+                    rocksDBKv.checkIfRocksDBClosed();
+                    return rocksDBKv.fullScan();
+                });
+    }
+
     public KvBatchWriter createKvBatchWriter() {
         return rocksDBKv.newWriteBatch(writeBatchSize);
     }
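KvTablet.fullScan runs under inReadLock(kvLock, ...): concurrent scans share the read lock, while operations that take the write lock are excluded, and the closed-check happens inside the critical section so the scan cannot race a tablet shutdown. A minimal standalone sketch of the read-lock idiom, assuming nothing about Fluss's actual lock utility (the inReadLock helper below is illustrative):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class ReadLockSketch {
    private final ReadWriteLock kvLock = new ReentrantReadWriteLock();

    // Illustrative helper: run the supplier while holding the shared read lock.
    private <T> T inReadLock(ReadWriteLock lock, Supplier<T> action) {
        lock.readLock().lock();
        try {
            return action.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    public int fullScan() {
        // Multiple readers may hold the lock at once; writers wait.
        return inReadLock(kvLock, () -> 42 /* scan under the lock */);
    }

    public static void main(String[] args) {
        System.out.println(new ReadLockSketch().fullScan());
    }
}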

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java

Lines changed: 28 additions & 0 deletions

@@ -29,6 +29,7 @@
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;

@@ -141,6 +142,33 @@ public List<byte[]> limitScan(Integer limit) {
         return pkList;
     }
 
+    public List<byte[]> fullScan() {
+        ArrayList<byte[]> values = new ArrayList<>();
+
+        // grab current snapshot
+        Snapshot snapshot = db.getSnapshot();
+        ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
+
+        RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions);
+        try {
+            iterator.seekToFirst();
+            while (iterator.isValid()) {
+                values.add(iterator.value());
+                iterator.next();
+            }
+        } finally {
+            try {
+                readOptions.close();
+            } finally {
+                iterator.close();
+                if (snapshot != null) {
+                    snapshot.close();
+                }
+            }
+        }
+        return values;
+    }
+
     public void put(byte[] key, byte[] value) throws IOException {
         try {
             db.put(writeOptions, key, value);
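One caveat in the scan above: in RocksJava, a Snapshot returned by db.getSnapshot() is owned by the database, and Snapshot.close() does not free the native snapshot; the documented release path is db.releaseSnapshot(snapshot). A sketch of the same scan with that release pattern (the RocksDB method names are the real RocksJava API; the surrounding class and method are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

public class SnapshotScanSketch {
    // Scan all values as of a consistent point in time, releasing the snapshot when done.
    static List<byte[]> fullScan(RocksDB db, ColumnFamilyHandle cf) {
        List<byte[]> values = new ArrayList<>();
        Snapshot snapshot = db.getSnapshot();
        try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
                RocksIterator iterator = db.newIterator(cf, readOptions)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                values.add(iterator.value());
            }
        } finally {
            // getSnapshot() pins state inside the DB; release it explicitly.
            db.releaseSnapshot(snapshot);
        }
        return values;
    }
}

Releasing promptly matters because an outstanding snapshot forces RocksDB to retain superseded versions of keys for the snapshot's lifetime.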

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 53 additions & 0 deletions

@@ -1160,6 +1160,59 @@ public DefaultValueRecordBatch limitKvScan(int limit) {
                 });
     }
 
+    public DefaultValueRecordBatch fullKvScan() {
+        if (!isKvTable()) {
+            throw new NonPrimaryKeyTableException(
+                    "the primary key table not exists for " + tableBucket);
+        }
+
+        return inReadLock(
+                leaderIsrUpdateLock,
+                () -> {
+                    try {
+                        if (!isLeader()) {
+                            throw new NotLeaderOrFollowerException(
+                                    String.format(
+                                            "Leader not local for bucket %s on tabletServer %d",
+                                            tableBucket, localTabletServerId));
+                        }
+                        checkNotNull(kvTablet, "KvTablet for the replica shouldn't be null.");
+                        List<byte[]> values = kvTablet.fullScan();
+                        DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder();
+                        for (byte[] v : values) {
+                            builder.append(v);
+                        }
+                        return builder.build();
+                    } catch (IOException e) {
+                        String errorMsg =
+                                String.format(
+                                        "Failed to full scan from local kv for table bucket %s, the cause is: %s",
+                                        tableBucket, e.getMessage());
+                        LOG.error(errorMsg, e);
+                        throw new KvStorageException(errorMsg, e);
+                    }
+                });
+    }
+
+    public List<byte[]> fullKvScanRaw() {
+        if (!isKvTable()) {
+            throw new NonPrimaryKeyTableException(
+                    "the primary key table not exists for " + tableBucket);
+        }
+        return inReadLock(
+                leaderIsrUpdateLock,
+                () -> {
+                    if (!isLeader()) {
+                        throw new NotLeaderOrFollowerException(
+                                String.format(
+                                        "Leader not local for bucket %s on tabletServer %d",
+                                        tableBucket, localTabletServerId));
+                    }
+                    checkNotNull(kvTablet, "KvTablet for the replica shouldn't be null.");
+                    return kvTablet.fullScan();
+                });
+    }
+
     public LogRecords limitLogScan(int limit) {
         return inReadLock(
                 leaderIsrUpdateLock,

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 50 additions & 0 deletions

@@ -34,6 +34,7 @@
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.record.DefaultValueRecordBatch;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.remote.RemoteLogFetchInfo;

@@ -997,6 +998,55 @@ public void limitScan(
         responseCallback.accept(limitScanResultForBucket);
     }
 
+    public DefaultValueRecordBatch fullScan(long tableId, @Nullable Long partitionId) {
+        long start = System.currentTimeMillis();
+        List<Replica> leaderReplicas =
+                getOnlineReplicaList().stream()
+                        .filter(Replica::isKvTable)
+                        .filter(Replica::isLeader)
+                        .filter(r -> r.getTableBucket().getTableId() == tableId)
+                        .filter(
+                                r ->
+                                        (partitionId == null)
+                                                ? r.getTableBucket().getPartitionId() == null
+                                                : partitionId.equals(
+                                                        r.getTableBucket().getPartitionId()))
+                        .collect(Collectors.toList());
+
+        try {
+            DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder();
+            int bucketCount = 0;
+            int valueCount = 0;
+
+            for (Replica replica : leaderReplicas) {
+                List<byte[]> values = replica.fullKvScanRaw();
+                for (byte[] value : values) {
+                    builder.append(value);
+                }
+                valueCount += values.size();
+                bucketCount++;
+            }
+            long elapsed = System.currentTimeMillis() - start;
+            if (bucketCount > 0) {
+                LOG.info(
+                        "Full-scan success: tableId={}, partitionId={}, buckets_scanned={}, values={}, elapsed_ms={}",
+                        tableId,
+                        partitionId,
+                        bucketCount,
+                        valueCount,
+                        elapsed);
+            }
+            return builder.build();
+        } catch (IOException e) {
+            LOG.error(
+                    "Error during fullScan aggregation for table {}, partition {}",
+                    tableId,
+                    partitionId,
+                    e);
+            throw new RuntimeException(e);
+        }
+    }
+
     public Map<TableBucket, LogReadResult> readFromLog(
             FetchParams fetchParams, Map<TableBucket, FetchReqInfo> bucketFetchInfo) {
         Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();