
Commit 5ecb617

Authored and committed by Sicheng Pan

Wire up RFE

1 parent 6ca2f00 · commit 5ecb617

File tree: 3 files changed, +107 −38 lines

Diff for: rust/frontend/src/get_collection_with_segments_provider.rs

+43 −31
@@ -151,43 +151,55 @@ impl CollectionsWithSegmentsProvider {
                 return Ok(collection_and_segments_with_ttl.collection_and_segments);
             }
         }
-        // We acquire a lock to prevent the sysdb from experiencing a thundering herd.
-        // This can happen when a large number of threads try to get the same collection
-        // at the same time.
-        let _guard = self.sysdb_rpc_lock.lock(&collection_id).await;
-        // Double checked locking pattern to avoid lock contention in the
-        // happy path when the collection is already cached.
-        if let Some(collection_and_segments_with_ttl) = self
-            .collections_with_segments_cache
-            .get(&collection_id)
-            .await?
-        {
-            if collection_and_segments_with_ttl.expires_at
-                > SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .expect("Do not deploy before UNIX epoch")
+
+        let collection_and_segments_sysdb = {
+            // We acquire a lock to prevent the sysdb from experiencing a thundering herd.
+            // This can happen when a large number of threads try to get the same collection
+            // at the same time.
+            let _guard = self.sysdb_rpc_lock.lock(&collection_id).await;
+            // Double checked locking pattern to avoid lock contention in the
+            // happy path when the collection is already cached.
+            if let Some(collection_and_segments_with_ttl) = self
+                .collections_with_segments_cache
+                .get(&collection_id)
+                .await?
             {
-                return Ok(collection_and_segments_with_ttl.collection_and_segments);
+                if collection_and_segments_with_ttl.expires_at
+                    > SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .expect("Do not deploy before UNIX epoch")
+                {
+                    return Ok(collection_and_segments_with_ttl.collection_and_segments);
+                }
             }
-        }
-        tracing::info!("Cache miss for collection {}", collection_id);
-        let collection_and_segments_sysdb = self
-            .sysdb_client
-            .get_collection_with_segments(collection_id)
-            .await?;
-        let collection_and_segments_sysdb_with_ttl = CollectionAndSegmentsWithTtl {
-            collection_and_segments: collection_and_segments_sysdb.clone(),
-            expires_at: SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .expect("Do not deploy before UNIX epoch")
-                + Duration::from_secs(self.cache_ttl_secs as u64), // Cache for 1 minute
+            tracing::info!("Cache miss for collection {}", collection_id);
+            self.sysdb_client
+                .get_collection_with_segments(collection_id)
+                .await?
         };
+
+        self.set_collection_with_segments(collection_and_segments_sysdb.clone())
+            .await;
+        Ok(collection_and_segments_sysdb)
+    }
+
+    pub(crate) async fn set_collection_with_segments(
+        &mut self,
+        collection_and_segments: CollectionAndSegments,
+    ) {
         // Insert only if the collection dimension is set.
-        if collection_and_segments_sysdb.collection.dimension.is_some() {
+        if collection_and_segments.collection.dimension.is_some() {
+            let collection_id = collection_and_segments.collection.collection_id;
+            let collection_and_segments_with_ttl = CollectionAndSegmentsWithTtl {
+                collection_and_segments,
+                expires_at: SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .expect("Do not deploy before UNIX epoch")
+                    + Duration::from_secs(self.cache_ttl_secs as u64), // Cache for 1 minute
+            };
             self.collections_with_segments_cache
-                .insert(collection_id, collection_and_segments_sysdb_with_ttl)
+                .insert(collection_id, collection_and_segments_with_ttl)
                 .await;
         }
-        Ok(collection_and_segments_sysdb)
     }
 }
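
Note on the diff above: the refactor keeps the existing double-checked locking around the TTL cache and only moves the cache insertion into the new set_collection_with_segments helper so it can be reused elsewhere (e.g. after a fork). Below is a minimal, self-contained sketch of that read-through pattern; it is illustrative only, with hypothetical names (TtlCache, get_or_fetch) and a synchronous, String-keyed cache, whereas the real provider is async and keyed by CollectionUuid.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};

// Hypothetical stand-in for the collections-with-segments cache.
struct TtlCache {
    // Cached value plus the instant at which it expires.
    entries: RwLock<HashMap<String, (String, Instant)>>,
    // Serializes backend fetches so a cache miss does not become a thundering herd.
    fetch_lock: Mutex<()>,
    ttl: Duration,
}

impl TtlCache {
    fn new(ttl: Duration) -> Self {
        Self {
            entries: RwLock::new(HashMap::new()),
            fetch_lock: Mutex::new(()),
            ttl,
        }
    }

    fn get_or_fetch(&self, key: &str, fetch: impl Fn() -> String) -> String {
        // Fast path: the entry is cached and not yet expired.
        if let Some((value, expires_at)) = self.entries.read().unwrap().get(key) {
            if *expires_at > Instant::now() {
                return value.clone();
            }
        }
        // Slow path: take the fetch lock so only one caller hits the backend.
        let _guard = self.fetch_lock.lock().unwrap();
        // Double check: another caller may have filled the cache while we waited.
        if let Some((value, expires_at)) = self.entries.read().unwrap().get(key) {
            if *expires_at > Instant::now() {
                return value.clone();
            }
        }
        let value = fetch();
        self.entries
            .write()
            .unwrap()
            .insert(key.to_string(), (value.clone(), Instant::now() + self.ttl));
        value
    }
}

fn main() {
    let cache = TtlCache::new(Duration::from_secs(60));
    // Only the first call within the TTL invokes the (pretend) backend closure.
    let v = cache.get_or_fetch("collection-123", || "segments-from-sysdb".to_string());
    println!("{v}");
}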

Diff for: rust/frontend/src/impls/service_based_frontend.rs

+35 −7
@@ -21,13 +21,14 @@ use chroma_types::{
     CreateTenantError, CreateTenantRequest, CreateTenantResponse, DeleteCollectionError,
     DeleteCollectionRecordsError, DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse,
     DeleteCollectionRequest, DeleteDatabaseError, DeleteDatabaseRequest, DeleteDatabaseResponse,
-    GetCollectionError, GetCollectionRequest, GetCollectionResponse, GetCollectionsError,
-    GetDatabaseError, GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse,
-    GetTenantError, GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError,
-    HeartbeatResponse, Include, KnnIndex, ListCollectionsRequest, ListCollectionsResponse,
-    ListDatabasesError, ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord,
-    QueryError, QueryRequest, QueryResponse, ResetError, ResetResponse, Segment, SegmentScope,
-    SegmentType, SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError,
+    ForkCollectionError, ForkCollectionRequest, ForkCollectionResponse, GetCollectionError,
+    GetCollectionRequest, GetCollectionResponse, GetCollectionsError, GetDatabaseError,
+    GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse, GetTenantError,
+    GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError, HeartbeatResponse,
+    Include, KnnIndex, ListCollectionsRequest, ListCollectionsResponse, ListDatabasesError,
+    ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord, QueryError,
+    QueryRequest, QueryResponse, ResetError, ResetResponse, Segment, SegmentScope, SegmentType,
+    SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError,
     UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateCollectionRequest,
     UpdateCollectionResponse, UpsertCollectionRecordsError, UpsertCollectionRecordsRequest,
     UpsertCollectionRecordsResponse, VectorIndexConfiguration, Where,
@@ -542,6 +543,33 @@ impl ServiceBasedFrontend {
         Ok(DeleteCollectionRecordsResponse {})
     }
 
+    pub async fn fork_collection(
+        &mut self,
+        ForkCollectionRequest {
+            source_collection_id,
+            target_collection_name,
+            ..
+        }: ForkCollectionRequest,
+    ) -> Result<ForkCollectionResponse, ForkCollectionError> {
+        let target_collection_id = CollectionUuid::new();
+        let collection_and_segments = self
+            .sysdb_client
+            .fork_collection(
+                source_collection_id,
+                target_collection_id,
+                target_collection_name,
+            )
+            .await?;
+        let collection = collection_and_segments.collection.clone();
+
+        // Update the cache.
+        self.collections_with_segments_provider
+            .set_collection_with_segments(collection_and_segments)
+            .await;
+
+        Ok(collection)
+    }
+
     pub async fn add(
         &mut self,
         AddCollectionRecordsRequest {
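
For context, here is a caller-side sketch of the new endpoint. It will not compile outside this crate (ServiceBasedFrontend is assumed to be in scope, the tenant, database, and collection-name values are placeholders, and boxing the errors assumes ChromaValidationError and ForkCollectionError implement std::error::Error); it only illustrates that fork_collection takes a validated ForkCollectionRequest, returns the forked Collection, and seeds the local cache as a side effect.

use chroma_types::{CollectionUuid, ForkCollectionRequest, ForkCollectionResponse};

// `frontend` is assumed to be an already-configured ServiceBasedFrontend.
async fn fork_example(
    frontend: &mut ServiceBasedFrontend,
    source_collection_id: CollectionUuid,
) -> Result<ForkCollectionResponse, Box<dyn std::error::Error>> {
    // Placeholder tenant/database/name values; validation happens in try_new.
    let request = ForkCollectionRequest::try_new(
        "default_tenant".to_string(),
        "default_database".to_string(),
        source_collection_id,
        "my-forked-collection".to_string(),
    )?;
    // Forks the collection in the sysdb and updates the local
    // collections-with-segments cache with the returned entry.
    let forked = frontend.fork_collection(request).await?;
    Ok(forked)
}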

Diff for: rust/types/src/api_types.rs

+29 −0
@@ -767,6 +767,35 @@ impl ChromaError for DeleteCollectionError {
     }
 }
 
+#[non_exhaustive]
+#[derive(Clone, Validate, Serialize, ToSchema)]
+pub struct ForkCollectionRequest {
+    pub tenant_id: String,
+    pub database_name: String,
+    pub source_collection_id: CollectionUuid,
+    pub target_collection_name: String,
+}
+
+impl ForkCollectionRequest {
+    pub fn try_new(
+        tenant_id: String,
+        database_name: String,
+        source_collection_id: CollectionUuid,
+        target_collection_name: String,
+    ) -> Result<Self, ChromaValidationError> {
+        let request = Self {
+            tenant_id,
+            database_name,
+            source_collection_id,
+            target_collection_name,
+        };
+        request.validate().map_err(ChromaValidationError::from)?;
+        Ok(request)
+    }
+}
+
+pub type ForkCollectionResponse = Collection;
+
 #[derive(Error, Debug)]
 pub enum ForkCollectionError {
     #[error("Collection [{0}] already exists")]
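
The new request type follows the same try_new-then-validate convention as the other request structs in this file. Below is a standalone sketch of that convention using the validator crate; the struct name and field constraints shown are hypothetical examples, not the rules ForkCollectionRequest actually enforces.

use validator::Validate;

// Illustrative request type; the constraints are examples only.
#[derive(Debug, Validate)]
struct ForkRequestSketch {
    #[validate(length(min = 1))]
    tenant_id: String,
    #[validate(length(min = 1))]
    target_collection_name: String,
}

impl ForkRequestSketch {
    // Construct and validate in one step so invalid requests never escape.
    fn try_new(
        tenant_id: String,
        target_collection_name: String,
    ) -> Result<Self, validator::ValidationErrors> {
        let request = Self {
            tenant_id,
            target_collection_name,
        };
        request.validate()?;
        Ok(request)
    }
}

fn main() {
    assert!(ForkRequestSketch::try_new("tenant".into(), "fork".into()).is_ok());
    assert!(ForkRequestSketch::try_new("".into(), "fork".into()).is_err());
}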
