Skip to content

Commit b1382ea

Browse files
author
Jonathan Ellis
authored
V5 one-to-many fixes (CNDB-10737) (#1268)
* assertion compares with getIdUpperBound instead of size. fixes cndb-10710 * add test of one-to-many CompactionGraph * re-serialize CompactionVectorPostings in postingsMap after adding a row to it * OrdinalMapper needs to return OMITTED where holes are present * parameterize VectorLocalTest to cover Versions CA, DC * remove redundant (duplicated) empty table test * easy test flushing empty index * add testOneToManyCompactionTooManyHoles * use correct value of max degree in FusedADC * create a separate path for compaction that doesn't renumber vectors that have already been written to disk * refactors RemappedPostings constructors to static methods that are more clear about memtable vs compaction usage
1 parent 758b110 commit b1382ea

File tree

11 files changed

+444
-118
lines changed

11 files changed

+444
-118
lines changed

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.Iterator;
2424
import java.util.List;
25-
import java.util.Optional;
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.function.BooleanSupplier;
2827
import javax.annotation.concurrent.NotThreadSafe;
@@ -44,6 +43,7 @@
4443
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
4544
import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher;
4645
import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
46+
import org.apache.cassandra.index.sai.disk.v5.V5VectorIndexSearcher;
4747
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter;
4848
import org.apache.cassandra.index.sai.disk.vector.CassandraDiskAnn;
4949
import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph;
@@ -381,13 +381,15 @@ private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexCon
381381
for (Segment segment : index.getSegments())
382382
{
383383
segmentsChecked++;
384-
var searcher = (V2VectorIndexSearcher) segment.getIndexSearcher();
384+
if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher)
385+
return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph
386+
var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher();
385387
var structure = searcher.getPostingsStructure();
386388
if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY)
387389
return false;
388390
}
389391
}
390-
return segmentsChecked != 0;
392+
return true;
391393
}
392394

393395
private CassandraOnHeapGraph.PqInfo maybeReadPqFromLastSegment() throws IOException

src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorPostingsWriter.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,16 +203,21 @@ private static <T> Pair<BiMap<Integer, Integer>, Integer> buildOrdinalMap(Map<Ve
203203
return Pair.create(ordinalMap, maxRow);
204204
}
205205

206-
public static <T> V5VectorPostingsWriter.RemappedPostings remapPostings(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap,
207-
boolean containsDeletes)
206+
public static <T> V5VectorPostingsWriter.RemappedPostings remapForMemtable(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap,
207+
boolean containsDeletes)
208208
{
209209
var p = buildOrdinalMap(postingsMap);
210210
int maxNewOrdinal = postingsMap.size() - 1; // no in-graph deletes in v2
211211
if (p == null || containsDeletes)
212-
return V5VectorPostingsWriter.createGenericV2Mapping(postingsMap);
212+
return V5VectorPostingsWriter.createGenericIdentityMapping(postingsMap);
213213

214214
var ordinalMap = p.left;
215215
var maxRow = p.right;
216-
return new V5VectorPostingsWriter.RemappedPostings(V5VectorPostingsWriter.Structure.ONE_TO_ONE, maxNewOrdinal, maxRow, ordinalMap, new Int2IntHashMap(Integer.MIN_VALUE));
216+
return new V5VectorPostingsWriter.RemappedPostings(V5VectorPostingsWriter.Structure.ONE_TO_ONE,
217+
maxNewOrdinal,
218+
maxRow,
219+
ordinalMap,
220+
new Int2IntHashMap(Integer.MIN_VALUE),
221+
new V5VectorPostingsWriter.BiMapMapper(maxNewOrdinal, ordinalMap));
217222
}
218223
}

src/java/org/apache/cassandra/index/sai/disk/v5/V5VectorPostingsWriter.java

Lines changed: 150 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import java.util.Arrays;
2323
import java.util.Map;
2424
import java.util.Set;
25+
import java.util.function.IntPredicate;
2526
import java.util.function.IntUnaryOperator;
26-
import java.util.stream.Collectors;
2727
import java.util.stream.IntStream;
28+
import javax.annotation.Nullable;
2829

2930
import com.google.common.annotations.VisibleForTesting;
3031
import com.google.common.collect.BiMap;
@@ -34,6 +35,7 @@
3435

3536
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
3637
import io.github.jbellis.jvector.graph.disk.OrdinalMapper;
38+
import io.github.jbellis.jvector.util.FixedBitSet;
3739
import io.github.jbellis.jvector.vector.types.VectorFloat;
3840
import org.agrona.collections.Int2IntHashMap;
3941
import org.agrona.collections.Int2ObjectHashMap;
@@ -101,12 +103,60 @@ public V5VectorPostingsWriter(RemappedPostings remappedPostings)
101103
this.remappedPostings = remappedPostings;
102104
}
103105

104-
public V5VectorPostingsWriter(Structure structure, int graphSize, Map<VectorFloat<?>, VectorPostings.CompactionVectorPostings> postingsMap)
106+
/**
107+
* This method describes the mapping done during construction of the graph so that we can easily create
108+
* an appropriate V5VectorPostingsWriter. No ordinal remapping is performed because (V5) compaction writes
109+
* vectors to disk as they are added to the graph, so there is no opportunity to reorder the way there is
110+
* in a Memtable index.
111+
*/
112+
public static RemappedPostings describeForCompaction(Structure structure, int graphSize, Map<VectorFloat<?>, VectorPostings.CompactionVectorPostings> postingsMap)
105113
{
114+
assert !postingsMap.isEmpty(); // flush+compact should skip writing an index component in this case
115+
106116
if (structure == Structure.ONE_TO_ONE)
107-
remappedPostings = new RemappedPostings(Structure.ONE_TO_ONE, graphSize - 1, graphSize - 1, null, null);
108-
else
109-
remappedPostings = remapPostings(postingsMap);
117+
{
118+
return new RemappedPostings(Structure.ONE_TO_ONE,
119+
graphSize - 1,
120+
graphSize - 1,
121+
null,
122+
null,
123+
new OrdinalMapper.IdentityMapper(graphSize - 1));
124+
}
125+
126+
if (structure == Structure.ONE_TO_MANY)
127+
{
128+
// compute maxOldOrdinal, maxRow, and extraOrdinals from the postingsMap
129+
int maxOldOrdinal = Integer.MIN_VALUE;
130+
int maxRow = Integer.MIN_VALUE;
131+
var extraOrdinals = new Int2IntHashMap(Integer.MIN_VALUE);
132+
for (var entry : postingsMap.entrySet())
133+
{
134+
var postings = entry.getValue();
135+
int ordinal = postings.getOrdinal();
136+
137+
maxOldOrdinal = Math.max(maxOldOrdinal, ordinal);
138+
var rowIds = postings.getRowIds();
139+
assert ordinal == rowIds.getInt(0); // synthetic ordinals not allowed in ONE_TO_MANY
140+
for (int i = 0; i < rowIds.size(); i++)
141+
{
142+
int rowId = rowIds.getInt(i);
143+
maxRow = Math.max(maxRow, rowId);
144+
if (i > 0)
145+
extraOrdinals.put(rowId, ordinal);
146+
}
147+
}
148+
149+
var skippedOrdinals = extraOrdinals.keySet();
150+
return new RemappedPostings(Structure.ONE_TO_MANY,
151+
maxOldOrdinal,
152+
maxRow,
153+
null,
154+
extraOrdinals,
155+
new OmissionAwareIdentityMapper(maxOldOrdinal, skippedOrdinals::contains));
156+
}
157+
158+
assert structure == Structure.ZERO_OR_ONE_TO_MANY : structure;
159+
return createGenericIdentityMapping(postingsMap);
110160
}
111161

112162
public long writePostings(SequentialWriter writer,
@@ -167,7 +217,6 @@ private void writeOneToManyOrdinalMapping(SequentialWriter writer) throws IOExce
167217
writer.writeInt(newOrdinal);
168218
writer.writeInt(0);
169219
entries++;
170-
assert !ordinalToExtraRowIds.containsKey(oldOrdinal);
171220
continue;
172221
}
173222

@@ -204,8 +253,8 @@ private void writeOneToManyRowIdMapping(SequentialWriter writer) throws IOExcept
204253
writer.writeInt(rowId);
205254
writer.writeInt(remappedPostings.ordinalMapper.oldToNew(originalOrdinal));
206255
// validate that we do in fact have contiguous rowids in the non-extra mapping
207-
for (int j = lastExtraRowId + 1; j < rowId; j++)
208-
assert remappedPostings.ordinalMap.inverse().containsKey(j);
256+
assert IntStream.range(lastExtraRowId + 1, rowId)
257+
.allMatch(j -> remappedPostings.ordinalMapper.newToOld(j) != OrdinalMapper.OMITTED) : "Non-contiguous rowids found in non-extra mapping";
209258
lastExtraRowId = rowId;
210259
}
211260

@@ -314,63 +363,32 @@ public static class RemappedPostings
314363
public final int maxNewOrdinal;
315364
/** the largest rowId in the postings (inclusive) */
316365
public final int maxRowId;
317-
/** map from original vector ordinal to rowId that will be its new, remapped ordinal */
318-
private final BiMap<Integer, Integer> ordinalMap;
319366
/** map from rowId to [original] vector ordinal */
367+
@Nullable
320368
private final Int2IntHashMap extraPostings;
321369
/** public api */
322370
public final OrdinalMapper ordinalMapper;
323371

324-
public RemappedPostings(Structure structure, int maxNewOrdinal, int maxRowId, BiMap<Integer, Integer> ordinalMap, Int2IntHashMap extraPostings)
372+
/** visible for V2VectorPostingsWriter.remapPostings, everyone else should use factory methods */
373+
public RemappedPostings(Structure structure, int maxNewOrdinal, int maxRowId, BiMap<Integer, Integer> ordinalMap, Int2IntHashMap extraPostings, OrdinalMapper ordinalMapper)
325374
{
326-
assert structure == Structure.ONE_TO_ONE || structure == Structure.ONE_TO_MANY;
327375
this.structure = structure;
328376
this.maxNewOrdinal = maxNewOrdinal;
329377
this.maxRowId = maxRowId;
330-
this.ordinalMap = ordinalMap;
331378
this.extraPostings = extraPostings;
332-
ordinalMapper = new OrdinalMapper()
333-
{
334-
@Override
335-
public int maxOrdinal()
336-
{
337-
return maxNewOrdinal;
338-
}
339-
340-
@Override
341-
public int oldToNew(int i)
342-
{
343-
return ordinalMap.get(i);
344-
}
345-
346-
@Override
347-
public int newToOld(int i)
348-
{
349-
return ordinalMap.inverse().getOrDefault(i, OMITTED);
350-
}
351-
};
352-
}
353-
354-
public RemappedPostings(int maxNewOrdinal, int maxRowId, Int2IntHashMap sequentialMap)
355-
{
356-
this.structure = Structure.ZERO_OR_ONE_TO_MANY;
357-
this.maxNewOrdinal = maxNewOrdinal;
358-
this.maxRowId = maxRowId;
359-
this.ordinalMap = null;
360-
this.extraPostings = null;
361-
ordinalMapper = new OrdinalMapper.MapMapper(sequentialMap);
379+
this.ordinalMapper = ordinalMapper;
362380
}
363381
}
364382

365383
/**
366384
* @see RemappedPostings
367385
*/
368-
public static <T> RemappedPostings remapPostings(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap)
386+
public static <T> RemappedPostings remapForMemtable(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap)
369387
{
370388
assert V5OnDiskFormat.writeV5VectorPostings();
371389

372390
BiMap<Integer, Integer> ordinalMap = HashBiMap.create();
373-
Int2IntHashMap extraPostings = new Int2IntHashMap(-1);
391+
Int2IntHashMap extraPostings = new Int2IntHashMap(Integer.MIN_VALUE);
374392
int minRow = Integer.MAX_VALUE;
375393
int maxRow = Integer.MIN_VALUE;
376394
int maxNewOrdinal = Integer.MIN_VALUE;
@@ -398,12 +416,13 @@ public static <T> RemappedPostings remapPostings(Map<VectorFloat<?>, ? extends V
398416
extraPostings.put(a[i], oldOrdinal);
399417
}
400418
}
419+
assert totalRowsAssigned == 0 || totalRowsAssigned <= maxRow + 1: "rowids are not unique -- " + totalRowsAssigned + " >= " + maxRow;
401420

402421
// derive the correct structure
403422
Structure structure;
404-
if (totalRowsAssigned > 0 && (minRow != 0 || totalRowsAssigned != maxRow + 1))
423+
if (totalRowsAssigned > 0 && (minRow != 0 || totalRowsAssigned < maxRow + 1))
405424
{
406-
logger.debug("Not all rows are assigned vectors, cannot remap");
425+
logger.debug("Not all rows are assigned vectors, cannot remap one-to-many");
407426
structure = Structure.ZERO_OR_ONE_TO_MANY;
408427
}
409428
else
@@ -419,32 +438,105 @@ public static <T> RemappedPostings remapPostings(Map<VectorFloat<?>, ? extends V
419438

420439
// create the mapping
421440
if (structure == Structure.ZERO_OR_ONE_TO_MANY)
422-
return createGenericMapping(ordinalMap.keySet(), maxOldOrdinal, maxRow);
423-
return new RemappedPostings(structure, maxNewOrdinal, maxRow, ordinalMap, extraPostings);
441+
return createGenericRenumberedMapping(ordinalMap.keySet(), maxOldOrdinal, maxRow);
442+
var ordinalMapper = new BiMapMapper(maxNewOrdinal, ordinalMap);
443+
return new RemappedPostings(structure, maxNewOrdinal, maxRow, ordinalMap, extraPostings, ordinalMapper);
424444
}
425445

426446
/**
427447
* return an exhaustive zero-to-many mapping with the live ordinals renumbered sequentially
428448
*/
429-
private static RemappedPostings createGenericMapping(Set<Integer> liveOrdinals, int maxOldOrdinal, int maxRow)
449+
private static RemappedPostings createGenericRenumberedMapping(Set<Integer> liveOrdinals, int maxOldOrdinal, int maxRow)
430450
{
431-
var sequentialMap = new Int2IntHashMap(maxOldOrdinal, 0.65f, Integer.MIN_VALUE);
451+
var oldToNew = new Int2IntHashMap(maxOldOrdinal, 0.65f, Integer.MIN_VALUE);
432452
int nextOrdinal = 0;
433453
for (int i = 0; i <= maxOldOrdinal; i++) {
434454
if (liveOrdinals.contains(i))
435-
sequentialMap.put(i, nextOrdinal++);
455+
oldToNew.put(i, nextOrdinal++);
436456
}
437-
return new RemappedPostings(nextOrdinal - 1, maxRow, sequentialMap);
457+
return new RemappedPostings(Structure.ZERO_OR_ONE_TO_MANY,
458+
nextOrdinal - 1,
459+
maxRow,
460+
null,
461+
null,
462+
new OrdinalMapper.MapMapper(oldToNew));
438463
}
439464

440465
/**
441-
* return an exhaustive zero-to-many mapping for v2 postings, which never contain missing ordinals
442-
* since deleted vectors are only removed from the index in its next compaction
466+
* return an exhaustive zero-to-many mapping with no renumbering
443467
*/
444-
public static <T> RemappedPostings createGenericV2Mapping(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap)
468+
public static <T> RemappedPostings createGenericIdentityMapping(Map<VectorFloat<?>, ? extends VectorPostings<T>> postingsMap)
445469
{
446-
int maxOldOrdinal = postingsMap.size() - 1;
470+
var maxOldOrdinal = postingsMap.values().stream().mapToInt(VectorPostings::getOrdinal).max().orElseThrow();
447471
int maxRow = postingsMap.values().stream().flatMap(p -> p.getRowIds().stream()).mapToInt(i -> i).max().orElseThrow();
448-
return createGenericMapping(IntStream.range(0, postingsMap.size()).boxed().collect(Collectors.toSet()), maxOldOrdinal, maxRow);
472+
var presentOrdinals = new FixedBitSet(maxOldOrdinal + 1);
473+
for (var entry : postingsMap.entrySet())
474+
presentOrdinals.set(entry.getValue().getOrdinal());
475+
return new RemappedPostings(Structure.ZERO_OR_ONE_TO_MANY,
476+
maxOldOrdinal,
477+
maxRow,
478+
null,
479+
null,
480+
new OmissionAwareIdentityMapper(maxOldOrdinal, i -> !presentOrdinals.get(i)));
481+
}
482+
483+
public static class BiMapMapper implements OrdinalMapper
484+
{
485+
private final int maxOrdinal;
486+
private final BiMap<Integer, Integer> ordinalMap;
487+
488+
public BiMapMapper(int maxNewOrdinal, BiMap<Integer, Integer> ordinalMap)
489+
{
490+
this.maxOrdinal = maxNewOrdinal;
491+
this.ordinalMap = ordinalMap;
492+
}
493+
494+
@Override
495+
public int maxOrdinal()
496+
{
497+
return maxOrdinal;
498+
}
499+
500+
@Override
501+
public int oldToNew(int i)
502+
{
503+
return ordinalMap.get(i);
504+
}
505+
506+
@Override
507+
public int newToOld(int i)
508+
{
509+
return ordinalMap.inverse().getOrDefault(i, OMITTED);
510+
}
511+
}
512+
513+
private static class OmissionAwareIdentityMapper implements OrdinalMapper
514+
{
515+
private final int maxVectorOrdinal;
516+
private final IntPredicate toSkip;
517+
518+
public OmissionAwareIdentityMapper(int maxVectorOrdinal, IntPredicate toSkip)
519+
{
520+
this.maxVectorOrdinal = maxVectorOrdinal;
521+
this.toSkip = toSkip;
522+
}
523+
524+
@Override
525+
public int maxOrdinal()
526+
{
527+
return maxVectorOrdinal;
528+
}
529+
530+
@Override
531+
public int oldToNew(int i)
532+
{
533+
return i;
534+
}
535+
536+
@Override
537+
public int newToOld(int i)
538+
{
539+
return toSkip.test(i) ? OrdinalMapper.OMITTED : i;
540+
}
449541
}
450542
}

src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Collection;
2727
import java.util.Comparator;
2828
import java.util.Map;
29-
import java.util.Optional;
3029
import java.util.concurrent.ConcurrentMap;
3130
import java.util.concurrent.ConcurrentSkipListMap;
3231
import java.util.concurrent.atomic.AtomicInteger;
@@ -395,14 +394,14 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
395394
deletedOrdinals.stream().parallel().forEach(builder::markNodeDeleted);
396395
deletedOrdinals.clear();
397396
builder.cleanup();
398-
remappedPostings = V5VectorPostingsWriter.remapPostings(postingsMap);
397+
remappedPostings = V5VectorPostingsWriter.remapForMemtable(postingsMap);
399398
}
400399
else
401400
{
402401
assert postingsMap.keySet().size() == vectorValues.size() : String.format("postings map entry count %d != vector count %d",
403402
postingsMap.keySet().size(), vectorValues.size());
404403
builder.cleanup();
405-
remappedPostings = V2VectorPostingsWriter.remapPostings(postingsMap, !deletedOrdinals.isEmpty());
404+
remappedPostings = V2VectorPostingsWriter.remapForMemtable(postingsMap, !deletedOrdinals.isEmpty());
406405
}
407406

408407
OrdinalMapper ordinalMapper = remappedPostings.ordinalMapper;

0 commit comments

Comments
 (0)