From 3adfe105092e87eb42632c01e3b8d93fdf375b2c Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Fri, 10 Apr 2026 10:46:51 -0700 Subject: [PATCH] feat(faults): Add Tilt fault injection CLI Add a chroma-fault binary for injecting, listing, and clearing faults against Tilt's rust-log-service with either Tilt instance defaults or an explicit service address. Wire the faults feature into Tilt and CI builds for chroma-log-service, and only register the fault injection gRPC service when that feature is enabled. Add warning logs around injected wal3 upload faults and read repair, and raise the garbage collector dispatcher queue sizes in worker configs. Co-authored-by: AI --- .../tilt-setup-prebuild/docker-bake.hcl | 3 + Cargo.lock | 2 + Tiltfile | 7 +- rust/Dockerfile | 7 +- rust/faults/Cargo.toml | 6 +- rust/faults/src/bin/chroma-fault.rs | 347 ++++++++++++++++++ rust/faults/src/lib.rs | 15 +- rust/log-service/src/lib.rs | 90 ++++- rust/tracing/src/init_tracer.rs | 1 + rust/wal3/src/interfaces/mod.rs | 46 +++ .../src/interfaces/repl/fragment_manager.rs | 72 +++- rust/wal3/src/interfaces/repl/mod.rs | 14 +- rust/wal3/src/lib.rs | 9 +- rust/worker/chroma_config.yaml | 4 +- rust/worker/chroma_mcmr.yaml | 4 +- rust/worker/chroma_mcmr2.yaml | 4 +- 16 files changed, 592 insertions(+), 39 deletions(-) create mode 100644 rust/faults/src/bin/chroma-fault.rs diff --git a/.github/actions/tilt-setup-prebuild/docker-bake.hcl b/.github/actions/tilt-setup-prebuild/docker-bake.hcl index e71a433c9e6..d1fa6c1f332 100644 --- a/.github/actions/tilt-setup-prebuild/docker-bake.hcl +++ b/.github/actions/tilt-setup-prebuild/docker-bake.hcl @@ -1,6 +1,9 @@ target "rust-log-service" { dockerfile = "rust/Dockerfile" target = "log_service" + args = { + LOG_SERVICE_CARGO_FEATURES = "faults" + } tags = [ "rust-log-service:ci" ] } diff --git a/Cargo.lock b/Cargo.lock index e06a5df0af8..193afff23f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1749,9 +1749,11 @@ dependencies = [ "async-trait", "chroma-config", "chroma-types", + "clap", "parking_lot", "tokio", "tonic 0.14.5", + "tracing", "uuid", ] diff --git a/Tiltfile b/Tiltfile index 93d0bbecfc9..ea54110caec 100644 --- a/Tiltfile +++ b/Tiltfile @@ -30,7 +30,8 @@ else: '.', only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"], dockerfile='./rust/Dockerfile', - target='log_service' + target='log_service', + build_args={'LOG_SERVICE_CARGO_FEATURES': 'faults'} ) if config.tilt_subcommand == "ci": @@ -320,7 +321,7 @@ k8s_resource('rust-sysdb-service:deployment:chroma', resource_deps = ['k8s_setup k8s_resource('rust-frontend-service:deployment:chroma', resource_deps=['sysdb:deployment:chroma', 'rust-log-service:statefulset:chroma'], labels=["chroma"], port_forwards='8000:8000') k8s_resource('query-service:statefulset:chroma', resource_deps=['sysdb:deployment:chroma'], labels=["chroma"], port_forwards='50053:50051') k8s_resource('compaction-service:statefulset:chroma', resource_deps=['sysdb:deployment:chroma'], labels=["chroma"], port_forwards="50057:50051") -k8s_resource('garbage-collector:statefulset:chroma', resource_deps=['k8s_setup', 'minio-deployment'], labels=["chroma"], port_forwards='50055:50055') +k8s_resource('garbage-collector:statefulset:chroma', resource_deps=['k8s_setup', 'minio-deployment', 'rust-log-service:statefulset:chroma'], labels=["chroma"], port_forwards='50055:50055') # Production Chroma 2 k8s_resource('postgres:deployment:chroma2', resource_deps=['k8s_setup2', 'postgres:deployment:chroma'], labels=["infrastructure2"], port_forwards='6432:5432') @@ -333,7 +334,7 @@ k8s_resource('rust-sysdb-service:deployment:chroma2', resource_deps=['k8s_setup2 k8s_resource('rust-frontend-service:deployment:chroma2', resource_deps=['sysdb:deployment:chroma2', 'rust-log-service:statefulset:chroma2', 'rust-frontend-service:deployment:chroma'], labels=["chroma2"], port_forwards='8001:8000') k8s_resource('query-service:statefulset:chroma2', resource_deps=['sysdb:deployment:chroma2', 'query-service:statefulset:chroma'], labels=["chroma2"], port_forwards='60053:50051') k8s_resource('compaction-service:statefulset:chroma2', resource_deps=['sysdb:deployment:chroma2', 'compaction-service:statefulset:chroma'], labels=["chroma2"]) -k8s_resource('garbage-collector:statefulset:chroma2', resource_deps=['k8s_setup2', 'minio-deployment', 'garbage-collector:statefulset:chroma'], labels=["chroma2"], port_forwards='60055:50055') +k8s_resource('garbage-collector:statefulset:chroma2', resource_deps=['k8s_setup2', 'minio-deployment', 'rust-log-service:statefulset:chroma2', 'garbage-collector:statefulset:chroma'], labels=["chroma2"], port_forwards='60055:50055') # Observability k8s_resource('jaeger', resource_deps=['k8s_setup'], labels=["observability"]) diff --git a/rust/Dockerfile b/rust/Dockerfile index 79d85208b7d..797c1e446ba 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -3,6 +3,7 @@ FROM rust:1.92.0 AS builder ARG RELEASE_MODE= ARG PROTOC_VERSION=31.1 ARG ENABLE_AVX512= +ARG LOG_SERVICE_CARGO_FEATURES= # ADDRESS_SANITIZER is an optional build argument to enable Address Sanitizer. ARG ADDRESS_SANITIZER @@ -57,7 +58,11 @@ RUN --mount=type=cache,sharing=locked,target=/chroma/target/ \ echo "Building without AVX512 optimizations"; \ fi && \ build_target=$( [ "${ADDRESS_SANITIZER}" = "1" ] && echo '--target x86_64-unknown-linux-gnu' || echo '' ) && \ - if [ "$RELEASE_MODE" = "1" ]; then cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES) --release; else cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES); fi && \ + release_flag=$( [ "$RELEASE_MODE" = "1" ] && echo '--release' || echo '' ) && \ + cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES) ${release_flag} && \ + if [ -n "$LOG_SERVICE_CARGO_FEATURES" ]; then \ + cargo build ${build_target} -p chroma-log-service --bin log_service --features "$LOG_SERVICE_CARGO_FEATURES" ${release_flag}; \ + fi && \ build_dir=$( [ "$RELEASE_MODE" = "1" ] && echo release || echo debug ) && \ build_dir=$( [ "${ADDRESS_SANITIZER}" = "1" ] && echo "x86_64-unknown-linux-gnu/${build_dir}" || echo "${build_dir}" ) && \ for bin in chroma garbage_collector_service chroma-load log_service heap_tender_service query_service compaction_service sysdb_service spanner_migration; do \ diff --git a/rust/faults/Cargo.toml b/rust/faults/Cargo.toml index 41095e4585f..fabc0e8afb5 100644 --- a/rust/faults/Cargo.toml +++ b/rust/faults/Cargo.toml @@ -5,12 +5,12 @@ edition = "2021" [dependencies] async-trait = { workspace = true } +clap = { workspace = true } parking_lot = { workspace = true } +tokio = { workspace = true } tonic = { workspace = true } +tracing = { workspace = true } uuid = { workspace = true } chroma-config = { workspace = true } chroma-types = { workspace = true } - -[dev-dependencies] -tokio = { workspace = true } diff --git a/rust/faults/src/bin/chroma-fault.rs b/rust/faults/src/bin/chroma-fault.rs new file mode 100644 index 00000000000..fa50f60da29 --- /dev/null +++ b/rust/faults/src/bin/chroma-fault.rs @@ -0,0 +1,347 @@ +use std::error::Error; + +use chroma_types::chroma_proto::fault_action::Act; +use chroma_types::chroma_proto::fault_injection_service_client::FaultInjectionServiceClient; +use chroma_types::chroma_proto::fault_selector::By; +use chroma_types::chroma_proto::{ + ActionDelay, ActionUnavailable, ClearFaultsRequest, FaultAction, FaultEntry, FaultSelector, + InjectFaultsRequest, ListFaultsRequest, SelectFileLine, SelectLabel, +}; +use clap::{ArgGroup, Args, Parser, Subcommand, ValueEnum}; +use tonic::transport::Channel; + +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +enum TiltInstance { + Chroma, + Chroma2, +} + +impl TiltInstance { + fn default_addr(self) -> &'static str { + match self { + TiltInstance::Chroma => "http://127.0.0.1:50054", + TiltInstance::Chroma2 => "http://127.0.0.1:60054", + } + } +} + +#[derive(Debug, Parser)] +#[command( + name = "chroma-fault", + about = "Inject, inspect, and clear faults against Tilt's rust-log-service." +)] +struct Cli { + #[arg( + long, + global = true, + help = "Fault injection service address. Defaults to the selected Tilt instance." + )] + addr: Option, + + #[arg( + long, + global = true, + value_enum, + default_value_t = TiltInstance::Chroma, + help = "Tilt instance to target when --addr is omitted." + )] + tilt_instance: TiltInstance, + + #[command(subcommand)] + command: Command, +} + +#[derive(Debug, Subcommand)] +enum Command { + /// Inject a new fault + Inject(InjectArgs), + /// List all injected faults + List, + /// Clear one fault selector or all faults + Clear(ClearArgs), +} + +#[derive(Debug, Args)] +struct InjectArgs { + #[command(flatten)] + selector: SelectorArgs, + + #[command(flatten)] + action: ActionArgs, +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("selector") + .required(true) + .multiple(false) + .args(["label", "file"]) +))] +struct SelectorArgs { + #[arg(long, help = "Match a named fault label.")] + label: Option, + + #[arg(long, requires = "line", help = "Match a specific source file.")] + file: Option, + + #[arg(long, requires = "file", help = "Match a specific source line.")] + line: Option, +} + +impl SelectorArgs { + fn to_proto(&self) -> FaultSelector { + match (&self.label, &self.file, self.line) { + (Some(label), None, None) => FaultSelector { + by: Some(By::Label(SelectLabel { + label: label.clone(), + })), + }, + (None, Some(file), Some(line)) => FaultSelector { + by: Some(By::FileLine(SelectFileLine { + file: file.clone(), + line, + })), + }, + _ => unreachable!("clap guarantees selector validation"), + } + } +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("action") + .required(true) + .multiple(false) + .args(["unavailable", "delay_seconds"]) +))] +struct ActionArgs { + #[arg(long, help = "Return UNAVAILABLE for matching requests.")] + unavailable: bool, + + #[arg(long, help = "Delay matching requests by this many seconds.")] + delay_seconds: Option, +} + +impl ActionArgs { + fn to_proto(&self) -> FaultAction { + let act = if self.unavailable { + Act::Unavailable(ActionUnavailable {}) + } else if let Some(delay_seconds) = self.delay_seconds { + Act::Delay(ActionDelay { delay_seconds }) + } else { + unreachable!("clap guarantees action validation") + }; + FaultAction { act: Some(act) } + } +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("clear_target") + .required(true) + .multiple(false) + .args(["all", "label", "file"]) +))] +struct ClearArgs { + #[arg(long, help = "Clear every injected fault.")] + all: bool, + + #[arg(long, help = "Clear faults matching this label.")] + label: Option, + + #[arg( + long, + requires = "line", + help = "Clear faults matching this source file." + )] + file: Option, + + #[arg( + long, + requires = "file", + help = "Clear faults matching this source line." + )] + line: Option, +} + +impl ClearArgs { + fn selector(&self) -> Option { + if self.all { + return None; + } + + Some(match (&self.label, &self.file, self.line) { + (Some(label), None, None) => FaultSelector { + by: Some(By::Label(SelectLabel { + label: label.clone(), + })), + }, + (None, Some(file), Some(line)) => FaultSelector { + by: Some(By::FileLine(SelectFileLine { + file: file.clone(), + line, + })), + }, + _ => unreachable!("clap guarantees clear target validation"), + }) + } +} + +fn resolved_addr(cli: &Cli) -> String { + cli.addr + .clone() + .unwrap_or_else(|| cli.tilt_instance.default_addr().to_string()) +} + +fn format_selector(selector: &FaultSelector) -> String { + match selector.by.as_ref() { + Some(By::Label(SelectLabel { label })) => format!("label({label})"), + Some(By::FileLine(SelectFileLine { file, line })) => format!("file({file}:{line})"), + None => "".to_string(), + } +} + +fn format_action(action: &FaultAction) -> String { + match action.act.as_ref() { + Some(Act::Unavailable(_)) => "unavailable".to_string(), + Some(Act::Delay(ActionDelay { delay_seconds })) => format!("delay({delay_seconds}s)"), + None => "".to_string(), + } +} + +fn format_entry(entry: &FaultEntry) -> String { + let selector = entry + .selector + .as_ref() + .map(format_selector) + .unwrap_or_else(|| "".to_string()); + let action = entry + .action + .as_ref() + .map(format_action) + .unwrap_or_else(|| "".to_string()); + format!("{selector} -> {action}") +} + +async fn connect(addr: &str) -> Result, Box> { + let channel = Channel::from_shared(addr.to_string())?.connect().await?; + Ok(FaultInjectionServiceClient::new(channel)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + let addr = resolved_addr(&cli); + let mut client = connect(&addr).await?; + + match cli.command { + Command::Inject(args) => { + let selector = args.selector.to_proto(); + let action = args.action.to_proto(); + client + .inject_faults(InjectFaultsRequest { + selector: Some(selector.clone()), + action: Some(action), + }) + .await?; + println!( + "Injected {} on {}", + format_action(&action), + format_selector(&selector) + ); + } + Command::List => { + let response = client.list_faults(ListFaultsRequest {}).await?.into_inner(); + if response.faults.is_empty() { + println!("No faults configured."); + } else { + for (idx, fault) in response.faults.iter().enumerate() { + println!("{}. {}", idx + 1, format_entry(fault)); + } + } + } + Command::Clear(args) => { + let response = client + .clear_faults(ClearFaultsRequest { + id: None, + selector: args.selector(), + }) + .await? + .into_inner(); + println!("Cleared {} faults.", response.cleared_count); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tilt_default_addresses_match_tiltfile_port_forwards() { + assert_eq!( + TiltInstance::Chroma.default_addr(), + "http://127.0.0.1:50054" + ); + assert_eq!( + TiltInstance::Chroma2.default_addr(), + "http://127.0.0.1:60054" + ); + } + + #[test] + fn explicit_address_overrides_tilt_instance_default() { + let cli = Cli { + addr: Some("http://localhost:7000".to_string()), + tilt_instance: TiltInstance::Chroma2, + command: Command::List, + }; + + assert_eq!(resolved_addr(&cli), "http://localhost:7000"); + } + + #[test] + fn selector_args_build_label_selector() { + let selector = SelectorArgs { + label: Some("fragment-upload".to_string()), + file: None, + line: None, + } + .to_proto(); + + assert_eq!( + selector.by, + Some(By::Label(SelectLabel { + label: "fragment-upload".to_string(), + })) + ); + } + + #[test] + fn action_args_build_delay_action() { + let action = ActionArgs { + unavailable: false, + delay_seconds: Some(7), + } + .to_proto(); + + assert_eq!( + action.act, + Some(Act::Delay(ActionDelay { delay_seconds: 7 })) + ); + } + + #[test] + fn clear_all_omits_selector() { + let clear = ClearArgs { + all: true, + label: None, + file: None, + line: None, + }; + + assert_eq!(clear.selector(), None); + } +} diff --git a/rust/faults/src/lib.rs b/rust/faults/src/lib.rs index cf5b194c783..382e0b5e1a0 100644 --- a/rust/faults/src/lib.rs +++ b/rust/faults/src/lib.rs @@ -211,6 +211,7 @@ fn stored_fault_to_proto(fault: &StoredFault) -> FaultEntry { #[async_trait] impl FaultInjectionService for FaultRegistry { + #[tracing::instrument(skip(self, request))] async fn inject_faults( &self, request: Request, @@ -224,10 +225,12 @@ impl FaultInjectionService for FaultRegistry { .action .ok_or_else(|| invalid_argument("inject_faults requires an action")) .and_then(action_from_proto)?; + tracing::warn!(selector = ?selector, action = ?action, "inject fault configured"); let id = self.inject(selector, action); Ok(Response::new(InjectFaultsResponse { id: id.to_string() })) } + #[tracing::instrument(skip(self, _request))] async fn list_faults( &self, _request: Request, @@ -240,6 +243,7 @@ impl FaultInjectionService for FaultRegistry { Ok(Response::new(ListFaultsResponse { faults })) } + #[tracing::instrument(skip(self, request))] async fn clear_faults( &self, request: Request, @@ -248,16 +252,21 @@ impl FaultInjectionService for FaultRegistry { let fault_id: Option = request.id.as_deref().map(TryInto::try_into).transpose()?; let selector: Option = request.selector.map(TryInto::try_into).transpose()?; - let cleared_count = match (fault_id, selector) { + let cleared_count = match (&fault_id, &selector) { (Some(_), Some(_)) => { return Err(invalid_argument( "clear_faults requires exactly one of id or selector, not both", )); } - (Some(id), None) => self.clear_id(Some(&id)), - (None, Some(ref sel)) => self.clear_selector(sel), + (Some(id), None) => self.clear_id(Some(id)), + (None, Some(sel)) => self.clear_selector(sel), (None, None) => self.clear_all(), }; + tracing::warn!( + selector = ?selector, + cleared_count, + "inject fault cleared" + ); Ok(Response::new(ClearFaultsResponse { cleared_count: cleared_count as u64, })) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 643e665d614..3f24956b76f 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -20,8 +20,9 @@ use chroma_storage::config::StorageConfig; use chroma_storage::Storage; use chroma_tracing::OtelFilter; use chroma_tracing::OtelFilterLevel; +#[cfg(feature = "faults")] +use chroma_types::chroma_proto::fault_injection_service_server::FaultInjectionServiceServer; use chroma_types::chroma_proto::{ - fault_injection_service_server::FaultInjectionServiceServer, garbage_collect_phase2_request::LogToCollect, log_service_server::LogService, purge_from_cache_request::EntryToEvict, CollectionInfo, GarbageCollectPhase2Request, GarbageCollectPhase2Response, GetAllCollectionInfoToCompactRequest, @@ -65,7 +66,8 @@ use wal3::{ }; #[cfg(feature = "faults")] use wal3::{ - FaultInjectingFragmentManagerFactory, FragmentUploadFault, FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_upload_replica_fault_label, FaultInjectingFragmentManagerFactory, FragmentUploadFault, + FRAGMENT_UPLOAD_FAULT_LABEL, }; mod scrub; @@ -145,6 +147,16 @@ impl FragmentUploadFaultInjector for LogServiceFragmentUploadFaultInjector { chroma_faults::FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay), }) } + + fn fault_for_replica_upload(&self, replica_idx: usize) -> Option { + let label = fragment_upload_replica_fault_label(replica_idx)?; + self.faults + .action_for_label(label) + .map(|action| match action { + chroma_faults::FaultActionKind::Unavailable => FragmentUploadFault::Unavailable, + chroma_faults::FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay), + }) + } } #[cfg(feature = "faults")] @@ -442,6 +454,8 @@ impl<'a> FactoryCreationContext<'a> { region_names, self.collection_id.0, ); + let fragment_factory = fragment_factory + .with_fault_injector(self.fragment_upload_fault_injector.as_ref().map(Arc::clone)); let fragment_factory = maybe_wrap_fragment_manager_factory( fragment_factory, self.fragment_upload_fault_injector.as_ref().map(Arc::clone), @@ -703,6 +717,8 @@ async fn get_log_from_handle_with_mutex_held<'a>( region_names, collection_id.0, ); + let fragment_publisher_factory = fragment_publisher_factory + .with_fault_injector(fragment_upload_fault_injector.as_ref().map(Arc::clone)); let fragment_publisher_factory = maybe_wrap_fragment_manager_factory( fragment_publisher_factory, fragment_upload_fault_injector.as_ref().map(Arc::clone), @@ -1164,6 +1180,7 @@ pub struct LogServer { config: LogServerConfig, open_logs: Arc>, dirty_log: Option>, + #[cfg_attr(not(feature = "faults"), allow(unused))] faults: Arc, rolling_up_s3: tokio::sync::Mutex<()>, rolling_up_repl: tokio::sync::Mutex<()>, @@ -3229,18 +3246,35 @@ impl LogServerWrapper { let background_server = Arc::clone(&wrapper.log_server); let background = tokio::task::spawn(async move { background_server.background_task().await }); - let server = Server::builder() - .max_concurrent_streams(Some(max_concurrent_streams)) - .layer(chroma_tracing::GrpcServerTraceLayer) - .add_service(health_service) - .add_service(FaultInjectionServiceServer::from_arc( - wrapper.log_server.faults.clone(), - )) - .add_service( - chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) - .max_decoding_message_size(max_decoding_message_size) - .max_encoding_message_size(max_encoding_message_size), - ); + #[cfg(feature = "faults")] + let server = { + tracing::info!("fault injection enabled"); + Server::builder() + .max_concurrent_streams(Some(max_concurrent_streams)) + .layer(chroma_tracing::GrpcServerTraceLayer) + .add_service(health_service) + .add_service(FaultInjectionServiceServer::from_arc( + wrapper.log_server.faults.clone(), + )) + .add_service( + chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) + .max_decoding_message_size(max_decoding_message_size) + .max_encoding_message_size(max_encoding_message_size), + ) + }; + #[cfg(not(feature = "faults"))] + let server = { + tracing::info!("fault injection not enabled"); + Server::builder() + .max_concurrent_streams(Some(max_concurrent_streams)) + .layer(chroma_tracing::GrpcServerTraceLayer) + .add_service(health_service) + .add_service( + chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) + .max_decoding_message_size(max_decoding_message_size) + .max_encoding_message_size(max_encoding_message_size), + ) + }; let server = server.serve_with_shutdown(addr, async { let mut sigterm = match signal(SignalKind::terminate()) { @@ -3814,6 +3848,31 @@ mod tests { assert!(STABLE_PREFIX.is_valid()); } + #[cfg(feature = "faults")] + #[test] + fn fragment_upload_fault_injection_targets_hard_coded_replicas() { + let faults = Arc::new(FaultRegistry::new()); + let injector = LogServiceFragmentUploadFaultInjector::new(Arc::clone(&faults)); + + faults.inject( + chroma_faults::FaultSelectorKind::Label( + fragment_upload_replica_fault_label(1) + .expect("replica 1 label should exist") + .to_string(), + ), + chroma_faults::FaultActionKind::Unavailable, + ); + + assert_eq!(injector.fault_for_upload(), None); + assert_eq!(injector.fault_for_replica_upload(0), None); + assert_eq!( + injector.fault_for_replica_upload(1), + Some(FragmentUploadFault::Unavailable) + ); + assert_eq!(injector.fault_for_replica_upload(2), None); + assert_eq!(injector.fault_for_replica_upload(3), None); + } + #[test] fn dirty_marker_coalesce1() { // Test that a single collection gets coalesced to nothing. @@ -5430,6 +5489,7 @@ mod tests { let spanner = Arc::new(topology_config.config.spanner.clone()); let config = log_server.config.clone(); let repl_options = topology_config.config.repl.clone(); + let fragment_upload_fault_injector = log_server.fragment_upload_fault_injector(); Box::pin(async move { let (fragment_publisher_factory, manifest_publisher_factory) = create_repl_factories( config.writer.clone(), @@ -5440,6 +5500,8 @@ mod tests { region_names, collection_id.0, ); + let fragment_publisher_factory = fragment_publisher_factory + .with_fault_injector(fragment_upload_fault_injector.as_ref().map(Arc::clone)); let gc = wal3::GarbageCollector::< FragmentUuid, ReplicatedFragmentManagerFactory, diff --git a/rust/tracing/src/init_tracer.rs b/rust/tracing/src/init_tracer.rs index fd5677cdb69..a277c9b6806 100644 --- a/rust/tracing/src/init_tracer.rs +++ b/rust/tracing/src/init_tracer.rs @@ -78,6 +78,7 @@ pub fn init_global_filter_layer( "chroma_cache", "chroma_distance", "chroma_error", + "chroma_fault", "chroma_frontend", "chroma_index", "chroma_log", diff --git a/rust/wal3/src/interfaces/mod.rs b/rust/wal3/src/interfaces/mod.rs index eeeffb698ad..c79c55fe87f 100644 --- a/rust/wal3/src/interfaces/mod.rs +++ b/rust/wal3/src/interfaces/mod.rs @@ -90,6 +90,19 @@ pub trait FragmentManagerFactory { /// The label used by downstream services to target wal3 fragment uploads with fault injection. pub const FRAGMENT_UPLOAD_FAULT_LABEL: &str = "wal3.fragment_upload"; +/// The hard-coded fault labels used to target replicated wal3 fragment uploads by replica index. +pub const FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS: [&str; 3] = [ + "wal3.fragment_upload.0", + "wal3.fragment_upload.1", + "wal3.fragment_upload.2", +]; + +/// Returns the fault label for a specific replicated wal3 fragment upload replica index. +pub fn fragment_upload_replica_fault_label(replica_idx: usize) -> Option<&'static str> { + FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS + .get(replica_idx) + .copied() +} /// Faults that can be injected immediately before a fragment upload begins. #[derive(Clone, Debug, PartialEq, Eq)] @@ -101,6 +114,10 @@ pub enum FragmentUploadFault { /// Supplies optional upload faults to wal3 without coupling the crate to a specific registry. pub trait FragmentUploadFaultInjector: Send + Sync + 'static { fn fault_for_upload(&self) -> Option; + + fn fault_for_replica_upload(&self, _replica_idx: usize) -> Option { + None + } } impl FragmentUploadFaultInjector for () { @@ -173,15 +190,27 @@ impl> FragmentUploader cmek: Option, epoch_micros: u64, ) -> Result { + let fragment_identifier = pointer.identifier(); match self .fault_injector .as_ref() .and_then(|fault_injector| fault_injector.fault_for_upload()) { Some(FragmentUploadFault::Delay(delay)) => { + tracing::warn!( + fault_label = FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_identifier = %fragment_identifier, + delay_seconds = delay.as_secs_f64(), + "injecting wal3 upload delay fault" + ); tokio::time::sleep(delay).await; } Some(FragmentUploadFault::Unavailable) => { + tracing::warn!( + fault_label = FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_identifier = %fragment_identifier, + "injecting wal3 upload unavailable fault" + ); return Err(Error::TonicError(tonic::Status::unavailable(format!( "fault injected for {}", FRAGMENT_UPLOAD_FAULT_LABEL @@ -1059,6 +1088,23 @@ mod tests { #[derive(Clone)] struct InjectUnavailable; + #[test] + fn fragment_upload_replica_fault_labels_are_hard_coded() { + assert_eq!( + fragment_upload_replica_fault_label(0), + Some("wal3.fragment_upload.0") + ); + assert_eq!( + fragment_upload_replica_fault_label(1), + Some("wal3.fragment_upload.1") + ); + assert_eq!( + fragment_upload_replica_fault_label(2), + Some("wal3.fragment_upload.2") + ); + assert_eq!(fragment_upload_replica_fault_label(3), None); + } + impl FragmentUploadFaultInjector for InjectUnavailable { fn fault_for_upload(&self) -> Option { Some(FragmentUploadFault::Unavailable) diff --git a/rust/wal3/src/interfaces/repl/fragment_manager.rs b/rust/wal3/src/interfaces/repl/fragment_manager.rs index 5b52d27d27d..360569b1b75 100644 --- a/rust/wal3/src/interfaces/repl/fragment_manager.rs +++ b/rust/wal3/src/interfaces/repl/fragment_manager.rs @@ -10,7 +10,10 @@ use chroma_storage::{PutOptions, Storage, StorageError}; use chroma_types::Cmek; use crate::interfaces::batch_manager::upload_parquet; -use crate::interfaces::{FragmentConsumer, FragmentUploader, UploadResult}; +use crate::interfaces::{ + fragment_upload_replica_fault_label, FragmentConsumer, FragmentUploadFault, + FragmentUploadFaultInjector, FragmentUploader, UploadResult, +}; use crate::{Error, Fragment, FragmentIdentifier, FragmentUuid, LogPosition, LogWriterOptions}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -119,6 +122,7 @@ pub struct ReplicatedFragmentUploader { preferred: usize, storages: Arc>, bookkeeping: Arc>, + fault_injector: Option>, } impl ReplicatedFragmentUploader { @@ -140,9 +144,18 @@ impl ReplicatedFragmentUploader { preferred, storages, bookkeeping, + fault_injector: None, } } + pub fn with_fault_injector( + mut self, + fault_injector: Option>, + ) -> Self { + self.fault_injector = fault_injector; + self + } + fn compute_mask(&self) -> Result, Error> { // SAFETY(rescrv): Mutex poisoning. let mut bookkeeping = self.bookkeeping.lock().unwrap(); @@ -288,12 +301,44 @@ impl FragmentUploader for ReplicatedFragmentUploader { if should_try { let options = self.writer.clone(); let prefix = storage.prefix.clone(); + let region = storage.region.clone(); let storage = storage.storage.clone(); let fragment_identifier = (*pointer).into(); let log_position = None; let messages = messages.clone(); let cmek = cmek.clone(); + let fault_injector = self.fault_injector.as_ref().map(Arc::clone); futures.push(async move { + if let Some(fault) = fault_injector + .as_ref() + .and_then(|fault_injector| fault_injector.fault_for_replica_upload(idx)) + { + let fault_label = fragment_upload_replica_fault_label(idx) + .unwrap_or("wal3.fragment_upload."); + match fault { + FragmentUploadFault::Delay(delay) => { + tracing::warn!( + fault_label, + replica_idx = idx, + region = %region, + delay_seconds = delay.as_secs_f64(), + "injecting wal3 replicated upload delay fault" + ); + tokio::time::sleep(delay).await; + } + FragmentUploadFault::Unavailable => { + tracing::warn!( + fault_label, + replica_idx = idx, + region = %region, + "injecting wal3 replicated upload unavailable fault" + ); + return Err(Error::TonicError(tonic::Status::unavailable( + format!("fault injected for {fault_label}"), + ))); + } + } + } upload_parquet( &options, &storage, @@ -410,6 +455,17 @@ impl FragmentReader { return; } + let missing_regions = missing_storage_indices + .iter() + .map(|idx| self.storages[*idx].region.as_str()) + .collect::>(); + tracing::warn!( + path, + missing_regions = ?missing_regions, + missing_replica_count = missing_regions.len(), + "read repair triggered" + ); + for idx in missing_storage_indices { // Try to acquire a permit without blocking. If unavailable, skip this repair. let permit = match self.read_repair_semaphore.clone().try_acquire_owned() { @@ -1474,8 +1530,16 @@ mod tests { } } + fn make_storage_wrapper_with_region( + storage: chroma_storage::Storage, + prefix: &str, + region: &str, + ) -> StorageWrapper { + StorageWrapper::new(region.to_string(), storage, prefix.to_string()) + } + fn make_storage_wrapper(storage: chroma_storage::Storage, prefix: &str) -> StorageWrapper { - StorageWrapper::new("test-region".to_string(), storage, prefix.to_string()) + make_storage_wrapper_with_region(storage, prefix, "test-region") } // Single replica successfully uploads. @@ -1515,8 +1579,8 @@ mod tests { async fn test_k8s_mcmr_integration_replicated_uploader_two_replicas_both_succeed() { let storage1 = s3_client_for_test_with_new_bucket().await; let storage2 = s3_client_for_test_with_new_bucket().await; - let wrapper1 = make_storage_wrapper(storage1, "prefix1"); - let wrapper2 = make_storage_wrapper(storage2, "prefix2"); + let wrapper1 = make_storage_wrapper_with_region(storage1, "prefix1", "region-0"); + let wrapper2 = make_storage_wrapper_with_region(storage2, "prefix2", "region-1"); let storages = Arc::new(vec![wrapper1, wrapper2]); let uploader = ReplicatedFragmentUploader::new( make_test_options(2), diff --git a/rust/wal3/src/interfaces/repl/mod.rs b/rust/wal3/src/interfaces/repl/mod.rs index 211c8376335..921e9c423c2 100644 --- a/rust/wal3/src/interfaces/repl/mod.rs +++ b/rust/wal3/src/interfaces/repl/mod.rs @@ -12,7 +12,7 @@ mod manifest_manager; use crate::{Error, FragmentUuid, LogWriterOptions, Manifest}; use super::batch_manager::BatchManager; -use super::{FragmentManagerFactory, ManifestManagerFactory}; +use super::{FragmentManagerFactory, FragmentUploadFaultInjector, ManifestManagerFactory}; pub use fragment_manager::{FragmentReader, ReplicatedFragmentUploader}; pub use fragment_manager::{ReplicatedFragmentOptions, StorageWrapper}; @@ -40,6 +40,7 @@ pub fn create_repl_factories( preferred, storages, read_repair_semaphore, + fault_injector: None, }; let local_region = regions[preferred].clone(); let manifest_manager_factory = ReplicatedManifestManagerFactory { @@ -58,6 +59,7 @@ pub struct ReplicatedFragmentManagerFactory { preferred: usize, storages: Arc>, read_repair_semaphore: Arc, + fault_injector: Option>, } impl ReplicatedFragmentManagerFactory { @@ -76,9 +78,18 @@ impl ReplicatedFragmentManagerFactory { preferred, storages, read_repair_semaphore, + fault_injector: None, } } + pub fn with_fault_injector( + mut self, + fault_injector: Option>, + ) -> Self { + self.fault_injector = fault_injector; + self + } + fn build_fragment_uploader(&self) -> ReplicatedFragmentUploader { ReplicatedFragmentUploader::new( self.repl.clone(), @@ -86,6 +97,7 @@ impl ReplicatedFragmentManagerFactory { self.preferred, Arc::clone(&self.storages), ) + .with_fault_injector(self.fault_injector.as_ref().map(Arc::clone)) } } diff --git a/rust/wal3/src/lib.rs b/rust/wal3/src/lib.rs index 2549bbade5c..12c927067d7 100644 --- a/rust/wal3/src/lib.rs +++ b/rust/wal3/src/lib.rs @@ -34,10 +34,11 @@ pub use interfaces::s3::{ S3FragmentPuller, S3FragmentUploader, S3ManifestManagerFactory, }; pub use interfaces::{ - BatchManager, FaultInjectingFragmentManagerFactory, FragmentConsumer, FragmentManagerFactory, - FragmentPointer, FragmentPublisher, FragmentUploadFault, FragmentUploadFaultInjector, - FragmentUploader, ManifestConsumer, ManifestManagerFactory, ManifestPublisher, ManifestWitness, - PositionWitness, FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_upload_replica_fault_label, BatchManager, FaultInjectingFragmentManagerFactory, + FragmentConsumer, FragmentManagerFactory, FragmentPointer, FragmentPublisher, + FragmentUploadFault, FragmentUploadFaultInjector, FragmentUploader, ManifestConsumer, + ManifestManagerFactory, ManifestPublisher, ManifestWitness, PositionWitness, + FRAGMENT_UPLOAD_FAULT_LABEL, FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS, }; pub use manifest::{ unprefixed_snapshot_path, Manifest, ManifestAndWitness, ManifestBounds, diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 56af2af74fa..7e8461f7fa4 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -280,8 +280,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage" diff --git a/rust/worker/chroma_mcmr.yaml b/rust/worker/chroma_mcmr.yaml index 6f0f7513e37..55367422dc0 100644 --- a/rust/worker/chroma_mcmr.yaml +++ b/rust/worker/chroma_mcmr.yaml @@ -326,8 +326,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage" diff --git a/rust/worker/chroma_mcmr2.yaml b/rust/worker/chroma_mcmr2.yaml index 4db5c1ca126..4c656149a17 100644 --- a/rust/worker/chroma_mcmr2.yaml +++ b/rust/worker/chroma_mcmr2.yaml @@ -327,8 +327,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage2"