Skip to content

Commit 7d93c02

Browse files
committed
feat(kv): obkv-client improve batch-get (#415)
1 parent 6e7ee61 commit 7d93c02

1 file changed

Lines changed: 69 additions & 26 deletions

File tree

  • camellia-redis-proxy/camellia-redis-proxy-extensions/camellia-redis-proxy-kv/camellia-redis-proxy-kv-obkv/src/main/java/com/netease/nim/camellia/redis/proxy/kv/obkv

camellia-redis-proxy/camellia-redis-proxy-extensions/camellia-redis-proxy-kv/camellia-redis-proxy-kv-obkv/src/main/java/com/netease/nim/camellia/redis/proxy/kv/obkv/OBKVClient.java

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import java.util.*;
25-
import java.util.concurrent.Future;
26-
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.ThreadPoolExecutor;
28-
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.*;
2926

3027
/**
3128
* Created by caojiajun on 2024/9/4
@@ -233,32 +230,78 @@ public boolean[] exists(int slot, byte[]... keys) {
233230
@Override
234231
public List<KeyValue> batchGet(int slot, byte[]... keys) {
235232
try {
236-
TableBatchOps batch = obTableClient.batch(tableName);
237-
for (byte[] key : keys) {
238-
batch.get(new Object[]{slot, key}, new String[]{"v"});
239-
}
240-
List<Object> objects = batch.executeWithResult();
241-
242-
List<KeyValue> list = new ArrayList<>(keys.length);
243-
int index = 0;
244-
for (Object object : objects) {
245-
MutationResult result = (MutationResult) object;
246-
Row operationRow = result.getOperationRow();
247-
Map<String, Object> map = operationRow.getMap();
248-
if (map == null || map.isEmpty()) {
249-
index ++;
250-
continue;
251-
} else {
252-
Object o = map.get("v");
253-
if (o == null) {
254-
index ++;
233+
if (keys.length > batchSplitSize) {
234+
List<List<byte[]>> split = split(batchSplitSize, keys);
235+
List<Future<List<KeyValue>>> futureList = new ArrayList<>(split.size());
236+
for (List<byte[]> subList : split) {
237+
Future<List<KeyValue>> future = executor.submit(() -> {
238+
try {
239+
TableBatchOps batch = obTableClient.batch(tableName);
240+
for (byte[] key : subList) {
241+
batch.get(new Object[]{slot, key}, new String[]{"v"});
242+
}
243+
List<Object> objects = batch.executeWithResult();
244+
245+
List<KeyValue> list = new ArrayList<>(subList.size());
246+
int index = 0;
247+
for (Object object : objects) {
248+
MutationResult result = (MutationResult) object;
249+
Row operationRow = result.getOperationRow();
250+
Map<String, Object> map = operationRow.getMap();
251+
if (map == null || map.isEmpty()) {
252+
index++;
253+
continue;
254+
} else {
255+
Object o = map.get("v");
256+
if (o == null) {
257+
index++;
258+
continue;
259+
}
260+
list.add(new KeyValue(subList.get(index), (byte[]) o));
261+
}
262+
index++;
263+
}
264+
return list;
265+
} catch (Exception e) {
266+
logger.error("batchGet error", e);
267+
throw new KvException(e);
268+
}
269+
});
270+
futureList.add(future);
271+
}
272+
List<KeyValue> result = new ArrayList<>();
273+
for (Future<List<KeyValue>> future : futureList) {
274+
result.addAll(future.get());
275+
}
276+
return result;
277+
} else {
278+
TableBatchOps batch = obTableClient.batch(tableName);
279+
for (byte[] key : keys) {
280+
batch.get(new Object[]{slot, key}, new String[]{"v"});
281+
}
282+
List<Object> objects = batch.executeWithResult();
283+
284+
List<KeyValue> list = new ArrayList<>(keys.length);
285+
int index = 0;
286+
for (Object object : objects) {
287+
MutationResult result = (MutationResult) object;
288+
Row operationRow = result.getOperationRow();
289+
Map<String, Object> map = operationRow.getMap();
290+
if (map == null || map.isEmpty()) {
291+
index++;
255292
continue;
293+
} else {
294+
Object o = map.get("v");
295+
if (o == null) {
296+
index++;
297+
continue;
298+
}
299+
list.add(new KeyValue(keys[index], (byte[]) o));
256300
}
257-
list.add(new KeyValue(keys[index], (byte[]) o));
301+
index++;
258302
}
259-
index ++;
303+
return list;
260304
}
261-
return list;
262305
} catch (Exception e) {
263306
logger.error("batchGet error", e);
264307
throw new KvException(e);

0 commit comments

Comments
 (0)