Skip to content

Commit 3676846

Browse files
authored
[CHORE]: Create per-tenant config in the compactor for shard sizes (#6847)
## Description of changes This change creates a list config in the compactor called `sharding_enabled_tenant_patterns` that controls which tenants have the global `shard_size` limit applied. If a tenant does not match any pattern in this config then it is considered to not have sharding enabled. Sample config: ``` compactor: shard_size: 1000000 # 1MB shard size limit sharding_enabled_tenant_patterns: - "*" # Enable for all tenants - "test-tenant-1" # Enable for specific test tenant ``` tenant_id had to be plumbed into the `CompactionJob` structure for this to work out. Due to this, the `rebuild_batch` endpoint was changed to retrieve tenant information from the sysdb before proceeding with a rebuild. - Improvements & Bug fixes - ^ - New functionality - ... ## Test plan Manual testing. Created a collection and specified a low shard size limit for the default tenant. Observed sharding working on this collection as expected. - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 6ee8c75 commit 3676846

File tree

5 files changed

+73
-12
lines changed

5 files changed

+73
-12
lines changed

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ pub(crate) struct CompactionManagerContext {
107107
collections_for_fragment_fetch: HashSet<CollectionUuid>,
108108
bloom_filter_manager: Option<BloomFilterManager>,
109109
shard_size: Option<u64>,
110+
sharding_enabled_tenant_patterns: Vec<String>,
110111
}
111112

112113
pub(crate) struct CompactionManager {
@@ -162,6 +163,7 @@ impl CompactionManager {
162163
collections_for_fragment_fetch: HashSet<CollectionUuid>,
163164
bloom_filter_manager: Option<BloomFilterManager>,
164165
shard_size: Option<u64>,
166+
sharding_enabled_tenant_patterns: Vec<String>,
165167
) -> Result<Self, Box<dyn ChromaError>> {
166168
let (compact_awaiter_tx, compact_awaiter_rx) =
167169
mpsc::channel::<CompactionTask>(compaction_manager_queue_size);
@@ -201,6 +203,7 @@ impl CompactionManager {
201203
collections_for_fragment_fetch,
202204
bloom_filter_manager,
203205
shard_size,
206+
sharding_enabled_tenant_patterns,
204207
},
205208
on_next_memberlist_signal: None,
206209
compact_awaiter_channel: compact_awaiter_tx,
@@ -231,6 +234,7 @@ impl CompactionManager {
231234
.compact(
232235
job.collection_id,
233236
job.database_name.clone(),
237+
job.tenant_id.clone(),
234238
false,
235239
HashSet::new(),
236240
)
@@ -257,15 +261,40 @@ impl CompactionManager {
257261
collection_ids: &[CollectionUuid],
258262
segment_scopes: &HashSet<chroma_types::SegmentScope>,
259263
) {
260-
// TODO(tanujnay112): Implement this for MCMR by accepting a database/topo name on this method.
261-
let _ = collection_ids
264+
let options = chroma_sysdb::types::GetCollectionsOptions {
265+
collection_ids: Some(collection_ids.to_vec()),
266+
..Default::default()
267+
};
268+
let collections = match self.context.sysdb.get_collections(options).await {
269+
Ok(collections) => collections,
270+
Err(e) => {
271+
// TODO(tanujnay112): Propagate error up and then handle it there.
272+
tracing::error!("Failed to get collections in rebuild: {}", e);
273+
return;
274+
}
275+
};
276+
let _ = collections
262277
.iter()
263-
.map(|id| {
264-
let database_name =
265-
chroma_types::DatabaseName::new("default").expect("default should be valid");
266-
self.context
267-
.clone()
268-
.compact(*id, database_name, true, segment_scopes.clone())
278+
.filter_map(|collection| {
279+
match chroma_types::DatabaseName::new(collection.database.clone()) {
280+
Some(database_name) => Some(
281+
self.context.clone().compact(
282+
collection.collection_id,
283+
database_name,
284+
collection.tenant.clone(),
285+
true,
286+
segment_scopes.clone(),
287+
)
288+
),
289+
None => {
290+
tracing::error!(
291+
"Invalid database name '{}' for collection {} (must be at least 3 characters)",
292+
collection.database,
293+
collection.collection_id
294+
);
295+
None
296+
}
297+
}
269298
})
270299
.collect::<FuturesUnordered<_>>()
271300
.collect::<Vec<_>>()
@@ -435,6 +464,7 @@ impl CompactionManagerContext {
435464
self,
436465
collection_id: CollectionUuid,
437466
database_name: chroma_types::DatabaseName,
467+
tenant_id: String,
438468
is_rebuild: bool,
439469
apply_segment_scopes: HashSet<chroma_types::SegmentScope>,
440470
) -> Result<CompactionResponse, Box<dyn ChromaError>> {
@@ -452,6 +482,12 @@ impl CompactionManagerContext {
452482
let is_function_disabled = self.disabled_function_collections.contains(&collection_id);
453483
let fragment_fetcher = self.fragment_fetcher_for_collection(collection_id);
454484
let bloom_filter_manager = self.bloom_filter_manager_for_collection(collection_id);
485+
let shard_size =
486+
if tenant_matches_patterns(&tenant_id, &self.sharding_enabled_tenant_patterns) {
487+
self.shard_size
488+
} else {
489+
None
490+
};
455491

456492
let compact_result = Box::pin(compact(
457493
self.system.clone(),
@@ -472,7 +508,7 @@ impl CompactionManagerContext {
472508
is_function_disabled,
473509
fragment_fetcher,
474510
bloom_filter_manager,
475-
self.shard_size,
511+
shard_size,
476512
#[cfg(test)]
477513
None,
478514
))
@@ -686,10 +722,23 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
686722
collections_for_fragment_fetch,
687723
Some(bloom_filter_manager),
688724
config.compactor.shard_size,
725+
config.compactor.sharding_enabled_tenant_patterns.clone(),
689726
)
690727
}
691728
}
692729

730+
/// Returns true when `tenant_id` matches any entry in `patterns`.
///
/// Supported pattern forms (as documented on the
/// `sharding_enabled_tenant_patterns` config field):
/// - `"*"` matches every tenant
/// - a trailing `*` (e.g. `"prod-*"`) matches tenants starting with the prefix
/// - any other pattern requires an exact match
///
/// An empty pattern list matches no tenant (sharding disabled).
fn tenant_matches_patterns(tenant_id: &str, patterns: &[String]) -> bool {
    patterns.iter().any(|pattern| {
        if pattern == "*" {
            return true;
        }
        // Prefix wildcard: "prod-*" matches "prod-eu". The config docs promise
        // this form; the previous implementation only handled "*" and exact
        // equality, so prefix patterns never matched.
        if let Some(prefix) = pattern.strip_suffix('*') {
            return tenant_id.starts_with(prefix);
        }
        pattern == tenant_id
    })
}
741+
693742
async fn compact_awaiter_loop(
694743
mut job_rx: mpsc::Receiver<CompactionTask>,
695744
completion_tx: mpsc::UnboundedSender<CompactionTaskCompletion>,
@@ -1245,6 +1294,7 @@ mod tests {
12451294
HashSet::new(), // collections_for_fragment_fetch
12461295
None, // bloom_filter_manager
12471296
None, // shard_size
1297+
Vec::new(), // sharding_enabled_tenant_patterns
12481298
)
12491299
.expect("Failed to create compaction manager in test");
12501300

rust/worker/src/compactor/config.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,17 @@ pub struct CompactorConfig {
127127
#[serde(default)]
128128
pub fragment_fetcher_cache: chroma_cache::CacheConfig,
129129

130-
/// The size threshold for shards. When a shard exceeds this size, it will be sealed
131-
/// and a new shard created. None or 0 means no limit.
130+
/// Global shard size limit. When a shard exceeds this size, it will be sealed
131+
/// and a new shard created. Only applies to tenants with sharding enabled.
132132
#[serde(default = "CompactorConfig::default_shard_size")]
133133
pub shard_size: Option<u64>,
134+
135+
/// List of tenant patterns that have sharding enabled. Supports wildcards:
136+
/// - "*" enables sharding for all tenants
137+
/// - "prod-*" enables for tenants starting with "prod-"
138+
/// - "tenant1" enables for exact match
139+
#[serde(default)]
140+
pub sharding_enabled_tenant_patterns: Vec<String>,
134141
}
135142

136143
impl CompactorConfig {
@@ -195,7 +202,7 @@ impl CompactorConfig {
195202
}
196203

197204
fn default_shard_size() -> Option<u64> {
198-
None // No limit by default
205+
None // No sharding by default
199206
}
200207
}
201208

@@ -222,6 +229,7 @@ impl Default for CompactorConfig {
222229
collections_for_fragment_fetch: Vec::new(),
223230
fragment_fetcher_cache: chroma_cache::CacheConfig::default(),
224231
shard_size: CompactorConfig::default_shard_size(),
232+
sharding_enabled_tenant_patterns: Vec::new(),
225233
}
226234
}
227235
}

rust/worker/src/compactor/scheduler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ impl Scheduler {
450450
self.job_queue.push(CompactionJob {
451451
collection_id: record.collection_id,
452452
database_name: database_name.clone(),
453+
tenant_id: record.tenant_id.clone(),
453454
});
454455
self.oneoff_collections.remove(&record.collection_id);
455456
rem_capacity -= 1;

rust/worker/src/compactor/scheduler_policy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ impl SchedulerPolicy for LasCompactionTimeSchedulerPolicy {
5959
tasks.push(CompactionJob {
6060
collection_id: collection.collection_id,
6161
database_name,
62+
tenant_id: collection.tenant_id.clone(),
6263
});
6364
}
6465
tasks

rust/worker/src/compactor/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tokio::sync::oneshot;
77
/// A unit of compaction work for a single collection, produced by the
/// scheduler / scheduler policy and consumed by the compaction manager.
pub(crate) struct CompactionJob {
    pub(crate) collection_id: CollectionUuid,
    pub(crate) database_name: DatabaseName,
    // Tenant owning the collection. Plumbed through so the compaction manager
    // can check it against `sharding_enabled_tenant_patterns` to decide whether
    // the global `shard_size` limit applies to this job.
    pub(crate) tenant_id: String,
}
1112

1213
#[derive(Clone, Debug)]

0 commit comments

Comments
 (0)