Skip to content

Commit cfd685b

Browse files
committed
[core] Refactor scalar index eval and apply pre filter to vector search
1 parent 0933636 commit cfd685b

File tree

19 files changed, with 827 additions and 556 deletions.

paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,19 @@
2121
import org.apache.paimon.predicate.FieldRef;
2222
import org.apache.paimon.predicate.VectorSearch;
2323

24+
import javax.annotation.Nullable;
25+
2426
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.Iterator;
2529
import java.util.List;
2630
import java.util.Optional;
31+
import java.util.concurrent.ExecutorService;
2732
import java.util.function.Function;
33+
import java.util.stream.Collectors;
34+
35+
import static java.util.Collections.singletonList;
36+
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
2837

2938
/**
3039
* A {@link GlobalIndexReader} that combines results from multiple readers by performing a union
@@ -33,9 +42,16 @@
3342
public class UnionGlobalIndexReader implements GlobalIndexReader {
3443

3544
private final List<GlobalIndexReader> readers;
45+
private final @Nullable ExecutorService executor;
3646

3747
public UnionGlobalIndexReader(List<GlobalIndexReader> readers) {
48+
this(readers, null);
49+
}
50+
51+
public UnionGlobalIndexReader(
52+
List<GlobalIndexReader> readers, @Nullable ExecutorService executor) {
3853
this.readers = readers;
54+
this.executor = executor;
3955
}
4056

4157
@Override
@@ -116,8 +132,9 @@ public Optional<GlobalIndexResult> visitBetween(FieldRef fieldRef, Object from,
116132
@Override
117133
public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) {
118134
Optional<ScoredGlobalIndexResult> result = Optional.empty();
119-
for (GlobalIndexReader reader : readers) {
120-
Optional<ScoredGlobalIndexResult> current = reader.visitVectorSearch(vectorSearch);
135+
List<Optional<ScoredGlobalIndexResult>> results =
136+
executeAllReaders(reader -> reader.visitVectorSearch(vectorSearch));
137+
for (Optional<ScoredGlobalIndexResult> current : results) {
121138
if (!current.isPresent()) {
122139
continue;
123140
}
@@ -132,8 +149,8 @@ public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch vectorSe
132149
private Optional<GlobalIndexResult> union(
133150
Function<GlobalIndexReader, Optional<GlobalIndexResult>> visitor) {
134151
Optional<GlobalIndexResult> result = Optional.empty();
135-
for (GlobalIndexReader reader : readers) {
136-
Optional<GlobalIndexResult> current = visitor.apply(reader);
152+
List<Optional<GlobalIndexResult>> results = executeAllReaders(visitor);
153+
for (Optional<GlobalIndexResult> current : results) {
137154
if (!current.isPresent()) {
138155
continue;
139156
}
@@ -145,6 +162,21 @@ private Optional<GlobalIndexResult> union(
145162
return result;
146163
}
147164

165+
private <R> List<R> executeAllReaders(Function<GlobalIndexReader, R> function) {
166+
if (executor == null) {
167+
return readers.stream().map(function).collect(Collectors.toList());
168+
}
169+
170+
Iterator<R> iterator =
171+
randomlyExecuteSequentialReturn(
172+
executor, reader -> singletonList(function.apply(reader)), readers);
173+
List<R> result = new ArrayList<>();
174+
while (iterator.hasNext()) {
175+
result.add(iterator.next());
176+
}
177+
return result;
178+
}
179+
148180
@Override
149181
public void close() throws IOException {
150182
for (GlobalIndexReader reader : readers) {

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.apache.paimon.format.FileFormat;
2727
import org.apache.paimon.fs.FileIO;
2828
import org.apache.paimon.fs.Path;
29-
import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
30-
import org.apache.paimon.globalindex.GlobalIndexScanBuilderImpl;
3129
import org.apache.paimon.iceberg.IcebergCommitCallback;
3230
import org.apache.paimon.iceberg.IcebergOptions;
3331
import org.apache.paimon.index.IndexFileHandler;
@@ -523,15 +521,4 @@ public void setManifestCache(SegmentsCache<Path> manifestCache) {
523521
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
524522
this.snapshotCache = cache;
525523
}
526-
527-
@Override
528-
public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
529-
return new GlobalIndexScanBuilderImpl(
530-
options.toConfiguration(),
531-
schema.logicalRowType(),
532-
fileIO,
533-
pathFactory().globalIndexFileFactory(),
534-
snapshotManager(),
535-
newIndexFileHandler());
536-
}
537524
}

paimon-core/src/main/java/org/apache/paimon/FileStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.paimon;
2020

2121
import org.apache.paimon.fs.Path;
22-
import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
2322
import org.apache.paimon.index.IndexFileHandler;
2423
import org.apache.paimon.manifest.IndexManifestFile;
2524
import org.apache.paimon.manifest.ManifestFile;
@@ -125,6 +124,4 @@ PartitionExpire newPartitionExpire(
125124
void setManifestCache(SegmentsCache<Path> manifestCache);
126125

127126
void setSnapshotCache(Cache<Path, Snapshot> cache);
128-
129-
GlobalIndexScanBuilder newGlobalIndexScanBuilder();
130127
}

paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.paimon.globalindex;
2020

21-
import org.apache.paimon.Snapshot;
21+
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.annotation.VisibleForTesting;
2323
import org.apache.paimon.data.BinaryRow;
2424
import org.apache.paimon.io.DataFileMeta;
@@ -36,23 +36,21 @@
3636
import org.apache.paimon.table.source.DataTableScan;
3737
import org.apache.paimon.table.source.InnerTableScan;
3838
import org.apache.paimon.table.source.Split;
39-
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
4039
import org.apache.paimon.types.RowType;
4140
import org.apache.paimon.utils.Filter;
4241
import org.apache.paimon.utils.Range;
4342
import org.apache.paimon.utils.RowRangeIndex;
4443

4544
import javax.annotation.Nullable;
4645

46+
import java.io.IOException;
4747
import java.util.ArrayList;
4848
import java.util.Collections;
4949
import java.util.List;
5050
import java.util.Map;
51-
import java.util.Objects;
5251
import java.util.Optional;
5352
import java.util.function.Function;
5453

55-
import static org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
5654
import static org.apache.paimon.table.SpecialFields.ROW_ID;
5755
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
5856

@@ -263,39 +261,18 @@ private Optional<GlobalIndexResult> evalGlobalIndex() {
263261
if (filter == null) {
264262
return Optional.empty();
265263
}
266-
if (!table.coreOptions().globalIndexEnabled()) {
264+
CoreOptions options = table.coreOptions();
265+
if (!options.globalIndexEnabled()) {
267266
return Optional.empty();
268267
}
269-
PartitionPredicate partitionPredicate =
268+
PartitionPredicate partitionFilter =
270269
batchScan.snapshotReader().manifestsReader().partitionFilter();
271-
GlobalIndexScanBuilder indexScanBuilder = table.store().newGlobalIndexScanBuilder();
272-
Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
273-
indexScanBuilder.withPartitionPredicate(partitionPredicate).withSnapshot(snapshot);
274-
List<Range> indexedRowRanges = indexScanBuilder.shardList();
275-
if (indexedRowRanges.isEmpty()) {
276-
return Optional.empty();
277-
}
278-
279-
Long nextRowId = Objects.requireNonNull(snapshot.nextRowId());
280-
List<Range> nonIndexedRowRanges = new Range(0, nextRowId - 1).exclude(indexedRowRanges);
281-
Optional<GlobalIndexResult> resultOptional =
282-
parallelScan(
283-
indexedRowRanges,
284-
indexScanBuilder,
285-
filter,
286-
table.coreOptions().globalIndexThreadNum());
287-
if (!resultOptional.isPresent()) {
288-
return Optional.empty();
270+
try (GlobalIndexScanner scanner =
271+
GlobalIndexScanner.create(table, partitionFilter, filter)) {
272+
return scanner.scan(filter);
273+
} catch (IOException e) {
274+
throw new RuntimeException(e);
289275
}
290-
291-
GlobalIndexResult result = resultOptional.get();
292-
if (!nonIndexedRowRanges.isEmpty()) {
293-
for (Range range : nonIndexedRowRanges) {
294-
result.or(GlobalIndexResult.fromRange(range));
295-
}
296-
}
297-
298-
return Optional.of(result);
299276
}
300277

301278
@VisibleForTesting

paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)