Skip to content

Commit 783c64f

Browse files
dcfocusAllen Cheng
andauthored
feat: support external record identifiers (#57)
## Summary - Add a nullable `external_id` field to context records for caller-owned stable identifiers. - Expose `Context.add(..., external_id=...)` and `Context.get(id=...)` / `Context.get(external_id=...)` in Python. - Reject duplicate internal ids and duplicate external ids with explicit errors. - Add Rust/Python coverage plus README examples. Closes #52 ## Testing - `python3 -m py_compile python/python/lance_context/api.py python/tests/test_search.py python/tests/test_external_id.py` - `git diff --check` - `./.codex/skills/ci-pr-helper/scripts/run_ci_checks.sh` failed locally: `cargo not found; install Rust to run Rust checks.` - Full `cargo`, `ruff`, and `pytest` checks were not run because those tools are not installed in this shell. --------- Co-authored-by: Allen Cheng <allen.cheng@uber.com>
1 parent e62de53 commit 783c64f

7 files changed

Lines changed: 405 additions & 40 deletions

File tree

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ uri = Path("context.lance").as_posix()
5454
ctx = Context.create(uri)
5555

5656
# Add multimodal entries
57-
ctx.add("user", "Where should I travel in spring?")
57+
ctx.add(
58+
"user",
59+
"Where should I travel in spring?",
60+
external_id="conversation-2026-03-01#turn-1",
61+
)
62+
print(ctx.get(external_id="conversation-2026-03-01#turn-1"))
5863

5964
from PIL import Image
6065
image = Image.new("RGB", (2, 2), color="teal")
@@ -138,6 +143,7 @@ use chrono::Utc;
138143
let mut store = ContextStore::open("context.lance").await?;
139144
let record = ContextRecord {
140145
id: "run-1-1".into(),
146+
external_id: None,
141147
run_id: "run-1".into(),
142148
created_at: Utc::now(),
143149
role: "user".into(),

crates/lance-context-core/src/record.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct StateMetadata {
1313
#[derive(Debug, Clone)]
1414
pub struct ContextRecord {
1515
pub id: String,
16+
pub external_id: Option<String>,
1617
pub run_id: String,
1718
pub bot_id: Option<String>,
1819
pub session_id: Option<String>,

crates/lance-context-core/src/store.rs

Lines changed: 175 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ impl ContextStore {
203203
return Ok(self.dataset.manifest.version);
204204
}
205205

206+
self.validate_unique_ids(entries).await?;
207+
206208
// Group entries by (bot_id, session_id)
207209
let mut groups: HashMap<(Option<String>, Option<String>), Vec<ContextRecord>> =
208210
HashMap::new();
@@ -250,6 +252,50 @@ impl ContextStore {
250252
Ok(self.dataset.manifest.version)
251253
}
252254

255+
async fn validate_unique_ids(&self, entries: &[ContextRecord]) -> LanceResult<()> {
256+
let mut ids = HashSet::new();
257+
let mut external_ids = HashSet::new();
258+
for entry in entries {
259+
if !ids.insert(entry.id.as_str()) {
260+
return Err(ArrowError::InvalidArgumentError(format!(
261+
"duplicate id '{}' in batch",
262+
entry.id
263+
))
264+
.into());
265+
}
266+
if let Some(external_id) = &entry.external_id {
267+
if !external_ids.insert(external_id.as_str()) {
268+
return Err(ArrowError::InvalidArgumentError(format!(
269+
"duplicate external_id '{}' in batch",
270+
external_id
271+
))
272+
.into());
273+
}
274+
}
275+
}
276+
277+
for record in self.list(None, None).await? {
278+
if ids.contains(record.id.as_str()) {
279+
return Err(ArrowError::InvalidArgumentError(format!(
280+
"id '{}' already exists",
281+
record.id
282+
))
283+
.into());
284+
}
285+
if let Some(external_id) = record.external_id {
286+
if external_ids.contains(external_id.as_str()) {
287+
return Err(ArrowError::InvalidArgumentError(format!(
288+
"external_id '{}' already exists",
289+
external_id
290+
))
291+
.into());
292+
}
293+
}
294+
}
295+
296+
Ok(())
297+
}
298+
253299
fn derive_region_id(bot_id: &Option<String>, session_id: &Option<String>) -> Uuid {
254300
let mut input = String::new();
255301

@@ -299,6 +345,27 @@ impl ContextStore {
299345
Ok(results)
300346
}
301347

348+
/// Find a record by its internal storage id.
349+
pub async fn get_by_id(&self, id: &str) -> LanceResult<Option<ContextRecord>> {
350+
Ok(self
351+
.list(None, None)
352+
.await?
353+
.into_iter()
354+
.find(|record| record.id == id))
355+
}
356+
357+
/// Find a record by its caller-supplied external id.
358+
pub async fn get_by_external_id(
359+
&self,
360+
external_id: &str,
361+
) -> LanceResult<Option<ContextRecord>> {
362+
Ok(self
363+
.list(None, None)
364+
.await?
365+
.into_iter()
366+
.find(|record| record.external_id.as_deref() == Some(external_id)))
367+
}
368+
302369
/// Perform a nearest-neighbor search over stored embeddings.
303370
pub async fn search(
304371
&self,
@@ -583,6 +650,10 @@ impl ContextStore {
583650
/// Lance V1 blob encoding (out-of-line binary buffers). For `text_payload`,
584651
/// this also changes the Arrow type from `LargeUtf8` to `LargeBinary`.
585652
pub fn schema(blob_columns: &HashSet<String>) -> Schema {
653+
Self::schema_with_options(blob_columns, true)
654+
}
655+
656+
fn schema_with_options(blob_columns: &HashSet<String>, include_external_id: bool) -> Schema {
586657
let mut id_metadata = HashMap::new();
587658
id_metadata.insert(
588659
"lance-schema:unenforced-primary-key".to_string(),
@@ -605,8 +676,11 @@ impl ContextStore {
605676
Field::new("binary_payload", DataType::LargeBinary, true)
606677
};
607678

608-
Schema::new(vec![
609-
Field::new("id", DataType::Utf8, false).with_metadata(id_metadata),
679+
let mut fields = vec![Field::new("id", DataType::Utf8, false).with_metadata(id_metadata)];
680+
if include_external_id {
681+
fields.push(Field::new("external_id", DataType::Utf8, true));
682+
}
683+
fields.extend([
610684
Field::new("run_id", DataType::Utf8, false),
611685
Field::new("bot_id", DataType::Utf8, true),
612686
Field::new("session_id", DataType::Utf8, true),
@@ -644,7 +718,9 @@ impl ContextStore {
644718
),
645719
true,
646720
),
647-
])
721+
]);
722+
723+
Schema::new(fields)
648724
}
649725

650726
async fn load_with_options(
@@ -692,7 +768,22 @@ impl ContextStore {
692768
}
693769

694770
fn records_to_batch(&self, entries: &[ContextRecord]) -> LanceResult<RecordBatch> {
771+
let include_external_id = self
772+
.dataset
773+
.schema()
774+
.field_paths()
775+
.iter()
776+
.any(|path| path == "external_id");
777+
if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) {
778+
return Err(ArrowError::InvalidArgumentError(
779+
"external_id requires a context dataset created with external_id support"
780+
.to_string(),
781+
)
782+
.into());
783+
}
784+
695785
let mut id_builder = StringBuilder::new();
786+
let mut external_id_builder = StringBuilder::new();
696787
let mut run_id_builder = StringBuilder::new();
697788
let mut bot_id_builder = StringBuilder::new();
698789
let mut session_id_builder = StringBuilder::new();
@@ -734,6 +825,7 @@ impl ContextStore {
734825

735826
for entry in entries {
736827
id_builder.append_value(&entry.id);
828+
external_id_builder.append_option(entry.external_id.as_deref());
737829
run_id_builder.append_value(&entry.run_id);
738830
bot_id_builder.append_option(entry.bot_id.as_deref());
739831
session_id_builder.append_option(entry.session_id.as_deref());
@@ -826,6 +918,7 @@ impl ContextStore {
826918
}
827919

828920
let id_array: ArrayRef = Arc::new(id_builder.finish());
921+
let external_id_array: ArrayRef = Arc::new(external_id_builder.finish());
829922
let run_id_array: ArrayRef = Arc::new(run_id_builder.finish());
830923
let bot_id_array: ArrayRef = Arc::new(bot_id_builder.finish());
831924
let session_id_array: ArrayRef = Arc::new(session_id_builder.finish());
@@ -841,23 +934,27 @@ impl ContextStore {
841934
let state_array: ArrayRef = Arc::new(state_builder.finish());
842935
let embedding_array: ArrayRef = Arc::new(embedding_builder.finish());
843936

844-
let schema = Arc::new(Self::schema(&self.blob_columns));
845-
let batch = RecordBatch::try_new(
846-
schema,
847-
vec![
848-
id_array,
849-
run_id_array,
850-
bot_id_array,
851-
session_id_array,
852-
created_at_array,
853-
role_array,
854-
state_array,
855-
content_type_array,
856-
text_array,
857-
binary_array,
858-
embedding_array,
859-
],
860-
)?;
937+
let schema = Arc::new(Self::schema_with_options(
938+
&self.blob_columns,
939+
include_external_id,
940+
));
941+
let mut arrays = vec![id_array];
942+
if include_external_id {
943+
arrays.push(external_id_array);
944+
}
945+
arrays.extend([
946+
run_id_array,
947+
bot_id_array,
948+
session_id_array,
949+
created_at_array,
950+
role_array,
951+
state_array,
952+
content_type_array,
953+
text_array,
954+
binary_array,
955+
embedding_array,
956+
]);
957+
let batch = RecordBatch::try_new(schema, arrays)?;
861958

862959
Ok(batch)
863960
}
@@ -877,6 +974,7 @@ impl Drop for ContextStore {
877974
/// Convert a record batch to context records.
878975
fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
879976
let id_array = column_as::<StringArray>(batch, "id")?;
977+
let external_id_array = column_as_optional::<StringArray>(batch, "external_id");
880978
let run_id_array = column_as::<StringArray>(batch, "run_id")?;
881979
let bot_id_array = column_as_optional::<StringArray>(batch, "bot_id");
882980
let session_id_array = column_as_optional::<StringArray>(batch, "session_id");
@@ -1046,6 +1144,13 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
10461144

10471145
results.push(ContextRecord {
10481146
id: id_array.value(row).to_string(),
1147+
external_id: external_id_array.and_then(|arr| {
1148+
if arr.is_null(row) {
1149+
None
1150+
} else {
1151+
Some(arr.value(row).to_string())
1152+
}
1153+
}),
10491154
run_id: run_id_array.value(row).to_string(),
10501155
bot_id,
10511156
session_id,
@@ -1134,6 +1239,7 @@ mod tests {
11341239
fn text_record(id: &str, embedding_pivot: f32) -> ContextRecord {
11351240
ContextRecord {
11361241
id: id.to_string(),
1242+
external_id: None,
11371243
run_id: format!("run-{id}"),
11381244
bot_id: None,
11391245
session_id: None,
@@ -1192,6 +1298,55 @@ mod tests {
11921298
});
11931299
}
11941300

1301+
#[test]
1302+
fn external_id_roundtrips_and_supports_lookup() {
1303+
let dir = TempDir::new().unwrap();
1304+
let uri = dir.path().to_string_lossy().to_string();
1305+
let runtime = tokio::runtime::Runtime::new().unwrap();
1306+
runtime.block_on(async {
1307+
let mut store = ContextStore::open(&uri).await.unwrap();
1308+
let mut record = text_record("a", 0.0);
1309+
record.external_id = Some("doc-123#chunk-1".to_string());
1310+
store.add(std::slice::from_ref(&record)).await.unwrap();
1311+
1312+
let by_external_id = store
1313+
.get_by_external_id("doc-123#chunk-1")
1314+
.await
1315+
.unwrap()
1316+
.unwrap();
1317+
assert_eq!(by_external_id.id, record.id);
1318+
assert_eq!(by_external_id.external_id, record.external_id);
1319+
1320+
let by_id = store.get_by_id(&record.id).await.unwrap().unwrap();
1321+
assert_eq!(by_id.external_id.as_deref(), Some("doc-123#chunk-1"));
1322+
1323+
let missing = store.get_by_external_id("missing").await.unwrap();
1324+
assert!(missing.is_none());
1325+
});
1326+
}
1327+
1328+
#[test]
1329+
fn add_rejects_duplicate_external_id() {
1330+
let dir = TempDir::new().unwrap();
1331+
let uri = dir.path().to_string_lossy().to_string();
1332+
let runtime = tokio::runtime::Runtime::new().unwrap();
1333+
runtime.block_on(async {
1334+
let mut store = ContextStore::open(&uri).await.unwrap();
1335+
let mut first = text_record("a", 0.0);
1336+
first.external_id = Some("doc-123#chunk-1".to_string());
1337+
store.add(std::slice::from_ref(&first)).await.unwrap();
1338+
1339+
let mut duplicate = text_record("b", 0.0);
1340+
duplicate.external_id = first.external_id.clone();
1341+
let err = store.add(&[duplicate]).await.unwrap_err();
1342+
let message = err.to_string();
1343+
assert!(
1344+
message.contains("external_id") && message.contains("already exists"),
1345+
"unexpected error message: {message}"
1346+
);
1347+
});
1348+
}
1349+
11951350
#[test]
11961351
fn test_region_id_derivation_explicit() {
11971352
let bot_id = Some("bot-123".to_string());

python/python/lance_context/api.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]:
115115
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
116116
return {
117117
"id": raw.get("id"),
118+
"external_id": raw.get("external_id"),
118119
"run_id": raw.get("run_id"),
119120
"bot_id": raw.get("bot_id"),
120121
"session_id": raw.get("session_id"),
@@ -350,13 +351,22 @@ def add(
350351
embedding: list[float] | None = None,
351352
bot_id: str | None = None,
352353
session_id: str | None = None,
354+
external_id: str | None = None,
353355
) -> None:
354356
if content_type is not None and data_type is not None:
355357
raise ValueError("Specify only one of content_type or data_type")
356358
if content_type is None:
357359
content_type = data_type
358360
payload, resolved_type = _normalize_content(content, content_type)
359-
self._inner.add(role, payload, resolved_type, embedding, bot_id, session_id)
361+
self._inner.add(
362+
role,
363+
payload,
364+
resolved_type,
365+
embedding,
366+
bot_id,
367+
session_id,
368+
external_id,
369+
)
360370

361371
def snapshot(self, label: str | None = None) -> str:
362372
return self._inner.snapshot(label)
@@ -389,6 +399,17 @@ def list(
389399
results = self._inner.list(limit, offset)
390400
return [_normalize_record(item) for item in results]
391401

402+
def get(
403+
self, *, id: str | None = None, external_id: str | None = None
404+
) -> dict[str, Any] | None:
405+
"""Return one entry by internal id or caller-supplied external id."""
406+
if (id is None) == (external_id is None):
407+
raise ValueError("Specify exactly one of id or external_id")
408+
result = self._inner.get(id, external_id)
409+
if result is None:
410+
return None
411+
return _normalize_record(result)
412+
392413
def compact(
393414
self,
394415
*,

0 commit comments

Comments
 (0)