Skip to content

Commit 978202d

Browse files
author
Sicheng Pan
committed
Wire up RFE
1 parent 264cfbd commit 978202d

File tree

3 files changed

+110
-42
lines changed

3 files changed

+110
-42
lines changed

rust/frontend/src/get_collection_with_segments_provider.rs

+43-31
Original file line numberDiff line numberDiff line change
@@ -151,43 +151,55 @@ impl CollectionsWithSegmentsProvider {
151151
return Ok(collection_and_segments_with_ttl.collection_and_segments);
152152
}
153153
}
154-
// We acquire a lock to prevent the sysdb from experiencing a thundering herd.
155-
// This can happen when a large number of threads try to get the same collection
156-
// at the same time.
157-
let _guard = self.sysdb_rpc_lock.lock(&collection_id).await;
158-
// Double checked locking pattern to avoid lock contention in the
159-
// happy path when the collection is already cached.
160-
if let Some(collection_and_segments_with_ttl) = self
161-
.collections_with_segments_cache
162-
.get(&collection_id)
163-
.await?
164-
{
165-
if collection_and_segments_with_ttl.expires_at
166-
> SystemTime::now()
167-
.duration_since(UNIX_EPOCH)
168-
.expect("Do not deploy before UNIX epoch")
154+
155+
let collection_and_segments_sysdb = {
156+
// We acquire a lock to prevent the sysdb from experiencing a thundering herd.
157+
// This can happen when a large number of threads try to get the same collection
158+
// at the same time.
159+
let _guard = self.sysdb_rpc_lock.lock(&collection_id).await;
160+
// Double checked locking pattern to avoid lock contention in the
161+
// happy path when the collection is already cached.
162+
if let Some(collection_and_segments_with_ttl) = self
163+
.collections_with_segments_cache
164+
.get(&collection_id)
165+
.await?
169166
{
170-
return Ok(collection_and_segments_with_ttl.collection_and_segments);
167+
if collection_and_segments_with_ttl.expires_at
168+
> SystemTime::now()
169+
.duration_since(UNIX_EPOCH)
170+
.expect("Do not deploy before UNIX epoch")
171+
{
172+
return Ok(collection_and_segments_with_ttl.collection_and_segments);
173+
}
171174
}
172-
}
173-
tracing::info!("Cache miss for collection {}", collection_id);
174-
let collection_and_segments_sysdb = self
175-
.sysdb_client
176-
.get_collection_with_segments(collection_id)
177-
.await?;
178-
let collection_and_segments_sysdb_with_ttl = CollectionAndSegmentsWithTtl {
179-
collection_and_segments: collection_and_segments_sysdb.clone(),
180-
expires_at: SystemTime::now()
181-
.duration_since(UNIX_EPOCH)
182-
.expect("Do not deploy before UNIX epoch")
183-
+ Duration::from_secs(self.cache_ttl_secs as u64), // Cache for 1 minute
175+
tracing::info!("Cache miss for collection {}", collection_id);
176+
self.sysdb_client
177+
.get_collection_with_segments(collection_id)
178+
.await?
184179
};
180+
181+
self.set_collection_with_segments(collection_and_segments_sysdb.clone())
182+
.await;
183+
Ok(collection_and_segments_sysdb)
184+
}
185+
186+
pub(crate) async fn set_collection_with_segments(
187+
&mut self,
188+
collection_and_segments: CollectionAndSegments,
189+
) {
185190
// Insert only if the collection dimension is set.
186-
if collection_and_segments_sysdb.collection.dimension.is_some() {
191+
if collection_and_segments.collection.dimension.is_some() {
192+
let collection_id = collection_and_segments.collection.collection_id;
193+
let collection_and_segments_with_ttl = CollectionAndSegmentsWithTtl {
194+
collection_and_segments,
195+
expires_at: SystemTime::now()
196+
.duration_since(UNIX_EPOCH)
197+
.expect("Do not deploy before UNIX epoch")
198+
+ Duration::from_secs(self.cache_ttl_secs as u64), // Cache for 1 minute
199+
};
187200
self.collections_with_segments_cache
188-
.insert(collection_id, collection_and_segments_sysdb_with_ttl)
201+
.insert(collection_id, collection_and_segments_with_ttl)
189202
.await;
190203
}
191-
Ok(collection_and_segments_sysdb)
192204
}
193205
}

rust/frontend/src/impls/service_based_frontend.rs

+38-11
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ use chroma_types::{
2121
CreateTenantError, CreateTenantRequest, CreateTenantResponse, DeleteCollectionError,
2222
DeleteCollectionRecordsError, DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse,
2323
DeleteCollectionRequest, DeleteDatabaseError, DeleteDatabaseRequest, DeleteDatabaseResponse,
24-
GetCollectionError, GetCollectionRequest, GetCollectionResponse, GetCollectionsError,
25-
GetDatabaseError, GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse,
26-
GetTenantError, GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError,
27-
HeartbeatResponse, Include, InternalCollectionConfiguration, ListCollectionsRequest,
28-
ListCollectionsResponse, ListDatabasesError, ListDatabasesRequest, ListDatabasesResponse,
29-
Operation, OperationRecord, QueryError, QueryRequest, QueryResponse, ResetError, ResetResponse,
30-
Segment, SegmentScope, SegmentType, SegmentUuid, UpdateCollectionError,
31-
UpdateCollectionRecordsError, UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse,
32-
UpdateCollectionRequest, UpdateCollectionResponse, UpsertCollectionRecordsError,
33-
UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, VectorIndexConfiguration,
34-
Where,
24+
ForkCollectionError, ForkCollectionRequest, ForkCollectionResponse, GetCollectionError,
25+
GetCollectionRequest, GetCollectionResponse, GetCollectionsError, GetDatabaseError,
26+
GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse, GetTenantError,
27+
GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError, HeartbeatResponse,
28+
Include, InternalCollectionConfiguration, ListCollectionsRequest, ListCollectionsResponse,
29+
ListDatabasesError, ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord,
30+
QueryError, QueryRequest, QueryResponse, ResetError, ResetResponse, Segment, SegmentScope,
31+
SegmentType, SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError,
32+
UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateCollectionRequest,
33+
UpdateCollectionResponse, UpsertCollectionRecordsError, UpsertCollectionRecordsRequest,
34+
UpsertCollectionRecordsResponse, VectorIndexConfiguration, Where,
3535
};
3636
use opentelemetry::global;
3737
use opentelemetry::metrics::Counter;
@@ -495,6 +495,33 @@ impl ServiceBasedFrontend {
495495
Ok(DeleteCollectionRecordsResponse {})
496496
}
497497

498+
pub async fn fork_collection(
499+
&mut self,
500+
ForkCollectionRequest {
501+
source_collection_id,
502+
target_collection_name,
503+
..
504+
}: ForkCollectionRequest,
505+
) -> Result<ForkCollectionResponse, ForkCollectionError> {
506+
let target_collection_id = CollectionUuid::new();
507+
let collection_and_segments = self
508+
.sysdb_client
509+
.fork_collection(
510+
source_collection_id,
511+
target_collection_id,
512+
target_collection_name,
513+
)
514+
.await?;
515+
let collection = collection_and_segments.collection.clone();
516+
517+
// Update the cache.
518+
self.collections_with_segments_provider
519+
.set_collection_with_segments(collection_and_segments)
520+
.await;
521+
522+
Ok(collection)
523+
}
524+
498525
pub async fn add(
499526
&mut self,
500527
AddCollectionRecordsRequest {

rust/types/src/api_types.rs

+29
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,35 @@ impl ChromaError for DeleteCollectionError {
758758
}
759759
}
760760

761+
#[non_exhaustive]
762+
#[derive(Clone, Validate, Serialize, ToSchema)]
763+
pub struct ForkCollectionRequest {
764+
pub tenant_id: String,
765+
pub database_name: String,
766+
pub source_collection_id: CollectionUuid,
767+
pub target_collection_name: String,
768+
}
769+
770+
impl ForkCollectionRequest {
771+
pub fn try_new(
772+
tenant_id: String,
773+
database_name: String,
774+
source_collection_id: CollectionUuid,
775+
target_collection_name: String,
776+
) -> Result<Self, ChromaValidationError> {
777+
let request = Self {
778+
tenant_id,
779+
database_name,
780+
source_collection_id,
781+
target_collection_name,
782+
};
783+
request.validate().map_err(ChromaValidationError::from)?;
784+
Ok(request)
785+
}
786+
}
787+
788+
pub type ForkCollectionResponse = Collection;
789+
761790
#[derive(Error, Debug)]
762791
pub enum ForkCollectionError {
763792
#[error("Collection [{0}] already exists")]

0 commit comments

Comments
 (0)