Skip to content

Commit 2aa5201

Browse files
michaeljmarshall authored and driftx committed
CNDB-16252: Add IndexComponentsImpl#tmpFileFor to allow vector CompactionGraph to index multiple columns (#2177)
Fixes riptano/cndb#16252. Add a new method to the `IndexComponents.ForWrite` interface named `tmpFileFor`. The new method is expected to create temporary files that are namespaced to the index build. This fixes an issue in the `CompactionGraph` where we incorrectly created temp files with the same name, causing them to collide. Now the files have the column name and the build id, so they will be properly namespaced while maintaining some form of meaningful name.
1 parent f7144f6 commit 2aa5201

File tree

4 files changed

+37
-8
lines changed

4 files changed

+37
-8
lines changed

src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.cassandra.io.sstable.Component;
3939
import org.apache.cassandra.io.sstable.Descriptor;
4040
import org.apache.cassandra.io.sstable.SSTable;
41+
import org.apache.cassandra.io.util.File;
4142
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
4243

4344
/**
@@ -354,5 +355,14 @@ interface ForWrite extends ForRead
354355
* should be added to this writer after this call).
355356
*/
356357
void markComplete() throws IOException;
358+
359+
/**
360+
* Create a temporary {@link File} namespaced within the per index components. Repeated calls with the same
361+
* componentName will produce the same file.
362+
* @param componentName - unique name within the per index components
363+
* @return a temporary file for use during index construction
364+
* @throws IOException
365+
*/
366+
File tmpFileFor(String componentName) throws IOException;
357367
}
358368
}

src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,14 @@ public IndexComponent.ForWrite getForWrite(IndexComponentType component)
519519
return info;
520520
}
521521

522+
@Override
523+
public File tmpFileFor(String componentName) throws IOException
524+
{
525+
String name = context != null ? String.format("%s_%s_%s", buildId, context.getColumnName(), componentName)
526+
: String.format("%s_%s", buildId, componentName);
527+
return descriptor.tmpFileFor(new Component(SSTableFormat.Components.Types.CUSTOM, name));
528+
}
529+
522530
@Override
523531
public void forceDeleteAllComponents()
524532
{

src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@
9191
import org.apache.cassandra.index.sai.disk.vector.VectorPostings.CompactionVectorPostings;
9292
import org.apache.cassandra.index.sai.utils.LowPriorityThreadFactory;
9393
import org.apache.cassandra.index.sai.utils.SAICodecUtils;
94-
import org.apache.cassandra.io.sstable.Component;
95-
import org.apache.cassandra.io.sstable.Descriptor;
96-
import org.apache.cassandra.io.sstable.format.SSTableFormat;
97-
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
9894
import org.apache.cassandra.io.util.File;
9995
import org.apache.cassandra.io.util.FileUtils;
10096
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -191,8 +187,7 @@ public CompactionGraph(IndexComponents.ForWrite perIndexComponents, VectorCompre
191187
this.useSyntheticOrdinals = !V5OnDiskFormat.writeV5VectorPostings(context.version()) || !allRowsHaveVectors;
192188

193189
// the extension here is important to signal to CFS.scrubDataDirectories that it should be removed if present at restart
194-
Component tmpComponent = new Component(SSTableFormat.Components.Types.CUSTOM, "chronicle" + Descriptor.TMP_EXT);
195-
postingsFile = dd.fileFor(tmpComponent);
190+
postingsFile = perIndexComponents.tmpFileFor("postings_chonicle_map");
196191
postingsMap = ChronicleMapBuilder.of((Class<VectorFloat<?>>) (Class) VectorFloat.class, (Class<CompactionVectorPostings>) (Class) CompactionVectorPostings.class)
197192
.averageKeySize(dimension * Float.BYTES)
198193
.keySizeMarshaller(SizeMarshaller.constant((long) dimension * Float.BYTES))
@@ -203,8 +198,7 @@ public CompactionGraph(IndexComponents.ForWrite perIndexComponents, VectorCompre
203198
.createPersistedTo(postingsFile.toJavaIOFile());
204199

205200
// Formatted so that the full resolution vector is written at the ordinal * vector dimension offset
206-
Component vectorsByOrdinalComponent = new Component(Components.Types.CUSTOM, "vectors_by_ordinal");
207-
vectorsByOrdinalTmpFile = dd.tmpFileFor(vectorsByOrdinalComponent);
201+
vectorsByOrdinalTmpFile = perIndexComponents.tmpFileFor("vectors_by_ordinal");
208202
vectorsByOrdinalBufferedWriter = new BufferedRandomAccessWriter(vectorsByOrdinalTmpFile.toPath());
209203

210204
// VSTODO add LVQ

test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,4 +1163,21 @@ public void testRowIdIteratorClosedOnHasNextFailure() throws Throwable
11631163
closeCounter.disable();
11641164
}
11651165
}
1166+
1167+
@Test
1168+
public void testIndexingMultipleVectorColumns() throws Throwable
1169+
{
1170+
createTable("CREATE TABLE %s (pk int, val1 vector<float, 128>, val2 vector<float, 128>, PRIMARY KEY(pk))");
1171+
createIndex("CREATE CUSTOM INDEX ON %s(val1) USING 'StorageAttachedIndex'");
1172+
createIndex("CREATE CUSTOM INDEX ON %s(val2) USING 'StorageAttachedIndex'");
1173+
1174+
for (int i = 0; i < 2 * CassandraOnHeapGraph.MIN_PQ_ROWS; i++)
1175+
execute("INSERT INTO %s (pk, val1, val2) VALUES (?, ?, ?)", i, randomVectorBoxed(128), randomVectorBoxed(128));
1176+
1177+
runThenFlushThenCompact(() -> {
1178+
// Run a search on each as a sanity check
1179+
assertRowCount(execute("SELECT pk FROM %s ORDER BY val1 ANN OF ? LIMIT 10", randomVectorBoxed(128)), 10);
1180+
assertRowCount(execute("SELECT pk FROM %s ORDER BY val2 ANN OF ? LIMIT 10", randomVectorBoxed(128)), 10);
1181+
});
1182+
}
11661183
}

0 commit comments

Comments
 (0)