diff --git a/.github/actions/tilt-setup-prebuild/docker-bake.hcl b/.github/actions/tilt-setup-prebuild/docker-bake.hcl index e71a433c9e6..d1fa6c1f332 100644 --- a/.github/actions/tilt-setup-prebuild/docker-bake.hcl +++ b/.github/actions/tilt-setup-prebuild/docker-bake.hcl @@ -1,6 +1,9 @@ target "rust-log-service" { dockerfile = "rust/Dockerfile" target = "log_service" + args = { + LOG_SERVICE_CARGO_FEATURES = "faults" + } tags = [ "rust-log-service:ci" ] } diff --git a/Cargo.lock b/Cargo.lock index e06a5df0af8..193afff23f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1749,9 +1749,11 @@ dependencies = [ "async-trait", "chroma-config", "chroma-types", + "clap", "parking_lot", "tokio", "tonic 0.14.5", + "tracing", "uuid", ] diff --git a/Tiltfile b/Tiltfile index 93d0bbecfc9..ea54110caec 100644 --- a/Tiltfile +++ b/Tiltfile @@ -30,7 +30,8 @@ else: '.', only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"], dockerfile='./rust/Dockerfile', - target='log_service' + target='log_service', + build_args={'LOG_SERVICE_CARGO_FEATURES': 'faults'} ) if config.tilt_subcommand == "ci": @@ -320,7 +321,7 @@ k8s_resource('rust-sysdb-service:deployment:chroma', resource_deps = ['k8s_setup k8s_resource('rust-frontend-service:deployment:chroma', resource_deps=['sysdb:deployment:chroma', 'rust-log-service:statefulset:chroma'], labels=["chroma"], port_forwards='8000:8000') k8s_resource('query-service:statefulset:chroma', resource_deps=['sysdb:deployment:chroma'], labels=["chroma"], port_forwards='50053:50051') k8s_resource('compaction-service:statefulset:chroma', resource_deps=['sysdb:deployment:chroma'], labels=["chroma"], port_forwards="50057:50051") -k8s_resource('garbage-collector:statefulset:chroma', resource_deps=['k8s_setup', 'minio-deployment'], labels=["chroma"], port_forwards='50055:50055') +k8s_resource('garbage-collector:statefulset:chroma', resource_deps=['k8s_setup', 'minio-deployment', 'rust-log-service:statefulset:chroma'], labels=["chroma"], port_forwards='50055:50055') # Production Chroma 2 k8s_resource('postgres:deployment:chroma2', resource_deps=['k8s_setup2', 'postgres:deployment:chroma'], labels=["infrastructure2"], port_forwards='6432:5432') @@ -333,7 +334,7 @@ k8s_resource('rust-sysdb-service:deployment:chroma2', resource_deps=['k8s_setup2 k8s_resource('rust-frontend-service:deployment:chroma2', resource_deps=['sysdb:deployment:chroma2', 'rust-log-service:statefulset:chroma2', 'rust-frontend-service:deployment:chroma'], labels=["chroma2"], port_forwards='8001:8000') k8s_resource('query-service:statefulset:chroma2', resource_deps=['sysdb:deployment:chroma2', 'query-service:statefulset:chroma'], labels=["chroma2"], port_forwards='60053:50051') k8s_resource('compaction-service:statefulset:chroma2', resource_deps=['sysdb:deployment:chroma2', 'compaction-service:statefulset:chroma'], labels=["chroma2"]) -k8s_resource('garbage-collector:statefulset:chroma2', resource_deps=['k8s_setup2', 'minio-deployment', 'garbage-collector:statefulset:chroma'], labels=["chroma2"], port_forwards='60055:50055') +k8s_resource('garbage-collector:statefulset:chroma2', resource_deps=['k8s_setup2', 'minio-deployment', 'rust-log-service:statefulset:chroma2', 'garbage-collector:statefulset:chroma'], labels=["chroma2"], port_forwards='60055:50055') # Observability k8s_resource('jaeger', resource_deps=['k8s_setup'], labels=["observability"]) diff --git a/rust/Dockerfile b/rust/Dockerfile index 79d85208b7d..797c1e446ba 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -3,6 +3,7 @@ FROM rust:1.92.0 AS builder ARG RELEASE_MODE= ARG PROTOC_VERSION=31.1 ARG ENABLE_AVX512= +ARG LOG_SERVICE_CARGO_FEATURES= # ADDRESS_SANITIZER is an optional build argument to enable Address Sanitizer. ARG ADDRESS_SANITIZER @@ -57,7 +58,11 @@ RUN --mount=type=cache,sharing=locked,target=/chroma/target/ \ echo "Building without AVX512 optimizations"; \ fi && \ build_target=$( [ "${ADDRESS_SANITIZER}" = "1" ] && echo '--target x86_64-unknown-linux-gnu' || echo '' ) && \ - if [ "$RELEASE_MODE" = "1" ]; then cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES) --release; else cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES); fi && \ + release_flag=$( [ "$RELEASE_MODE" = "1" ] && echo '--release' || echo '' ) && \ + cargo build ${build_target} --workspace $(printf -- '--exclude %s ' $EXCLUDED_PACKAGES) ${release_flag} && \ + if [ -n "$LOG_SERVICE_CARGO_FEATURES" ]; then \ + cargo build ${build_target} -p chroma-log-service --bin log_service --features "$LOG_SERVICE_CARGO_FEATURES" ${release_flag}; \ + fi && \ build_dir=$( [ "$RELEASE_MODE" = "1" ] && echo release || echo debug ) && \ build_dir=$( [ "${ADDRESS_SANITIZER}" = "1" ] && echo "x86_64-unknown-linux-gnu/${build_dir}" || echo "${build_dir}" ) && \ for bin in chroma garbage_collector_service chroma-load log_service heap_tender_service query_service compaction_service sysdb_service spanner_migration; do \ diff --git a/rust/faults/Cargo.toml b/rust/faults/Cargo.toml index 41095e4585f..fabc0e8afb5 100644 --- a/rust/faults/Cargo.toml +++ b/rust/faults/Cargo.toml @@ -5,12 +5,12 @@ edition = "2021" [dependencies] async-trait = { workspace = true } +clap = { workspace = true } parking_lot = { workspace = true } +tokio = { workspace = true } tonic = { workspace = true } +tracing = { workspace = true } uuid = { workspace = true } chroma-config = { workspace = true } chroma-types = { workspace = true } - -[dev-dependencies] -tokio = { workspace = true } diff --git a/rust/faults/src/bin/chroma-fault.rs b/rust/faults/src/bin/chroma-fault.rs new file mode 100644 index 00000000000..fa50f60da29 --- /dev/null +++ b/rust/faults/src/bin/chroma-fault.rs @@ -0,0 +1,347 @@ +use std::error::Error; + +use chroma_types::chroma_proto::fault_action::Act; +use chroma_types::chroma_proto::fault_injection_service_client::FaultInjectionServiceClient; +use chroma_types::chroma_proto::fault_selector::By; +use chroma_types::chroma_proto::{ + ActionDelay, ActionUnavailable, ClearFaultsRequest, FaultAction, FaultEntry, FaultSelector, + InjectFaultsRequest, ListFaultsRequest, SelectFileLine, SelectLabel, +}; +use clap::{ArgGroup, Args, Parser, Subcommand, ValueEnum}; +use tonic::transport::Channel; + +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +enum TiltInstance { + Chroma, + Chroma2, +} + +impl TiltInstance { + fn default_addr(self) -> &'static str { + match self { + TiltInstance::Chroma => "http://127.0.0.1:50054", + TiltInstance::Chroma2 => "http://127.0.0.1:60054", + } + } +} + +#[derive(Debug, Parser)] +#[command( + name = "chroma-fault", + about = "Inject, inspect, and clear faults against Tilt's rust-log-service." +)] +struct Cli { + #[arg( + long, + global = true, + help = "Fault injection service address. Defaults to the selected Tilt instance." + )] + addr: Option, + + #[arg( + long, + global = true, + value_enum, + default_value_t = TiltInstance::Chroma, + help = "Tilt instance to target when --addr is omitted." + )] + tilt_instance: TiltInstance, + + #[command(subcommand)] + command: Command, +} + +#[derive(Debug, Subcommand)] +enum Command { + /// Inject a new fault + Inject(InjectArgs), + /// List all injected faults + List, + /// Clear one fault selector or all faults + Clear(ClearArgs), +} + +#[derive(Debug, Args)] +struct InjectArgs { + #[command(flatten)] + selector: SelectorArgs, + + #[command(flatten)] + action: ActionArgs, +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("selector") + .required(true) + .multiple(false) + .args(["label", "file"]) +))] +struct SelectorArgs { + #[arg(long, help = "Match a named fault label.")] + label: Option, + + #[arg(long, requires = "line", help = "Match a specific source file.")] + file: Option, + + #[arg(long, requires = "file", help = "Match a specific source line.")] + line: Option, +} + +impl SelectorArgs { + fn to_proto(&self) -> FaultSelector { + match (&self.label, &self.file, self.line) { + (Some(label), None, None) => FaultSelector { + by: Some(By::Label(SelectLabel { + label: label.clone(), + })), + }, + (None, Some(file), Some(line)) => FaultSelector { + by: Some(By::FileLine(SelectFileLine { + file: file.clone(), + line, + })), + }, + _ => unreachable!("clap guarantees selector validation"), + } + } +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("action") + .required(true) + .multiple(false) + .args(["unavailable", "delay_seconds"]) +))] +struct ActionArgs { + #[arg(long, help = "Return UNAVAILABLE for matching requests.")] + unavailable: bool, + + #[arg(long, help = "Delay matching requests by this many seconds.")] + delay_seconds: Option, +} + +impl ActionArgs { + fn to_proto(&self) -> FaultAction { + let act = if self.unavailable { + Act::Unavailable(ActionUnavailable {}) + } else if let Some(delay_seconds) = self.delay_seconds { + Act::Delay(ActionDelay { delay_seconds }) + } else { + unreachable!("clap guarantees action validation") + }; + FaultAction { act: Some(act) } + } +} + +#[derive(Debug, Args)] +#[command(group( + ArgGroup::new("clear_target") + .required(true) + .multiple(false) + .args(["all", "label", "file"]) +))] +struct ClearArgs { + #[arg(long, help = "Clear every injected fault.")] + all: bool, + + #[arg(long, help = "Clear faults matching this label.")] + label: Option, + + #[arg( + long, + requires = "line", + help = "Clear faults matching this source file." + )] + file: Option, + + #[arg( + long, + requires = "file", + help = "Clear faults matching this source line." + )] + line: Option, +} + +impl ClearArgs { + fn selector(&self) -> Option { + if self.all { + return None; + } + + Some(match (&self.label, &self.file, self.line) { + (Some(label), None, None) => FaultSelector { + by: Some(By::Label(SelectLabel { + label: label.clone(), + })), + }, + (None, Some(file), Some(line)) => FaultSelector { + by: Some(By::FileLine(SelectFileLine { + file: file.clone(), + line, + })), + }, + _ => unreachable!("clap guarantees clear target validation"), + }) + } +} + +fn resolved_addr(cli: &Cli) -> String { + cli.addr + .clone() + .unwrap_or_else(|| cli.tilt_instance.default_addr().to_string()) +} + +fn format_selector(selector: &FaultSelector) -> String { + match selector.by.as_ref() { + Some(By::Label(SelectLabel { label })) => format!("label({label})"), + Some(By::FileLine(SelectFileLine { file, line })) => format!("file({file}:{line})"), + None => "".to_string(), + } +} + +fn format_action(action: &FaultAction) -> String { + match action.act.as_ref() { + Some(Act::Unavailable(_)) => "unavailable".to_string(), + Some(Act::Delay(ActionDelay { delay_seconds })) => format!("delay({delay_seconds}s)"), + None => "".to_string(), + } +} + +fn format_entry(entry: &FaultEntry) -> String { + let selector = entry + .selector + .as_ref() + .map(format_selector) + .unwrap_or_else(|| "".to_string()); + let action = entry + .action + .as_ref() + .map(format_action) + .unwrap_or_else(|| "".to_string()); + format!("{selector} -> {action}") +} + +async fn connect(addr: &str) -> Result, Box> { + let channel = Channel::from_shared(addr.to_string())?.connect().await?; + Ok(FaultInjectionServiceClient::new(channel)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + let addr = resolved_addr(&cli); + let mut client = connect(&addr).await?; + + match cli.command { + Command::Inject(args) => { + let selector = args.selector.to_proto(); + let action = args.action.to_proto(); + client + .inject_faults(InjectFaultsRequest { + selector: Some(selector.clone()), + action: Some(action), + }) + .await?; + println!( + "Injected {} on {}", + format_action(&action), + format_selector(&selector) + ); + } + Command::List => { + let response = client.list_faults(ListFaultsRequest {}).await?.into_inner(); + if response.faults.is_empty() { + println!("No faults configured."); + } else { + for (idx, fault) in response.faults.iter().enumerate() { + println!("{}. {}", idx + 1, format_entry(fault)); + } + } + } + Command::Clear(args) => { + let response = client + .clear_faults(ClearFaultsRequest { + id: None, + selector: args.selector(), + }) + .await? + .into_inner(); + println!("Cleared {} faults.", response.cleared_count); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tilt_default_addresses_match_tiltfile_port_forwards() { + assert_eq!( + TiltInstance::Chroma.default_addr(), + "http://127.0.0.1:50054" + ); + assert_eq!( + TiltInstance::Chroma2.default_addr(), + "http://127.0.0.1:60054" + ); + } + + #[test] + fn explicit_address_overrides_tilt_instance_default() { + let cli = Cli { + addr: Some("http://localhost:7000".to_string()), + tilt_instance: TiltInstance::Chroma2, + command: Command::List, + }; + + assert_eq!(resolved_addr(&cli), "http://localhost:7000"); + } + + #[test] + fn selector_args_build_label_selector() { + let selector = SelectorArgs { + label: Some("fragment-upload".to_string()), + file: None, + line: None, + } + .to_proto(); + + assert_eq!( + selector.by, + Some(By::Label(SelectLabel { + label: "fragment-upload".to_string(), + })) + ); + } + + #[test] + fn action_args_build_delay_action() { + let action = ActionArgs { + unavailable: false, + delay_seconds: Some(7), + } + .to_proto(); + + assert_eq!( + action.act, + Some(Act::Delay(ActionDelay { delay_seconds: 7 })) + ); + } + + #[test] + fn clear_all_omits_selector() { + let clear = ClearArgs { + all: true, + label: None, + file: None, + line: None, + }; + + assert_eq!(clear.selector(), None); + } +} diff --git a/rust/faults/src/lib.rs b/rust/faults/src/lib.rs index cf5b194c783..382e0b5e1a0 100644 --- a/rust/faults/src/lib.rs +++ b/rust/faults/src/lib.rs @@ -211,6 +211,7 @@ fn stored_fault_to_proto(fault: &StoredFault) -> FaultEntry { #[async_trait] impl FaultInjectionService for FaultRegistry { + #[tracing::instrument(skip(self, request))] async fn inject_faults( &self, request: Request, @@ -224,10 +225,12 @@ impl FaultInjectionService for FaultRegistry { .action .ok_or_else(|| invalid_argument("inject_faults requires an action")) .and_then(action_from_proto)?; + tracing::warn!(selector = ?selector, action = ?action, "inject fault configured"); let id = self.inject(selector, action); Ok(Response::new(InjectFaultsResponse { id: id.to_string() })) } + #[tracing::instrument(skip(self, _request))] async fn list_faults( &self, _request: Request, @@ -240,6 +243,7 @@ impl FaultInjectionService for FaultRegistry { Ok(Response::new(ListFaultsResponse { faults })) } + #[tracing::instrument(skip(self, request))] async fn clear_faults( &self, request: Request, @@ -248,16 +252,21 @@ impl FaultInjectionService for FaultRegistry { let fault_id: Option = request.id.as_deref().map(TryInto::try_into).transpose()?; let selector: Option = request.selector.map(TryInto::try_into).transpose()?; - let cleared_count = match (fault_id, selector) { + let cleared_count = match (&fault_id, &selector) { (Some(_), Some(_)) => { return Err(invalid_argument( "clear_faults requires exactly one of id or selector, not both", )); } - (Some(id), None) => self.clear_id(Some(&id)), - (None, Some(ref sel)) => self.clear_selector(sel), + (Some(id), None) => self.clear_id(Some(id)), + (None, Some(sel)) => self.clear_selector(sel), (None, None) => self.clear_all(), }; + tracing::warn!( + selector = ?selector, + cleared_count, + "inject fault cleared" + ); Ok(Response::new(ClearFaultsResponse { cleared_count: cleared_count as u64, })) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 643e665d614..3f24956b76f 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -20,8 +20,9 @@ use chroma_storage::config::StorageConfig; use chroma_storage::Storage; use chroma_tracing::OtelFilter; use chroma_tracing::OtelFilterLevel; +#[cfg(feature = "faults")] +use chroma_types::chroma_proto::fault_injection_service_server::FaultInjectionServiceServer; use chroma_types::chroma_proto::{ - fault_injection_service_server::FaultInjectionServiceServer, garbage_collect_phase2_request::LogToCollect, log_service_server::LogService, purge_from_cache_request::EntryToEvict, CollectionInfo, GarbageCollectPhase2Request, GarbageCollectPhase2Response, GetAllCollectionInfoToCompactRequest, @@ -65,7 +66,8 @@ use wal3::{ }; #[cfg(feature = "faults")] use wal3::{ - FaultInjectingFragmentManagerFactory, FragmentUploadFault, FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_upload_replica_fault_label, FaultInjectingFragmentManagerFactory, FragmentUploadFault, + FRAGMENT_UPLOAD_FAULT_LABEL, }; mod scrub; @@ -145,6 +147,16 @@ impl FragmentUploadFaultInjector for LogServiceFragmentUploadFaultInjector { chroma_faults::FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay), }) } + + fn fault_for_replica_upload(&self, replica_idx: usize) -> Option { + let label = fragment_upload_replica_fault_label(replica_idx)?; + self.faults + .action_for_label(label) + .map(|action| match action { + chroma_faults::FaultActionKind::Unavailable => FragmentUploadFault::Unavailable, + chroma_faults::FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay), + }) + } } #[cfg(feature = "faults")] @@ -442,6 +454,8 @@ impl<'a> FactoryCreationContext<'a> { region_names, self.collection_id.0, ); + let fragment_factory = fragment_factory + .with_fault_injector(self.fragment_upload_fault_injector.as_ref().map(Arc::clone)); let fragment_factory = maybe_wrap_fragment_manager_factory( fragment_factory, self.fragment_upload_fault_injector.as_ref().map(Arc::clone), @@ -703,6 +717,8 @@ async fn get_log_from_handle_with_mutex_held<'a>( region_names, collection_id.0, ); + let fragment_publisher_factory = fragment_publisher_factory + .with_fault_injector(fragment_upload_fault_injector.as_ref().map(Arc::clone)); let fragment_publisher_factory = maybe_wrap_fragment_manager_factory( fragment_publisher_factory, fragment_upload_fault_injector.as_ref().map(Arc::clone), @@ -1164,6 +1180,7 @@ pub struct LogServer { config: LogServerConfig, open_logs: Arc>, dirty_log: Option>, + #[cfg_attr(not(feature = "faults"), allow(unused))] faults: Arc, rolling_up_s3: tokio::sync::Mutex<()>, rolling_up_repl: tokio::sync::Mutex<()>, @@ -3229,18 +3246,35 @@ impl LogServerWrapper { let background_server = Arc::clone(&wrapper.log_server); let background = tokio::task::spawn(async move { background_server.background_task().await }); - let server = Server::builder() - .max_concurrent_streams(Some(max_concurrent_streams)) - .layer(chroma_tracing::GrpcServerTraceLayer) - .add_service(health_service) - .add_service(FaultInjectionServiceServer::from_arc( - wrapper.log_server.faults.clone(), - )) - .add_service( - chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) - .max_decoding_message_size(max_decoding_message_size) - .max_encoding_message_size(max_encoding_message_size), - ); + #[cfg(feature = "faults")] + let server = { + tracing::info!("fault injection enabled"); + Server::builder() + .max_concurrent_streams(Some(max_concurrent_streams)) + .layer(chroma_tracing::GrpcServerTraceLayer) + .add_service(health_service) + .add_service(FaultInjectionServiceServer::from_arc( + wrapper.log_server.faults.clone(), + )) + .add_service( + chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) + .max_decoding_message_size(max_decoding_message_size) + .max_encoding_message_size(max_encoding_message_size), + ) + }; + #[cfg(not(feature = "faults"))] + let server = { + tracing::info!("fault injection not enabled"); + Server::builder() + .max_concurrent_streams(Some(max_concurrent_streams)) + .layer(chroma_tracing::GrpcServerTraceLayer) + .add_service(health_service) + .add_service( + chroma_types::chroma_proto::log_service_server::LogServiceServer::new(wrapper) + .max_decoding_message_size(max_decoding_message_size) + .max_encoding_message_size(max_encoding_message_size), + ) + }; let server = server.serve_with_shutdown(addr, async { let mut sigterm = match signal(SignalKind::terminate()) { @@ -3814,6 +3848,31 @@ mod tests { assert!(STABLE_PREFIX.is_valid()); } + #[cfg(feature = "faults")] + #[test] + fn fragment_upload_fault_injection_targets_hard_coded_replicas() { + let faults = Arc::new(FaultRegistry::new()); + let injector = LogServiceFragmentUploadFaultInjector::new(Arc::clone(&faults)); + + faults.inject( + chroma_faults::FaultSelectorKind::Label( + fragment_upload_replica_fault_label(1) + .expect("replica 1 label should exist") + .to_string(), + ), + chroma_faults::FaultActionKind::Unavailable, + ); + + assert_eq!(injector.fault_for_upload(), None); + assert_eq!(injector.fault_for_replica_upload(0), None); + assert_eq!( + injector.fault_for_replica_upload(1), + Some(FragmentUploadFault::Unavailable) + ); + assert_eq!(injector.fault_for_replica_upload(2), None); + assert_eq!(injector.fault_for_replica_upload(3), None); + } + #[test] fn dirty_marker_coalesce1() { // Test that a single collection gets coalesced to nothing. @@ -5430,6 +5489,7 @@ mod tests { let spanner = Arc::new(topology_config.config.spanner.clone()); let config = log_server.config.clone(); let repl_options = topology_config.config.repl.clone(); + let fragment_upload_fault_injector = log_server.fragment_upload_fault_injector(); Box::pin(async move { let (fragment_publisher_factory, manifest_publisher_factory) = create_repl_factories( config.writer.clone(), @@ -5440,6 +5500,8 @@ mod tests { region_names, collection_id.0, ); + let fragment_publisher_factory = fragment_publisher_factory + .with_fault_injector(fragment_upload_fault_injector.as_ref().map(Arc::clone)); let gc = wal3::GarbageCollector::< FragmentUuid, ReplicatedFragmentManagerFactory, diff --git a/rust/tracing/src/init_tracer.rs b/rust/tracing/src/init_tracer.rs index fd5677cdb69..a277c9b6806 100644 --- a/rust/tracing/src/init_tracer.rs +++ b/rust/tracing/src/init_tracer.rs @@ -78,6 +78,7 @@ pub fn init_global_filter_layer( "chroma_cache", "chroma_distance", "chroma_error", + "chroma_fault", "chroma_frontend", "chroma_index", "chroma_log", diff --git a/rust/wal3/src/interfaces/mod.rs b/rust/wal3/src/interfaces/mod.rs index eeeffb698ad..c79c55fe87f 100644 --- a/rust/wal3/src/interfaces/mod.rs +++ b/rust/wal3/src/interfaces/mod.rs @@ -90,6 +90,19 @@ pub trait FragmentManagerFactory { /// The label used by downstream services to target wal3 fragment uploads with fault injection. pub const FRAGMENT_UPLOAD_FAULT_LABEL: &str = "wal3.fragment_upload"; +/// The hard-coded fault labels used to target replicated wal3 fragment uploads by replica index. +pub const FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS: [&str; 3] = [ + "wal3.fragment_upload.0", + "wal3.fragment_upload.1", + "wal3.fragment_upload.2", +]; + +/// Returns the fault label for a specific replicated wal3 fragment upload replica index. +pub fn fragment_upload_replica_fault_label(replica_idx: usize) -> Option<&'static str> { + FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS + .get(replica_idx) + .copied() +} /// Faults that can be injected immediately before a fragment upload begins. #[derive(Clone, Debug, PartialEq, Eq)] @@ -101,6 +114,10 @@ pub enum FragmentUploadFault { /// Supplies optional upload faults to wal3 without coupling the crate to a specific registry. pub trait FragmentUploadFaultInjector: Send + Sync + 'static { fn fault_for_upload(&self) -> Option; + + fn fault_for_replica_upload(&self, _replica_idx: usize) -> Option { + None + } } impl FragmentUploadFaultInjector for () { @@ -173,15 +190,27 @@ impl> FragmentUploader cmek: Option, epoch_micros: u64, ) -> Result { + let fragment_identifier = pointer.identifier(); match self .fault_injector .as_ref() .and_then(|fault_injector| fault_injector.fault_for_upload()) { Some(FragmentUploadFault::Delay(delay)) => { + tracing::warn!( + fault_label = FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_identifier = %fragment_identifier, + delay_seconds = delay.as_secs_f64(), + "injecting wal3 upload delay fault" + ); tokio::time::sleep(delay).await; } Some(FragmentUploadFault::Unavailable) => { + tracing::warn!( + fault_label = FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_identifier = %fragment_identifier, + "injecting wal3 upload unavailable fault" + ); return Err(Error::TonicError(tonic::Status::unavailable(format!( "fault injected for {}", FRAGMENT_UPLOAD_FAULT_LABEL @@ -1059,6 +1088,23 @@ mod tests { #[derive(Clone)] struct InjectUnavailable; + #[test] + fn fragment_upload_replica_fault_labels_are_hard_coded() { + assert_eq!( + fragment_upload_replica_fault_label(0), + Some("wal3.fragment_upload.0") + ); + assert_eq!( + fragment_upload_replica_fault_label(1), + Some("wal3.fragment_upload.1") + ); + assert_eq!( + fragment_upload_replica_fault_label(2), + Some("wal3.fragment_upload.2") + ); + assert_eq!(fragment_upload_replica_fault_label(3), None); + } + impl FragmentUploadFaultInjector for InjectUnavailable { fn fault_for_upload(&self) -> Option { Some(FragmentUploadFault::Unavailable) diff --git a/rust/wal3/src/interfaces/repl/fragment_manager.rs b/rust/wal3/src/interfaces/repl/fragment_manager.rs index 5b52d27d27d..360569b1b75 100644 --- a/rust/wal3/src/interfaces/repl/fragment_manager.rs +++ b/rust/wal3/src/interfaces/repl/fragment_manager.rs @@ -10,7 +10,10 @@ use chroma_storage::{PutOptions, Storage, StorageError}; use chroma_types::Cmek; use crate::interfaces::batch_manager::upload_parquet; -use crate::interfaces::{FragmentConsumer, FragmentUploader, UploadResult}; +use crate::interfaces::{ + fragment_upload_replica_fault_label, FragmentConsumer, FragmentUploadFault, + FragmentUploadFaultInjector, FragmentUploader, UploadResult, +}; use crate::{Error, Fragment, FragmentIdentifier, FragmentUuid, LogPosition, LogWriterOptions}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -119,6 +122,7 @@ pub struct ReplicatedFragmentUploader { preferred: usize, storages: Arc>, bookkeeping: Arc>, + fault_injector: Option>, } impl ReplicatedFragmentUploader { @@ -140,9 +144,18 @@ impl ReplicatedFragmentUploader { preferred, storages, bookkeeping, + fault_injector: None, } } + pub fn with_fault_injector( + mut self, + fault_injector: Option>, + ) -> Self { + self.fault_injector = fault_injector; + self + } + fn compute_mask(&self) -> Result, Error> { // SAFETY(rescrv): Mutex poisoning. let mut bookkeeping = self.bookkeeping.lock().unwrap(); @@ -288,12 +301,44 @@ impl FragmentUploader for ReplicatedFragmentUploader { if should_try { let options = self.writer.clone(); let prefix = storage.prefix.clone(); + let region = storage.region.clone(); let storage = storage.storage.clone(); let fragment_identifier = (*pointer).into(); let log_position = None; let messages = messages.clone(); let cmek = cmek.clone(); + let fault_injector = self.fault_injector.as_ref().map(Arc::clone); futures.push(async move { + if let Some(fault) = fault_injector + .as_ref() + .and_then(|fault_injector| fault_injector.fault_for_replica_upload(idx)) + { + let fault_label = fragment_upload_replica_fault_label(idx) + .unwrap_or("wal3.fragment_upload."); + match fault { + FragmentUploadFault::Delay(delay) => { + tracing::warn!( + fault_label, + replica_idx = idx, + region = %region, + delay_seconds = delay.as_secs_f64(), + "injecting wal3 replicated upload delay fault" + ); + tokio::time::sleep(delay).await; + } + FragmentUploadFault::Unavailable => { + tracing::warn!( + fault_label, + replica_idx = idx, + region = %region, + "injecting wal3 replicated upload unavailable fault" + ); + return Err(Error::TonicError(tonic::Status::unavailable( + format!("fault injected for {fault_label}"), + ))); + } + } + } upload_parquet( &options, &storage, @@ -410,6 +455,17 @@ impl FragmentReader { return; } + let missing_regions = missing_storage_indices + .iter() + .map(|idx| self.storages[*idx].region.as_str()) + .collect::>(); + tracing::warn!( + path, + missing_regions = ?missing_regions, + missing_replica_count = missing_regions.len(), + "read repair triggered" + ); + for idx in missing_storage_indices { // Try to acquire a permit without blocking. If unavailable, skip this repair. let permit = match self.read_repair_semaphore.clone().try_acquire_owned() { @@ -1474,8 +1530,16 @@ mod tests { } } + fn make_storage_wrapper_with_region( + storage: chroma_storage::Storage, + prefix: &str, + region: &str, + ) -> StorageWrapper { + StorageWrapper::new(region.to_string(), storage, prefix.to_string()) + } + fn make_storage_wrapper(storage: chroma_storage::Storage, prefix: &str) -> StorageWrapper { - StorageWrapper::new("test-region".to_string(), storage, prefix.to_string()) + make_storage_wrapper_with_region(storage, prefix, "test-region") } // Single replica successfully uploads. @@ -1515,8 +1579,8 @@ mod tests { async fn test_k8s_mcmr_integration_replicated_uploader_two_replicas_both_succeed() { let storage1 = s3_client_for_test_with_new_bucket().await; let storage2 = s3_client_for_test_with_new_bucket().await; - let wrapper1 = make_storage_wrapper(storage1, "prefix1"); - let wrapper2 = make_storage_wrapper(storage2, "prefix2"); + let wrapper1 = make_storage_wrapper_with_region(storage1, "prefix1", "region-0"); + let wrapper2 = make_storage_wrapper_with_region(storage2, "prefix2", "region-1"); let storages = Arc::new(vec![wrapper1, wrapper2]); let uploader = ReplicatedFragmentUploader::new( make_test_options(2), diff --git a/rust/wal3/src/interfaces/repl/mod.rs b/rust/wal3/src/interfaces/repl/mod.rs index 211c8376335..921e9c423c2 100644 --- a/rust/wal3/src/interfaces/repl/mod.rs +++ b/rust/wal3/src/interfaces/repl/mod.rs @@ -12,7 +12,7 @@ mod manifest_manager; use crate::{Error, FragmentUuid, LogWriterOptions, Manifest}; use super::batch_manager::BatchManager; -use super::{FragmentManagerFactory, ManifestManagerFactory}; +use super::{FragmentManagerFactory, FragmentUploadFaultInjector, ManifestManagerFactory}; pub use fragment_manager::{FragmentReader, ReplicatedFragmentUploader}; pub use fragment_manager::{ReplicatedFragmentOptions, StorageWrapper}; @@ -40,6 +40,7 @@ pub fn create_repl_factories( preferred, storages, read_repair_semaphore, + fault_injector: None, }; let local_region = regions[preferred].clone(); let manifest_manager_factory = ReplicatedManifestManagerFactory { @@ -58,6 +59,7 @@ pub struct ReplicatedFragmentManagerFactory { preferred: usize, storages: Arc>, read_repair_semaphore: Arc, + fault_injector: Option>, } impl ReplicatedFragmentManagerFactory { @@ -76,9 +78,18 @@ impl ReplicatedFragmentManagerFactory { preferred, storages, read_repair_semaphore, + fault_injector: None, } } + pub fn with_fault_injector( + mut self, + fault_injector: Option>, + ) -> Self { + self.fault_injector = fault_injector; + self + } + fn build_fragment_uploader(&self) -> ReplicatedFragmentUploader { ReplicatedFragmentUploader::new( self.repl.clone(), @@ -86,6 +97,7 @@ impl ReplicatedFragmentManagerFactory { self.preferred, Arc::clone(&self.storages), ) + .with_fault_injector(self.fault_injector.as_ref().map(Arc::clone)) } } diff --git a/rust/wal3/src/lib.rs b/rust/wal3/src/lib.rs index 2549bbade5c..12c927067d7 100644 --- a/rust/wal3/src/lib.rs +++ b/rust/wal3/src/lib.rs @@ -34,10 +34,11 @@ pub use interfaces::s3::{ S3FragmentPuller, S3FragmentUploader, S3ManifestManagerFactory, }; pub use interfaces::{ - BatchManager, FaultInjectingFragmentManagerFactory, FragmentConsumer, FragmentManagerFactory, - FragmentPointer, FragmentPublisher, FragmentUploadFault, FragmentUploadFaultInjector, - FragmentUploader, ManifestConsumer, ManifestManagerFactory, ManifestPublisher, ManifestWitness, - PositionWitness, FRAGMENT_UPLOAD_FAULT_LABEL, + fragment_upload_replica_fault_label, BatchManager, FaultInjectingFragmentManagerFactory, + FragmentConsumer, FragmentManagerFactory, FragmentPointer, FragmentPublisher, + FragmentUploadFault, FragmentUploadFaultInjector, FragmentUploader, ManifestConsumer, + ManifestManagerFactory, ManifestPublisher, ManifestWitness, PositionWitness, + FRAGMENT_UPLOAD_FAULT_LABEL, FRAGMENT_UPLOAD_REPLICA_FAULT_LABELS, }; pub use manifest::{ unprefixed_snapshot_path, Manifest, ManifestAndWitness, ManifestBounds, diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 56af2af74fa..7e8461f7fa4 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -280,8 +280,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage" diff --git a/rust/worker/chroma_mcmr.yaml b/rust/worker/chroma_mcmr.yaml index 6f0f7513e37..55367422dc0 100644 --- a/rust/worker/chroma_mcmr.yaml +++ b/rust/worker/chroma_mcmr.yaml @@ -326,8 +326,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage" diff --git a/rust/worker/chroma_mcmr2.yaml b/rust/worker/chroma_mcmr2.yaml index 4db5c1ca126..4c656149a17 100644 --- a/rust/worker/chroma_mcmr2.yaml +++ b/rust/worker/chroma_mcmr2.yaml @@ -327,8 +327,8 @@ garbage_collector: request_timeout_ms: 60000 dispatcher_config: num_worker_threads: 4 - dispatcher_queue_size: 1000 - worker_queue_size: 1000 + dispatcher_queue_size: 10000 + worker_queue_size: 10000 storage_config: s3: bucket: "chroma-storage2"