Skip to content

Commit 45d321d

Browse files
jimczithecoop
andauthored
Avoid using direct I/O during vector merges (#127406)
The vector file isn't reopened during merge operations, so relying on `IOContext` to disable direct I/O during merges is ineffective. This change updates the strategy to explicitly open the vector file twice when direct I/O is enabled: once for default reads and once for direct I/O. We then switch to the appropriate index input in `getMergeInstance`, following the same approach used by other formats. --------- Co-authored-by: Simon Cooper <[email protected]>
1 parent edc8da4 commit 45d321d

File tree

3 files changed

+107
-6
lines changed

3 files changed

+107
-6
lines changed

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsFormat.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
2424
import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
2525
import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
26+
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader;
2627
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsWriter;
2728
import org.apache.lucene.index.SegmentReadState;
2829
import org.apache.lucene.index.SegmentWriteState;
@@ -62,7 +63,16 @@ public FlatVectorsWriter fieldsWriter(SegmentWriteState state) throws IOExceptio
6263

6364
@Override
6465
public FlatVectorsReader fieldsReader(SegmentReadState state) throws IOException {
65-
return new DirectIOLucene99FlatVectorsReader(state, vectorsScorer);
66+
if (DirectIOLucene99FlatVectorsReader.shouldUseDirectIO(state)) {
67+
// Use mmap for merges and direct I/O for searches.
68+
// TODO: Open the mmap file with sequential access instead of random (current behavior).
69+
return new MergeReaderWrapper(
70+
new DirectIOLucene99FlatVectorsReader(state, vectorsScorer),
71+
new Lucene99FlatVectorsReader(state, vectorsScorer)
72+
);
73+
} else {
74+
return new Lucene99FlatVectorsReader(state, vectorsScorer);
75+
}
6676
}
6777

6878
@Override

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public DirectIOLucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScor
8585
}
8686
}
8787

88+
public static boolean shouldUseDirectIO(SegmentReadState state) {
89+
return USE_DIRECT_IO && state.directory instanceof DirectIOIndexInputSupplier;
90+
}
91+
8892
private int readMetadata(SegmentReadState state) throws IOException {
8993
String metaFileName = IndexFileNames.segmentFileName(
9094
state.segmentInfo.name,
@@ -122,11 +126,9 @@ private static IndexInput openDataInput(
122126
) throws IOException {
123127
String fileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, fileExtension);
124128
// use direct IO for accessing raw vector data for searches
125-
IndexInput in = USE_DIRECT_IO
126-
&& context.context() == IOContext.Context.DEFAULT
127-
&& state.directory instanceof DirectIOIndexInputSupplier did
128-
? did.openInputDirect(fileName, context)
129-
: state.directory.openInput(fileName, context);
129+
IndexInput in = USE_DIRECT_IO && state.directory instanceof DirectIOIndexInputSupplier did
130+
? did.openInputDirect(fileName, context)
131+
: state.directory.openInput(fileName, context);
130132
boolean success = false;
131133
try {
132134
int versionVectorData = CodecUtil.checkIndexHeader(
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.vectors.es818;
11+
12+
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
13+
import org.apache.lucene.index.ByteVectorValues;
14+
import org.apache.lucene.index.FloatVectorValues;
15+
import org.apache.lucene.search.KnnCollector;
16+
import org.apache.lucene.util.Accountable;
17+
import org.apache.lucene.util.Bits;
18+
import org.apache.lucene.util.hnsw.RandomVectorScorer;
19+
import org.elasticsearch.core.IOUtils;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
24+
class MergeReaderWrapper extends FlatVectorsReader {
25+
26+
private final FlatVectorsReader mainReader;
27+
private final FlatVectorsReader mergeReader;
28+
29+
protected MergeReaderWrapper(FlatVectorsReader mainReader, FlatVectorsReader mergeReader) {
30+
super(mainReader.getFlatVectorScorer());
31+
this.mainReader = mainReader;
32+
this.mergeReader = mergeReader;
33+
}
34+
35+
@Override
36+
public RandomVectorScorer getRandomVectorScorer(String field, float[] target) throws IOException {
37+
return mainReader.getRandomVectorScorer(field, target);
38+
}
39+
40+
@Override
41+
public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) throws IOException {
42+
return mainReader.getRandomVectorScorer(field, target);
43+
}
44+
45+
@Override
46+
public void checkIntegrity() throws IOException {
47+
mainReader.checkIntegrity();
48+
}
49+
50+
@Override
51+
public FloatVectorValues getFloatVectorValues(String field) throws IOException {
52+
return mainReader.getFloatVectorValues(field);
53+
}
54+
55+
@Override
56+
public ByteVectorValues getByteVectorValues(String field) throws IOException {
57+
return mainReader.getByteVectorValues(field);
58+
}
59+
60+
@Override
61+
public FlatVectorsReader getMergeInstance() {
62+
return mergeReader;
63+
}
64+
65+
@Override
66+
public long ramBytesUsed() {
67+
return mainReader.ramBytesUsed();
68+
}
69+
70+
@Override
71+
public Collection<Accountable> getChildResources() {
72+
return mainReader.getChildResources();
73+
}
74+
75+
@Override
76+
public void search(String field, float[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException {
77+
mainReader.search(field, target, knnCollector, acceptDocs);
78+
}
79+
80+
@Override
81+
public void search(String field, byte[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException {
82+
mainReader.search(field, target, knnCollector, acceptDocs);
83+
}
84+
85+
@Override
86+
public void close() throws IOException {
87+
IOUtils.close(mainReader, mergeReader);
88+
}
89+
}

0 commit comments

Comments
 (0)