Skip to content

Commit 1b3ce5d

Browse files
beinanBeinan Wang
andauthored
Add configurable scalar index on id column (#50)
## Summary

- Add `IdIndexType` enum (None/BTree/ZoneMap) for configurable scalar index on the `id` column
- BTree indices are maintained by MemWAL during writes; ZoneMap indices are excluded from MemWAL maintenance (unsupported by MemWAL) and rebuilt after compaction
- Index is created eagerly on store open and ensured after each compaction cycle
- Exposed via Python API as `id_index_type` parameter (`"btree"`, `"zonemap"`, or `None`)

## Test plan

- [x] Rust tests: btree index creation on open, persistence after compaction
- [x] Rust tests: zonemap index creation on open, persistence after compaction
- [x] Rust tests: no index created by default (IdIndexType::None)
- [x] Rust tests: idempotent ensure_id_index (no version bump on re-check)
- [x] Python tests: btree/zonemap creation, invalid type rejection, data integrity across compaction cycles
- [x] Full existing test suite passes (19/19)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Beinan Wang <beinanwang@microsoft.com>
1 parent 998ca18 commit 1b3ce5d

5 files changed

Lines changed: 312 additions & 6 deletions

File tree

crates/lance-context-core/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ mod store;
77

88
pub use context::{Context, ContextEntry, Snapshot};
99
pub use record::{ContextRecord, SearchResult, StateMetadata};
10-
pub use store::{CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions};
10+
pub use store::{
11+
CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType,
12+
};
1113

1214
// Re-export CompactionMetrics from lance for Python bindings
1315
pub use lance::dataset::optimize::CompactionMetrics;

crates/lance-context-core/src/store.rs

Lines changed: 204 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use lance::index::DatasetIndexExt;
2222
use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
2323
use lance::{Error as LanceError, Result as LanceResult};
2424
use lance_index::mem_wal::MEM_WAL_INDEX_NAME;
25+
use lance_index::scalar::ScalarIndexParams;
26+
use lance_index::IndexType;
2527
use tokio::sync::Mutex;
2628
use tokio::task::JoinHandle;
2729
use tracing::{error, info, warn};
@@ -32,6 +34,7 @@ use crate::record::{ContextRecord, SearchResult, StateMetadata};
3234
/// Embedding length used for the semantic index column.
3335
const DEFAULT_EMBEDDING_DIM: i32 = 1536;
3436
const DEFAULT_SEARCH_LIMIT: usize = 10;
37+
const ID_INDEX_NAME: &str = "id_idx";
3538

3639
/// Configuration for background compaction.
3740
#[derive(Debug, Clone)]
@@ -72,6 +75,18 @@ impl Default for CompactionConfig {
7275
}
7376
}
7477

78+
/// Selects which scalar index, if any, the store keeps on the `id` column.
///
/// Defaults to [`IdIndexType::None`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IdIndexType {
    /// Leave the `id` column unindexed.
    #[default]
    None,
    /// Zone-map index: lightweight min/max statistics.
    ZoneMap,
    /// B-tree index: heavier, aimed at point lookups.
    BTree,
}
89+
7590
/// Statistics about compaction status and history.
7691
#[derive(Debug, Clone)]
7792
pub struct CompactionStats {
@@ -106,6 +121,7 @@ pub struct ContextStore {
106121
compaction_state: Arc<Mutex<CompactionState>>,
107122
pub compaction_config: CompactionConfig,
108123
blob_columns: HashSet<String>,
124+
id_index_type: IdIndexType,
109125
}
110126

111127
/// Additional configuration when opening a [`ContextStore`].
@@ -116,6 +132,8 @@ pub struct ContextStoreOptions {
116132
/// Column names that should use Lance V1 blob encoding.
117133
/// Valid values: `"text_payload"`, `"binary_payload"`.
118134
pub blob_columns: HashSet<String>,
135+
/// Type of scalar index to create on the `id` column.
136+
pub id_index_type: IdIndexType,
119137
}
120138

121139
impl ContextStoreOptions {
@@ -164,8 +182,12 @@ impl ContextStore {
164182
})),
165183
compaction_config: options.compaction,
166184
blob_columns,
185+
id_index_type: options.id_index_type,
167186
};
168187

188+
// Ensure id index if configured
189+
store.ensure_id_index().await?;
190+
169191
// Start background compaction if enabled
170192
store.start_background_compaction().await?;
171193

@@ -192,8 +214,14 @@ impl ContextStore {
192214
let has_mem_wal = indices.iter().any(|i| i.name == MEM_WAL_INDEX_NAME);
193215

194216
if !has_mem_wal {
195-
let maintained_indexes: Vec<String> =
196-
indices.iter().map(|i| i.name.clone()).collect();
217+
// ZoneMap indices are not supported by MemWAL; exclude them
218+
let maintained_indexes: Vec<String> = indices
219+
.iter()
220+
.filter(|i| {
221+
!(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)
222+
})
223+
.map(|i| i.name.clone())
224+
.collect();
197225
self.dataset
198226
.initialize_mem_wal()
199227
.unsharded()
@@ -348,6 +376,7 @@ impl ContextStore {
348376
state.last_compaction = Some(Utc::now());
349377
state.total_compactions += 1;
350378
state.last_error = None;
379+
drop(state); // Release lock before ensure_id_index
351380

352381
info!(
353382
"Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)",
@@ -361,6 +390,12 @@ impl ContextStore {
361390
// Reload dataset to see new version
362391
self.dataset = Dataset::open(self.dataset.uri()).await?;
363392

393+
// Ensure id index exists after compaction
394+
// (handles first-time creation on previously empty dataset)
395+
if let Err(e) = self.ensure_id_index().await {
396+
warn!("Failed to ensure id index after compaction: {}", e);
397+
}
398+
364399
Ok(metrics)
365400
}
366401
Err(e) => {
@@ -408,6 +443,44 @@ impl ContextStore {
408443
})
409444
}
410445

446+
/// Ensure the configured id index exists on the dataset.
447+
async fn ensure_id_index(&mut self) -> LanceResult<()> {
448+
if self.id_index_type == IdIndexType::None {
449+
return Ok(());
450+
}
451+
452+
let indices = self.dataset.load_indices().await?;
453+
if indices.iter().any(|i| i.name == ID_INDEX_NAME) {
454+
return Ok(());
455+
}
456+
457+
self.create_id_index().await
458+
}
459+
460+
/// Create (or replace) the scalar index on the `id` column.
461+
pub async fn create_id_index(&mut self) -> LanceResult<()> {
462+
let index_type = match self.id_index_type {
463+
IdIndexType::ZoneMap => IndexType::ZoneMap,
464+
IdIndexType::BTree => IndexType::BTree,
465+
IdIndexType::None => return Ok(()),
466+
};
467+
468+
info!("Creating {:?} index on id column", index_type);
469+
470+
let params = ScalarIndexParams::default();
471+
472+
self.dataset
473+
.create_index_builder(&["id"], index_type, &params)
474+
.name(ID_INDEX_NAME.to_string())
475+
.replace(true)
476+
.await?;
477+
478+
// Reload dataset to pick up new index
479+
self.dataset = Dataset::open(self.dataset.uri()).await?;
480+
481+
Ok(())
482+
}
483+
411484
/// Start background compaction task if enabled.
412485
async fn start_background_compaction(&mut self) -> LanceResult<()> {
413486
if !self.compaction_config.enabled {
@@ -1379,4 +1452,133 @@ mod tests {
13791452
assert_eq!(results_binary[0].text_payload, record.text_payload);
13801453
});
13811454
}
1455+
1456+
/// A store opened with `IdIndexType::BTree` has the id index right after open
/// and still has it once data has been added and compacted.
#[test]
fn test_id_index_btree() {
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::BTree,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Opening the store is expected to create the index eagerly.
        let on_open = store.dataset.load_indices().await.unwrap();
        assert!(
            on_open.iter().any(|idx| idx.name == ID_INDEX_NAME),
            "btree index should be created on open"
        );

        // Write a handful of single-record batches, then compact them.
        for n in 0..5 {
            store
                .add(&[text_record(&format!("btree-{n}"), n as f32)])
                .await
                .unwrap();
        }
        store.compact(None).await.unwrap();

        // The index must survive the compaction cycle.
        let after_compaction = store.dataset.load_indices().await.unwrap();
        assert!(
            after_compaction.iter().any(|idx| idx.name == ID_INDEX_NAME),
            "btree index should persist after compaction"
        );
    });
}
1495+
1496+
/// A store opened with `IdIndexType::ZoneMap` has the id index right after
/// open and still has it once data has been added and compacted.
#[test]
fn test_id_index_zonemap() {
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::ZoneMap,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Opening the store is expected to create the index eagerly.
        let on_open = store.dataset.load_indices().await.unwrap();
        assert!(
            on_open.iter().any(|idx| idx.name == ID_INDEX_NAME),
            "zonemap index should be created on open"
        );

        // Write a handful of single-record batches, then compact them.
        for n in 0..5 {
            store
                .add(&[text_record(&format!("zm-{n}"), n as f32)])
                .await
                .unwrap();
        }
        store.compact(None).await.unwrap();

        // The index must be present after the compaction cycle.
        let after_compaction = store.dataset.load_indices().await.unwrap();
        assert!(
            after_compaction.iter().any(|idx| idx.name == ID_INDEX_NAME),
            "zonemap index should persist after compaction"
        );
    });
}
1533+
1534+
/// A store opened with default options never gets an id index, even after a
/// write followed by compaction.
#[test]
fn test_id_index_none_by_default() {
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();

        store.add(&[text_record("no-idx-1", 0.0)]).await.unwrap();
        store.compact(None).await.unwrap();

        let id_index_present = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(
            !id_index_present,
            "no id index should be created when IdIndexType::None"
        );
    });
}
1553+
1554+
/// Once the id index exists, `ensure_id_index` must leave the dataset version
/// untouched.
#[test]
fn test_id_index_idempotent() {
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::BTree,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        for n in 0..5 {
            store
                .add(&[text_record(&format!("idem-{n}"), n as f32)])
                .await
                .unwrap();
        }

        // Explicitly (re)build the index, then record the version. The
        // following ensure call should find the index and change nothing.
        store.create_id_index().await.unwrap();
        let before = store.version();
        store.ensure_id_index().await.unwrap();
        let after = store.version();
        assert_eq!(before, after, "ensure_id_index should not recreate existing index");
    });
}
13821584
}

python/python/lance_context/api.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ def __init__(
263263
compaction_min_fragments: int = 5,
264264
compaction_target_rows: int = 1_000_000,
265265
quiet_hours: list[tuple[int, int]] | None = None,
266+
id_index_type: str | None = None,
266267
) -> None:
267268
options = _merge_storage_options(
268269
storage_options,
@@ -282,11 +283,12 @@ def __init__(
282283
"quiet_hours": quiet_hours or [],
283284
}
284285

285-
if options or compaction_config["enabled"]:
286+
if options or compaction_config["enabled"] or id_index_type:
286287
self._inner = _Context.create(
287288
uri,
288289
storage_options=options or None,
289290
compaction_config=compaction_config,
291+
id_index_type=id_index_type,
290292
)
291293
else:
292294
self._inner = _Context.create(uri)
@@ -308,6 +310,7 @@ def create(
308310
compaction_min_fragments: int = 5,
309311
compaction_target_rows: int = 1_000_000,
310312
quiet_hours: list[tuple[int, int]] | None = None,
313+
id_index_type: str | None = None,
311314
) -> Context:
312315
return cls(
313316
uri,
@@ -323,6 +326,7 @@ def create(
323326
compaction_min_fragments=compaction_min_fragments,
324327
compaction_target_rows=compaction_target_rows,
325328
quiet_hours=quiet_hours,
329+
id_index_type=id_index_type,
326330
)
327331

328332
def uri(self) -> str:

python/src/lib.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio::runtime::Runtime;
1111
use lance_context::serde::CONTENT_TYPE_TEXT;
1212
use lance_context::{
1313
CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord,
14-
ContextStore, ContextStoreOptions, SearchResult,
14+
ContextStore, ContextStoreOptions, IdIndexType, SearchResult,
1515
};
1616

1717
const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream";
@@ -110,23 +110,37 @@ fn compaction_config_from_dict<'py>(
110110
#[pymethods]
111111
impl Context {
112112
#[classmethod]
113-
#[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None))]
113+
#[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None, id_index_type=None))]
114114
fn create(
115115
_cls: &Bound<'_, PyType>,
116116
py: Python<'_>,
117117
uri: &str,
118118
storage_options: Option<&Bound<'_, PyDict>>,
119119
compaction_config: Option<&Bound<'_, PyDict>>,
120120
blob_columns: Option<Vec<String>>,
121+
id_index_type: Option<String>,
121122
) -> PyResult<Self> {
122123
let runtime = Arc::new(Runtime::new().map_err(to_py_err)?);
123124

124125
let blob_set: HashSet<String> = blob_columns.unwrap_or_default().into_iter().collect();
125126

127+
let id_idx = match id_index_type.as_deref() {
128+
Some("btree") => IdIndexType::BTree,
129+
Some("zonemap") => IdIndexType::ZoneMap,
130+
Some("none") | None => IdIndexType::None,
131+
Some(other) => {
132+
return Err(PyRuntimeError::new_err(format!(
133+
"invalid id_index_type '{}': valid values are 'btree', 'zonemap'",
134+
other
135+
)))
136+
}
137+
};
138+
126139
let options = ContextStoreOptions {
127140
storage_options: storage_options_from_dict(storage_options)?,
128141
compaction: compaction_config_from_dict(compaction_config)?,
129142
blob_columns: blob_set,
143+
id_index_type: id_idx,
130144
};
131145

132146
let store_res =

0 commit comments

Comments
 (0)