-
-
Notifications
You must be signed in to change notification settings - Fork 87
Open
Description
We have an ingestion pipeline that concurrently creates embeddings and updates nodes during ingestion. Because of a high number of ConcurrentModificationExceptions and some data corruption issues, we use write locks when updating the nodes. With the write lock in place, however, ArcadeDB freezes completely. It is probably a deadlock or something similar, which I assume should not happen. I would expect the search to simply wait for the update to finish, but it does not.
You can reproduce the problem with the following self-contained test:
package com.acme;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.query.sql.executor.ResultSet;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Self-contained test to reproduce the deadlock issue when concurrent
 * embedding writes (ingestion) and vector search (vectorNeighbors) occur.
 *
 * <p>The deadlock is believed to occur because (reporter's hypothesis, not verified here):
 * <ol>
 *   <li>Ingestion threads use {@code db.executeInWriteLock()} to write embeddings.</li>
 *   <li>{@code vectorNeighbors()} internally triggers LSMVectorIndex to rebuild and write the graph.</li>
 *   <li>These operations can deadlock when competing for locks.</li>
 * </ol>
 *
 * <p>Expected behavior while the bug is present: this test hangs and is failed by its timeout.
 * Any exception thrown by a writer or by the search task fails the test with that exception as
 * the cause, so an ordinary error is not mistaken for a hang.
 */
public class LsmVectorConcurrencyDeadlockTest {
  private static final String DB_PATH = "./data-test/lsm-vector-deadlock-test";
  private static final int DIMENSIONS = 1024; // Realistic dimension like production
  private static final int INITIAL_RECORDS = 100; // Pre-populate with some records
  private static final int CONCURRENT_WRITERS = 3; // Simulate ingestion threads
  private static final int WRITES_PER_THREAD = 50; // Number of embedding updates per thread
  private static final int TOTAL_WRITES = CONCURRENT_WRITERS * WRITES_PER_THREAD;

  private static final String INSERT_SQL =
      "INSERT INTO ContentV SET id=?, content_type=?, text=?, embedding=?";
  private static final String SEARCH_SQL =
      "SELECT vectorNeighbors('ContentV[embedding]', ?, 10) AS neighbors FROM ContentV LIMIT 1";

  /**
   * Upper bound for each {@code Future.get()}. It is larger than the 60 s {@code @Timeout} on the
   * test method, so the annotation is expected to fire first; this bound only acts as a backstop
   * should that timeout ever be raised or removed.
   */
  private static final long FUTURE_WAIT_SECONDS = 90;

  private Database db;

  /** Creates a fresh database with the ContentV schema, an LSM_VECTOR index and initial records. */
  @BeforeEach
  void setUp() {
    deleteDirectory(new File(DB_PATH));
    db = new DatabaseFactory(DB_PATH).create();

    // Create schema similar to production ContentV
    db.command("sql", "CREATE VERTEX TYPE ContentV");
    var contentType = db.getSchema().getType("ContentV");
    contentType.createProperty("id", com.arcadedb.schema.Type.STRING);
    contentType.createProperty("content_type", com.arcadedb.schema.Type.STRING);
    contentType.createProperty("text", com.arcadedb.schema.Type.STRING);
    contentType.createProperty("embedding", com.arcadedb.schema.Type.ARRAY_OF_FLOATS);

    // Create LSM_VECTOR index
    db.command("sql", """
        CREATE INDEX ON ContentV (embedding) LSM_VECTOR
        METADATA {
        "dimensions": %d,
        "similarity": "COSINE"
        }""".formatted(DIMENSIONS));

    // Pre-populate with initial records (simulating existing indexed content).
    // Fixed seed so the initial data set is the same on every run.
    System.out.println("Pre-populating with " + INITIAL_RECORDS + " records...");
    Random random = new Random(42);
    for (int i = 0; i < INITIAL_RECORDS; i++) {
      float[] embedding = randomEmbedding(random);
      final int idx = i;
      db.transaction(() -> {
        db.command("sql", INSERT_SQL,
            "initial-" + idx, "text", "Initial content " + idx, embedding);
      });
    }
    System.out.println("Pre-population complete. Starting concurrency test...\n");
  }

  /** Closes the database if it is still open and removes its directory. */
  @AfterEach
  void tearDown() {
    if (db != null && db.isOpen()) {
      db.close();
    }
    deleteDirectory(new File(DB_PATH));
  }

  /**
   * Runs {@link #CONCURRENT_WRITERS} writer tasks that insert embeddings under
   * {@code executeInWriteLock} while one task repeatedly executes a vector search.
   * The test is expected to time out if the deadlock occurs.
   *
   * @throws Exception if a task fails (reported as {@code ExecutionException} with the original
   *                   cause), if waiting for a task times out, or if the test thread is interrupted
   */
  @Test
  @Timeout(value = 60, unit = TimeUnit.SECONDS) // Should complete in <60s if no deadlock
  void testConcurrentWriteAndSearchDeadlock() throws Exception {
    System.out.println("=== DEADLOCK REPRODUCTION TEST ===");
    System.out.println("This test simulates concurrent ingestion (embedding writes) and search.");
    System.out.println("If it hangs/times out, the deadlock bug is reproduced.\n");

    AtomicBoolean stopFlag = new AtomicBoolean(false);
    AtomicInteger writeCount = new AtomicInteger(0);
    AtomicInteger searchCount = new AtomicInteger(0);
    CountDownLatch writersStarted = new CountDownLatch(CONCURRENT_WRITERS);
    List<Future<?>> futures = new ArrayList<>();

    ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_WRITERS + 1);
    try {
      // Start writer threads (simulating ingestion).
      // The tasks are Callables ("return null") so that any exception reaches Future.get().
      for (int t = 0; t < CONCURRENT_WRITERS; t++) {
        final int threadId = t;
        futures.add(executor.submit(() -> {
          writersStarted.countDown();
          writeEmbeddings(threadId, stopFlag, writeCount);
          return null;
        }));
      }

      // Wait for writers to start
      writersStarted.await();
      System.out.println("All writer threads started. Starting search thread...\n");

      // Start search thread (simulating semantic search during ingestion)
      // No writelock - just plain query
      futures.add(executor.submit(() -> {
        searchWhileWriting(stopFlag, writeCount, searchCount);
        return null;
      }));

      // Wait for all tasks to complete; a failed task surfaces here as ExecutionException.
      for (Future<?> f : futures) {
        f.get(FUTURE_WAIT_SECONDS, TimeUnit.SECONDS);
      }
    } finally {
      // Always stop the workers, also on failure or timeout, so the pool does not outlive the
      // test and keep using the database while tearDown() closes it. shutdownNow() is best
      // effort: a worker blocked in a non-interruptible wait will not be released by it.
      stopFlag.set(true);
      executor.shutdownNow();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    System.out.println("\n=== TEST COMPLETED ===");
    System.out.println("Total writes: " + writeCount.get());
    System.out.println("Total searches: " + searchCount.get());
    System.out.println("If you see this message, no deadlock occurred!");
  }

  /**
   * Writer task body: inserts up to {@link #WRITES_PER_THREAD} records, each inside
   * {@code executeInWriteLock} + {@code transaction}, until done or {@code stopFlag} is set.
   *
   * @param threadId   writer index, used in record ids and log output
   * @param stopFlag   shared stop signal; set by this method when a write fails
   * @param writeCount shared counter of completed writes
   * @throws Exception the failure raised by the database call, rethrown after signalling stop
   */
  private void writeEmbeddings(int threadId, AtomicBoolean stopFlag, AtomicInteger writeCount)
      throws Exception {
    try {
      for (int i = 0; i < WRITES_PER_THREAD && !stopFlag.get(); i++) {
        float[] embedding = randomEmbedding(ThreadLocalRandom.current());
        final int idx = i;
        // Use executeInWriteLock like production code does
        db.executeInWriteLock(() -> {
          db.transaction(() -> {
            db.command("sql", INSERT_SQL,
                "thread-" + threadId + "-" + idx, "text",
                "Content from thread " + threadId + " item " + idx, embedding);
          });
          return null;
        });
        int count = writeCount.incrementAndGet();
        if (count % 20 == 0) {
          System.out.println("[WRITE] Completed " + count + " writes");
        }
      }
    } catch (Exception e) {
      // Without this signal the search loop would keep waiting for writes that never happen
      // and the failure would be misreported as a timeout.
      stopFlag.set(true);
      System.err.println("[WRITE] Thread " + threadId + " error: " + e.getMessage());
      throw e;
    }
  }

  /**
   * Search task body: runs the vectorNeighbors query in a loop, with a 100 ms pause between
   * iterations, until all writes are done or {@code stopFlag} is set.
   *
   * @param stopFlag    shared stop signal; set by this method when a search fails
   * @param writeCount  shared counter of completed writes (read only here)
   * @param searchCount shared counter of completed searches
   * @throws Exception the failure raised by the query, rethrown after signalling stop
   */
  private void searchWhileWriting(AtomicBoolean stopFlag, AtomicInteger writeCount,
      AtomicInteger searchCount) throws Exception {
    try {
      while (!stopFlag.get() && writeCount.get() < TOTAL_WRITES) {
        float[] queryEmbedding = randomEmbedding(ThreadLocalRandom.current());
        System.out.println("[SEARCH] Starting vector search (writeCount=" + writeCount.get() + ")...");
        long start = System.currentTimeMillis();

        int neighborCount = 0;
        try (ResultSet rs = db.query("sql", SEARCH_SQL, queryEmbedding)) {
          if (rs.hasNext()) {
            Object n = rs.next().getProperty("neighbors");
            if (n instanceof Iterable) {
              for (Object ignored : (Iterable<?>) n) {
                neighborCount++;
              }
            }
          }
        }

        long elapsed = System.currentTimeMillis() - start;
        int count = searchCount.incrementAndGet();
        System.out.println("[SEARCH] Completed search #" + count + " in " + elapsed + "ms, found " + neighborCount + " neighbors");

        // Small delay between searches
        Thread.sleep(100);
      }
    } catch (InterruptedException e) {
      // Interrupted by executor shutdown: restore the flag and end the task quietly.
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      stopFlag.set(true);
      System.err.println("[SEARCH] Error: " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }

  /**
   * Builds a vector of {@link #DIMENSIONS} floats drawn from [-0.5, 0.5) and scales it to unit
   * L2 norm (left unscaled if the norm is zero).
   *
   * @param random source of randomness
   * @return a new array of length {@link #DIMENSIONS}
   */
  private float[] randomEmbedding(Random random) {
    float[] embedding = new float[DIMENSIONS];
    float sum = 0;
    for (int i = 0; i < DIMENSIONS; i++) {
      embedding[i] = random.nextFloat() - 0.5f;
      sum += embedding[i] * embedding[i];
    }
    // L2 normalize
    float norm = (float) Math.sqrt(sum);
    if (norm > 0) {
      for (int i = 0; i < DIMENSIONS; i++) {
        embedding[i] /= norm;
      }
    }
    return embedding;
  }

  /**
   * Recursively deletes {@code dir} and everything below it. Cleanup is best effort: a failed
   * deletion is reported on stderr and does not abort the remaining work.
   *
   * @param dir directory (or file) to remove; nothing happens if it does not exist
   */
  private void deleteDirectory(File dir) {
    if (!dir.exists()) {
      return;
    }
    File[] files = dir.listFiles();
    if (files != null) {
      for (File file : files) {
        if (file.isDirectory()) {
          deleteDirectory(file);
        } else if (!file.delete()) {
          System.err.println("[CLEANUP] Could not delete file: " + file);
        }
      }
    }
    if (!dir.delete()) {
      System.err.println("[CLEANUP] Could not delete: " + dir);
    }
  }
}