Skip to content

Commit 9409651

Browse files
authored
feat: Implement full Lance maintenance SQL surface and internal-only maintenance APIs (#155)
* Implement full Lance maintenance SQL surface and tests * Fix flaky maintenance auto cleanup sqllogictest baseline * Add Arrow schema compatibility across DuckDB signatures * Format Arrow schema compatibility changes * Revert "Format Arrow schema compatibility changes" This reverts commit 1e23bc9. * Revert "Add Arrow schema compatibility across DuckDB signatures" This reverts commit 7f49ffd.
1 parent 91b6109 commit 9409651

16 files changed

Lines changed: 2297 additions & 32 deletions

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ set(EXTENSION_SOURCES src/lance_extension.cpp src/lance_scan.cpp
1717
src/lance_logical_exec.cpp
1818
src/lance_storage.cpp
1919
src/lance_metadata.cpp
20+
src/lance_maintenance.cpp
2021
src/lance_replacement.cpp
2122
src/lance_insert.cpp
2223
src/lance_merge.cpp

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ chrono = "0.4"
4545
futures = "0.3"
4646
roaring = "0.10.12"
4747
serde_json = "1.0"
48+
serde = { version = "1.0", features = ["derive"] }
4849
snafu = "0.8.6"
4950
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
5051
rand = "0.8"

docs/sql.md

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ USING INVERTED;
374374

375375
Notes:
376376
- `CREATE INDEX` currently supports a single column.
377-
- Use `REINDEX ... RETRAIN` to retrain an untrained index (if created with training disabled).
378377

379378
### `SHOW INDEXES`
380379

@@ -387,3 +386,96 @@ SHOW INDEXES ON 'path/to/dataset.lance';
387386
```sql
388387
DROP INDEX vec_idx ON 'path/to/dataset.lance';
389388
```
389+
390+
### `ALTER INDEX ... OPTIMIZE`
391+
392+
```sql
393+
ALTER INDEX vec_idx
394+
ON 'path/to/dataset.lance'
395+
OPTIMIZE WITH (mode = 'append');
396+
```
397+
398+
Supported `mode` values:
399+
- `append`
400+
- `merge` (accepts the optional `num_indices_to_merge`, which must be greater than 0; the option is rejected for `append` and `retrain`)
401+
- `retrain`
402+
403+
## Maintenance SQL
404+
405+
Maintenance statements accept either:
406+
- A dataset path string literal (for example `'path/to/dataset.lance'`)
407+
- An attached Lance table name (for example `ns.main.my_table`)
408+
409+
### `OPTIMIZE`
410+
411+
```sql
412+
OPTIMIZE 'path/to/dataset.lance' WITH (
413+
target_rows_per_fragment = 1048576,
414+
max_rows_per_group = 1024,
415+
max_bytes_per_file = 0,
416+
materialize_deletions = true,
417+
materialize_deletions_threshold = 0.1,
418+
num_threads = 0,
419+
batch_size = 0,
420+
defer_index_remap = false
421+
);
422+
```
423+
424+
Returns:
425+
- `Operation` (`compact`)
426+
- `Target`
427+
- `MetricsJSON` (compaction metrics)
428+
429+
### `VACUUM LANCE`
430+
431+
```sql
432+
VACUUM LANCE 'path/to/dataset.lance' WITH (
433+
older_than_seconds = 1209600,
434+
delete_unverified = false,
435+
error_if_tagged_old_versions = true,
436+
retain_n_versions = 3
437+
);
438+
```
439+
440+
Returns:
441+
- `Operation` (`cleanup`)
442+
- `Target`
443+
- `MetricsJSON` (cleanup metrics such as removed bytes / versions)
444+
445+
### `ALTER INDEX ... OPTIMIZE`
446+
447+
```sql
448+
ALTER INDEX vec_idx
449+
ON 'path/to/dataset.lance'
450+
OPTIMIZE WITH (
451+
mode = 'merge',
452+
num_indices_to_merge = 4
453+
);
454+
```
455+
456+
Supported `mode` values:
457+
- `append`
458+
- `merge` (accepts the optional `num_indices_to_merge`, which must be greater than 0; the option is rejected for `append` and `retrain`)
459+
- `retrain`
460+
461+
Returns:
462+
- `Operation` (`optimize_index`)
463+
- `Target`
464+
- `MetricsJSON`
465+
466+
### Auto cleanup configuration
467+
468+
```sql
469+
ALTER TABLE 'path/to/dataset.lance'
470+
SET AUTO_CLEANUP WITH (interval = 1, older_than = '1h', retain_versions = 3);
471+
472+
ALTER TABLE 'path/to/dataset.lance' UNSET AUTO_CLEANUP;
473+
474+
SHOW MAINTENANCE ON 'path/to/dataset.lance';
475+
```
476+
477+
`SHOW MAINTENANCE` returns key/value rows including:
478+
- `enabled`
479+
- `interval` (if configured)
480+
- `older_than` (if configured)
481+
- `retain_versions` (if configured)

rust/ffi/index.rs

Lines changed: 165 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::ffi::{c_char, c_void};
1+
use std::ffi::{c_char, c_void, CStr};
22
use std::ptr;
33
use std::sync::Arc;
44

@@ -12,12 +12,39 @@ use lance_index::vector::hnsw::builder::HnswBuildParams;
1212
use lance_index::vector::pq::PQBuildParams;
1313
use lance_index::{DatasetIndexExt, IndexType};
1414
use lance_linalg::distance::DistanceType;
15+
use serde::{Deserialize, Serialize};
1516

1617
use crate::error::{clear_last_error, set_last_error, ErrorCode};
1718
use crate::runtime;
1819

1920
use super::types::{SchemaHandle, StreamHandle};
20-
use super::util::{cstr_to_str, dataset_handle, FfiError, FfiResult};
21+
use super::util::{cstr_to_str, dataset_handle, to_c_string, FfiError, FfiResult};
22+
23+
#[derive(Debug, Deserialize)]
24+
#[serde(default, deny_unknown_fields)]
25+
struct OptimizeIndexOptionsInput {
26+
mode: Option<String>,
27+
retrain: Option<bool>,
28+
num_indices_to_merge: Option<usize>,
29+
}
30+
31+
impl Default for OptimizeIndexOptionsInput {
32+
fn default() -> Self {
33+
Self {
34+
mode: None,
35+
retrain: None,
36+
num_indices_to_merge: None,
37+
}
38+
}
39+
}
40+
41+
#[derive(Debug, Serialize)]
42+
struct OptimizeIndexMetricsOutput {
43+
index_name: String,
44+
mode: String,
45+
retrain: bool,
46+
num_indices_to_merge: Option<usize>,
47+
}
2148

2249
#[no_mangle]
2350
pub unsafe extern "C" fn lance_get_index_list_schema(dataset: *mut c_void) -> *mut c_void {
@@ -340,7 +367,13 @@ pub unsafe extern "C" fn lance_dataset_optimize_index(
340367
index_name: *const c_char,
341368
retrain: u8,
342369
) -> i32 {
343-
match dataset_optimize_index_inner(dataset, index_name, retrain) {
370+
match dataset_optimize_index_with_options_inner(
371+
dataset,
372+
index_name,
373+
ptr::null(),
374+
ptr::null_mut(),
375+
retrain != 0,
376+
) {
344377
Ok(()) => {
345378
clear_last_error();
346379
0
@@ -352,10 +385,112 @@ pub unsafe extern "C" fn lance_dataset_optimize_index(
352385
}
353386
}
354387

355-
fn dataset_optimize_index_inner(
388+
#[no_mangle]
389+
pub unsafe extern "C" fn lance_dataset_optimize_index_with_options(
356390
dataset: *mut c_void,
357391
index_name: *const c_char,
358-
retrain: u8,
392+
options_json: *const c_char,
393+
out_metrics_json: *mut *const c_char,
394+
) -> i32 {
395+
if !out_metrics_json.is_null() {
396+
unsafe {
397+
ptr::write_unaligned(out_metrics_json, ptr::null());
398+
}
399+
}
400+
match dataset_optimize_index_with_options_inner(
401+
dataset,
402+
index_name,
403+
options_json,
404+
out_metrics_json,
405+
false,
406+
) {
407+
Ok(()) => {
408+
clear_last_error();
409+
0
410+
}
411+
Err(err) => {
412+
set_last_error(err.code, err.message);
413+
-1
414+
}
415+
}
416+
}
417+
418+
fn parse_optimize_index_options_json(
419+
options_json: *const c_char,
420+
legacy_retrain: bool,
421+
) -> FfiResult<(OptimizeOptions, String, bool, Option<usize>)> {
422+
let input = if options_json.is_null() {
423+
OptimizeIndexOptionsInput::default()
424+
} else {
425+
let text = unsafe { CStr::from_ptr(options_json) }
426+
.to_str()
427+
.map_err(|err| FfiError::new(ErrorCode::Utf8, format!("options_json utf8: {err}")))?;
428+
if text.trim().is_empty() {
429+
OptimizeIndexOptionsInput::default()
430+
} else {
431+
serde_json::from_str(text).map_err(|err| {
432+
FfiError::new(
433+
ErrorCode::InvalidArgument,
434+
format!("optimize_index options_json parse: {err}"),
435+
)
436+
})?
437+
}
438+
};
439+
440+
let mode = if let Some(mode) = input.mode.as_ref() {
441+
mode.trim().to_ascii_lowercase()
442+
} else if input.retrain.unwrap_or(false) || legacy_retrain {
443+
String::from("retrain")
444+
} else {
445+
String::from("append")
446+
};
447+
448+
let (options, retrain, num_indices_to_merge) = match mode.as_str() {
449+
"append" => {
450+
if input.num_indices_to_merge.is_some() {
451+
return Err(FfiError::new(
452+
ErrorCode::InvalidArgument,
453+
"num_indices_to_merge is only valid for mode='merge'",
454+
));
455+
}
456+
(OptimizeOptions::append(), false, Some(0))
457+
}
458+
"merge" => {
459+
let num = input.num_indices_to_merge.unwrap_or(1);
460+
if num == 0 {
461+
return Err(FfiError::new(
462+
ErrorCode::InvalidArgument,
463+
"num_indices_to_merge must be > 0 for mode='merge'",
464+
));
465+
}
466+
(OptimizeOptions::merge(num), false, Some(num))
467+
}
468+
"retrain" => {
469+
if input.num_indices_to_merge.is_some() {
470+
return Err(FfiError::new(
471+
ErrorCode::InvalidArgument,
472+
"num_indices_to_merge is invalid for mode='retrain'",
473+
));
474+
}
475+
(OptimizeOptions::retrain(), true, None)
476+
}
477+
other => {
478+
return Err(FfiError::new(
479+
ErrorCode::InvalidArgument,
480+
format!("unsupported optimize mode: {other}"),
481+
))
482+
}
483+
};
484+
485+
Ok((options, mode, retrain, num_indices_to_merge))
486+
}
487+
488+
fn dataset_optimize_index_with_options_inner(
489+
dataset: *mut c_void,
490+
index_name: *const c_char,
491+
options_json: *const c_char,
492+
out_metrics_json: *mut *const c_char,
493+
legacy_retrain: bool,
359494
) -> FfiResult<()> {
360495
let handle = unsafe { dataset_handle(dataset)? };
361496
let index_name = unsafe { cstr_to_str(index_name, "index_name")? };
@@ -365,25 +500,41 @@ fn dataset_optimize_index_inner(
365500
"index_name must be non-empty",
366501
));
367502
}
503+
let index_name_owned = index_name.to_string();
368504

369-
let mut options = if retrain != 0 {
370-
OptimizeOptions::retrain()
371-
} else {
372-
OptimizeOptions::append()
373-
};
374-
options = options.index_names(vec![index_name.to_string()]);
505+
let (mut options, mode, retrain, num_indices_to_merge) =
506+
parse_optimize_index_options_json(options_json, legacy_retrain)?;
507+
options = options.index_names(vec![index_name_owned.clone()]);
375508

376509
let mut ds: Dataset = handle.dataset.as_ref().clone();
377-
run_with_large_stack(move || {
510+
let metrics = run_with_large_stack(move || {
378511
match runtime::block_on(async { ds.optimize_indices(&options).await }) {
379-
Ok(Ok(())) => Ok(()),
512+
Ok(Ok(())) => Ok(OptimizeIndexMetricsOutput {
513+
index_name: index_name_owned,
514+
mode,
515+
retrain,
516+
num_indices_to_merge,
517+
}),
380518
Ok(Err(err)) => Err(FfiError::new(
381519
ErrorCode::DatasetOptimizeIndices,
382520
format!("dataset optimize_indices: {err}"),
383521
)),
384522
Err(err) => Err(FfiError::new(ErrorCode::Runtime, format!("runtime: {err}"))),
385523
}
386-
})?
524+
})??;
525+
526+
if !out_metrics_json.is_null() {
527+
let payload = serde_json::to_string(&metrics).map_err(|err| {
528+
FfiError::new(
529+
ErrorCode::DatasetOptimizeIndices,
530+
format!("optimize_index metrics_json serialize: {err}"),
531+
)
532+
})?;
533+
unsafe {
534+
ptr::write_unaligned(out_metrics_json, to_c_string(payload).into_raw() as *const c_char);
535+
}
536+
}
537+
Ok(())
387538
}
388539

389540
fn normalize_index_type(index_type: &str) -> String {

0 commit comments

Comments
 (0)