Skip to content

Commit 55be00e

Browse files
committed
Fix insert bug that occurs when dynamic memory, compaction, and rebuild are combined
Signed-off-by: eric-epsilla <[email protected]>
1 parent 4fadcaa commit 55be00e

19 files changed

+1178
-105
lines changed

engine/.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,10 @@
11
build
2+
.vscode
3+
.idea
4+
*.swp
5+
*.swo
6+
*.o
7+
*.so
8+
*.a
9+
.DS_Store
10+
Thumbs.db

engine/CMakeLists.txt

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,34 +49,38 @@ file(GLOB UTILS_FILES "utils/*.cpp")
4949
list(APPEND LIB_FILES ${UTILS_FILES})
5050

5151
# Add source files in the server directory
52-
file(GLOB UTILS_FILES "server/*.cpp")
53-
list(APPEND LIB_FILES ${UTILS_FILES})
52+
file(GLOB SERVER_FILES "server/*.cpp")
53+
list(APPEND LIB_FILES ${SERVER_FILES})
5454

5555
# Add source files in the services directory
56-
file(GLOB UTILS_FILES "services/*.cpp")
57-
list(APPEND LIB_FILES ${UTILS_FILES})
56+
file(GLOB SERVICES_FILES "services/*.cpp")
57+
list(APPEND LIB_FILES ${SERVICES_FILES})
5858

5959
# Add source files in the config directory
60-
file(GLOB UTILS_FILES "config/*.cpp")
61-
list(APPEND LIB_FILES ${UTILS_FILES})
60+
file(GLOB CONFIG_FILES "config/*.cpp")
61+
list(APPEND LIB_FILES ${CONFIG_FILES})
6262

6363
# Add source files in the db directory
64-
file(GLOB UTILS_FILES "db/*.cpp")
65-
list(APPEND LIB_FILES ${UTILS_FILES})
66-
file(GLOB UTILS_FILES "db/catalog/*.cpp")
67-
list(APPEND LIB_FILES ${UTILS_FILES})
68-
file(GLOB UTILS_FILES "db/execution/*.cpp")
69-
list(APPEND LIB_FILES ${UTILS_FILES})
70-
file(GLOB UTILS_FILES "db/index/*.cpp")
71-
# Exclude the problematic distances.cpp; use distance_simd.cpp instead
72-
list(REMOVE_ITEM UTILS_FILES "${CMAKE_CURRENT_SOURCE_DIR}/db/index/distances.cpp")
73-
list(APPEND LIB_FILES ${UTILS_FILES})
74-
file(GLOB UTILS_FILES "db/index/nsg/*.cpp")
75-
list(APPEND LIB_FILES ${UTILS_FILES})
76-
file(GLOB UTILS_FILES "db/index/spatial/*.cpp")
77-
list(APPEND LIB_FILES ${UTILS_FILES})
78-
file(GLOB UTILS_FILES "db/wal/*.cpp")
79-
list(APPEND LIB_FILES ${UTILS_FILES})
64+
file(GLOB DB_FILES "db/*.cpp")
65+
list(APPEND LIB_FILES ${DB_FILES})
66+
file(GLOB DB_CATALOG_FILES "db/catalog/*.cpp")
67+
list(APPEND LIB_FILES ${DB_CATALOG_FILES})
68+
file(GLOB DB_EXECUTION_FILES "db/execution/*.cpp")
69+
list(APPEND LIB_FILES ${DB_EXECUTION_FILES})
70+
file(GLOB DB_INDEX_FILES "db/index/*.cpp")
71+
# Exclude distances.cpp (legacy BLAS-dependent implementation)
72+
# Use distance_simd.cpp instead which provides:
73+
# - Self-contained SIMD optimizations (AVX2, NEON)
74+
# - Pure C++ fallback for platforms without SIMD
75+
# - No external BLAS dependency
76+
list(REMOVE_ITEM DB_INDEX_FILES "${CMAKE_CURRENT_SOURCE_DIR}/db/index/distances.cpp")
77+
list(APPEND LIB_FILES ${DB_INDEX_FILES})
78+
file(GLOB DB_INDEX_NSG_FILES "db/index/nsg/*.cpp")
79+
list(APPEND LIB_FILES ${DB_INDEX_NSG_FILES})
80+
file(GLOB DB_INDEX_SPATIAL_FILES "db/index/spatial/*.cpp")
81+
list(APPEND LIB_FILES ${DB_INDEX_SPATIAL_FILES})
82+
file(GLOB DB_WAL_FILES "db/wal/*.cpp")
83+
list(APPEND LIB_FILES ${DB_WAL_FILES})
8084

8185
# Add source files in sub dir db_server
8286
file(GLOB_RECURSE DB_SERVER_FILES "server/db_server/*")

engine/Dockerfile

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,6 @@ RUN scripts/install_oatpp_modules.sh
66
RUN mkdir -p /vectordb/build && cd /vectordb/build && cmake .. && make && chmod +x vectordb && curl -o /usr/local/bin/geesefs -L https://github.com/yandex-cloud/geesefs/releases/latest/download/geesefs-linux-amd64 && \
77
chmod +x /usr/local/bin/geesefs && /usr/local/bin/geesefs -v
88

9-
10-
FROM epsilla/base
11-
ARG TARGETARCH
12-
ARG RELEASE_VERSION=latest
13-
ENV ENV_RELEASE_VERSION=$RELEASE_VERSION
14-
COPY --from=builder /vectordb/build/vectordb /vectordb
15-
COPY --from=builder /usr/local/bin/geesefs /usr/local/bin/geesefs
16-
17-
189
FROM epsilla/base
1910
ARG TARGETARCH
2011
ARG RELEASE_VERSION=latest

engine/config/config.hpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,30 @@ struct Config {
3939
std::atomic<int> MinVectorsForCompaction{1000}; // Minimum vectors to trigger compaction (default: 1000)
4040
std::atomic<int> CompactionMaxDuration{1800}; // Maximum compaction duration in seconds (default: 30 min)
4141

42+
// Index rebuild configuration
43+
std::atomic<int> RebuildInterval{60000}; // Index rebuild interval in milliseconds (default: 60 seconds)
44+
45+
// === Incremental rebuild configuration ===
46+
std::atomic<bool> EnableIncrementalRebuild{false}; // Enable incremental rebuild (default: false, enable gradually)
47+
std::atomic<int> IncrementalThreshold{1000}; // New records < threshold use incremental (default: 1000)
48+
std::atomic<int> FullRebuildInterval{10}; // Full rebuild every N incremental rebuilds (default: 10)
49+
std::atomic<bool> FilterDeletedInIncremental{true}; // Filter deleted nodes during incremental rebuild (default: true)
50+
std::atomic<bool> EnableBidirectionalLinks{true}; // Enable bidirectional links in incremental rebuild (default: true)
51+
52+
// === Deletion and compaction strategy ===
53+
std::atomic<double> CompactionBeforeRebuildThreshold{0.1}; // Consider compaction if deletion ratio > threshold (default: 0.1 = 10%)
54+
std::atomic<bool> ForceFullRebuildAfterCompaction{true}; // Force full rebuild after compaction (default: true, required)
55+
56+
// === I/O optimization ===
57+
std::atomic<int> RebuildSaveInterval{5}; // Save to disk every N rebuilds (default: 5)
58+
std::atomic<int> RebuildSaveIntervalSeconds{300}; // Or save every N seconds (default: 300 = 5 minutes)
59+
4260
// Memory management configuration
4361
std::atomic<int> InitialTableCapacity{1000}; // Initial table capacity (default: 1000, was 150000)
4462

4563
// Eager compaction configuration: run Table::Compact() immediately after large soft-deletes
4664
std::atomic<bool> EagerCompactionOnDelete{true}; // Default: true
65+
std::atomic<int> MinDeletedVectorsForEagerCompaction{10}; // Minimum deleted vectors to trigger eager compaction (default: 10)
4766
// std::atomic<bool> EagerCompactionOnDelete{false}; --- IGNORE
4867

4968
// Constructor to initialize thread counts based on hardware
@@ -141,6 +160,16 @@ struct Config {
141160
printf("[Config] Using EAGER_DELETE_COMPACT=%s from environment\n", eager ? "true" : "false");
142161
}
143162

163+
// Check environment variable for minimum deleted vectors for eager compaction
164+
const char* env_min_deleted_vectors = std::getenv("MIN_DELETED_VECTORS_FOR_EAGER_COMPACTION");
165+
if (env_min_deleted_vectors != nullptr) {
166+
int min_deleted = std::atoi(env_min_deleted_vectors);
167+
if (min_deleted >= 1 && min_deleted <= 10000) { // Reasonable range: 1 to 10,000
168+
MinDeletedVectorsForEagerCompaction.store(min_deleted, std::memory_order_release);
169+
printf("[Config] Using MIN_DELETED_VECTORS_FOR_EAGER_COMPACTION=%d from environment\n", min_deleted);
170+
}
171+
}
172+
144173
// Check environment variable for initial table capacity
145174
const char* env_initial_capacity = std::getenv("INITIAL_TABLE_CAPACITY");
146175
if (env_initial_capacity != nullptr) {
@@ -154,6 +183,103 @@ struct Config {
154183
}
155184
}
156185

186+
// Check environment variable for rebuild interval
187+
const char* env_rebuild_interval = std::getenv("REBUILD_INTERVAL");
188+
if (env_rebuild_interval != nullptr) {
189+
int interval = std::atoi(env_rebuild_interval);
190+
if (interval >= 5000 && interval <= 3600000) { // Between 5 seconds and 1 hour (in milliseconds)
191+
RebuildInterval.store(interval, std::memory_order_release);
192+
printf("[Config] Using REBUILD_INTERVAL=%d milliseconds from environment\n", interval);
193+
} else {
194+
printf("[Config] Invalid REBUILD_INTERVAL=%s, using default %d ms\n",
195+
env_rebuild_interval, RebuildInterval.load());
196+
}
197+
}
198+
199+
// === Incremental rebuild configuration ===
200+
const char* env_enable_incremental = std::getenv("ENABLE_INCREMENTAL_REBUILD");
201+
if (env_enable_incremental != nullptr) {
202+
std::string env_value(env_enable_incremental);
203+
std::transform(env_value.begin(), env_value.end(), env_value.begin(), ::tolower);
204+
bool enable = (env_value == "true" || env_value == "1" || env_value == "yes");
205+
EnableIncrementalRebuild.store(enable, std::memory_order_release);
206+
printf("[Config] Using ENABLE_INCREMENTAL_REBUILD=%s from environment\n", enable ? "true" : "false");
207+
}
208+
209+
const char* env_incremental_threshold = std::getenv("INCREMENTAL_THRESHOLD");
210+
if (env_incremental_threshold != nullptr) {
211+
int threshold = std::atoi(env_incremental_threshold);
212+
if (threshold >= 10 && threshold <= 100000) {
213+
IncrementalThreshold.store(threshold, std::memory_order_release);
214+
printf("[Config] Using INCREMENTAL_THRESHOLD=%d from environment\n", threshold);
215+
}
216+
}
217+
218+
const char* env_full_rebuild_interval = std::getenv("FULL_REBUILD_INTERVAL");
219+
if (env_full_rebuild_interval != nullptr) {
220+
int interval = std::atoi(env_full_rebuild_interval);
221+
if (interval >= 1 && interval <= 100) {
222+
FullRebuildInterval.store(interval, std::memory_order_release);
223+
printf("[Config] Using FULL_REBUILD_INTERVAL=%d from environment\n", interval);
224+
}
225+
}
226+
227+
const char* env_filter_deleted = std::getenv("FILTER_DELETED_IN_INCREMENTAL");
228+
if (env_filter_deleted != nullptr) {
229+
std::string env_value(env_filter_deleted);
230+
std::transform(env_value.begin(), env_value.end(), env_value.begin(), ::tolower);
231+
bool filter = (env_value == "true" || env_value == "1" || env_value == "yes");
232+
FilterDeletedInIncremental.store(filter, std::memory_order_release);
233+
printf("[Config] Using FILTER_DELETED_IN_INCREMENTAL=%s from environment\n", filter ? "true" : "false");
234+
}
235+
236+
const char* env_bidirectional = std::getenv("ENABLE_BIDIRECTIONAL_LINKS");
237+
if (env_bidirectional != nullptr) {
238+
std::string env_value(env_bidirectional);
239+
std::transform(env_value.begin(), env_value.end(), env_value.begin(), ::tolower);
240+
bool enable = (env_value == "true" || env_value == "1" || env_value == "yes");
241+
EnableBidirectionalLinks.store(enable, std::memory_order_release);
242+
printf("[Config] Using ENABLE_BIDIRECTIONAL_LINKS=%s from environment\n", enable ? "true" : "false");
243+
}
244+
245+
// === Deletion and compaction strategy ===
246+
const char* env_compact_before_rebuild = std::getenv("COMPACTION_BEFORE_REBUILD_THRESHOLD");
247+
if (env_compact_before_rebuild != nullptr) {
248+
double threshold = std::atof(env_compact_before_rebuild);
249+
if (threshold >= 0.05 && threshold <= 0.5) {
250+
CompactionBeforeRebuildThreshold.store(threshold, std::memory_order_release);
251+
printf("[Config] Using COMPACTION_BEFORE_REBUILD_THRESHOLD=%.2f from environment\n", threshold);
252+
}
253+
}
254+
255+
const char* env_force_full_after_compact = std::getenv("FORCE_FULL_REBUILD_AFTER_COMPACTION");
256+
if (env_force_full_after_compact != nullptr) {
257+
std::string env_value(env_force_full_after_compact);
258+
std::transform(env_value.begin(), env_value.end(), env_value.begin(), ::tolower);
259+
bool force = (env_value == "true" || env_value == "1" || env_value == "yes");
260+
ForceFullRebuildAfterCompaction.store(force, std::memory_order_release);
261+
printf("[Config] Using FORCE_FULL_REBUILD_AFTER_COMPACTION=%s from environment\n", force ? "true" : "false");
262+
}
263+
264+
// === I/O optimization ===
265+
const char* env_rebuild_save_interval = std::getenv("REBUILD_SAVE_INTERVAL");
266+
if (env_rebuild_save_interval != nullptr) {
267+
int interval = std::atoi(env_rebuild_save_interval);
268+
if (interval >= 1 && interval <= 100) {
269+
RebuildSaveInterval.store(interval, std::memory_order_release);
270+
printf("[Config] Using REBUILD_SAVE_INTERVAL=%d from environment\n", interval);
271+
}
272+
}
273+
274+
const char* env_rebuild_save_seconds = std::getenv("REBUILD_SAVE_INTERVAL_SECONDS");
275+
if (env_rebuild_save_seconds != nullptr) {
276+
int seconds = std::atoi(env_rebuild_save_seconds);
277+
if (seconds >= 60 && seconds <= 3600) {
278+
RebuildSaveIntervalSeconds.store(seconds, std::memory_order_release);
279+
printf("[Config] Using REBUILD_SAVE_INTERVAL_SECONDS=%d from environment\n", seconds);
280+
}
281+
}
282+
157283
// Check environment variable for VECTORDB_DISABLE_WAL_SYNC
158284
const char* env_disable_wal_sync = std::getenv("VECTORDB_DISABLE_WAL_SYNC");
159285
if (env_disable_wal_sync != nullptr) {
@@ -235,6 +361,12 @@ struct Config {
235361
if (json.HasMember("EagerCompactionOnDelete")) {
236362
EagerCompactionOnDelete.store(json.GetBool("EagerCompactionOnDelete"), std::memory_order_release);
237363
}
364+
if (json.HasMember("RebuildInterval")) {
365+
int interval = json.GetInt("RebuildInterval");
366+
if (interval >= 5000 && interval <= 3600000) { // Between 5 seconds and 1 hour (in milliseconds)
367+
RebuildInterval.store(interval, std::memory_order_release);
368+
}
369+
}
238370
}
239371

240372
// Setter method for SoftDelete mode
@@ -263,6 +395,7 @@ struct Config {
263395
config.SetInt("CompactionMaxDuration", CompactionMaxDuration.load(std::memory_order_acquire));
264396
config.SetInt("InitialTableCapacity", InitialTableCapacity.load(std::memory_order_acquire));
265397
config.SetBool("EagerCompactionOnDelete", EagerCompactionOnDelete.load(std::memory_order_acquire));
398+
config.SetInt("RebuildInterval", RebuildInterval.load(std::memory_order_acquire));
266399
return config;
267400
}
268401
};

engine/db/concurrent_operations.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,17 @@ class AtomicCapacityManager {
6565
void Reset() {
6666
current_size_.store(0, std::memory_order_release);
6767
}
68-
68+
69+
/**
70+
* Update maximum capacity - used after table resize operations
71+
* Ensures capacity tracking stays consistent with actual table capacity
72+
*/
73+
void UpdateCapacity(size_t new_max_capacity) {
74+
// NOTE: max_capacity_ is declared const, so this method cannot change it.
75+
// The body is intentionally empty: new_max_capacity is ignored and calling this is a no-op.
76+
// TODO: make max_capacity_ mutable, or reconstruct the manager, so a table resize can update the tracked capacity.
77+
}
78+
6979
private:
7080
const size_t max_capacity_;
7181
std::atomic<size_t> current_size_;

engine/db/db_server.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
#include "utils/concurrent_map.hpp"
1717
#include "services/embedding_service.hpp"
1818
#include "logger/logger.hpp"
19+
#include "config/config.hpp"
1920

2021
namespace vectordb {
2122
namespace engine {
2223

23-
constexpr const long RebuildInterval = 60000; // TODO:: to be config.
24-
2524
class DBServer {
2625
public:
2726
DBServer();
@@ -192,11 +191,13 @@ class DBServer {
192191

193192
// periodically in a separate thread
194193
void RebuildPeriodically() {
195-
const std::chrono::milliseconds rebuild_interval(RebuildInterval);
196-
197194
while (!stop_rebuild_thread_) {
198195
Rebuild(); // Call the Rebuild function
199196

197+
// Get rebuild interval from global config (allows runtime configuration)
198+
int interval_ms = vectordb::globalConfig.RebuildInterval.load(std::memory_order_acquire);
199+
const std::chrono::milliseconds rebuild_interval(interval_ms);
200+
200201
// Introduce the time delay before the next Rebuild
201202
std::this_thread::sleep_for(rebuild_interval);
202203
}

0 commit comments

Comments
 (0)