Skip to content

Commit 605427f

Browse files
committed
collapse into one trait
1 parent 252e8a2 commit 605427f

File tree

7 files changed

+735
-26
lines changed

7 files changed

+735
-26
lines changed

rust/log-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ edition = "2021"
77
name = "log_service"
88
path = "src/bin/log.rs"
99

10+
[features]
11+
faults = []
12+
1013
[dependencies]
1114
arrow = { workspace = true }
1215
async-trait = { workspace = true }

rust/log-service/src/lib.rs

Lines changed: 137 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,15 @@ use wal3::{
5757
create_repl_factories, create_s3_factories,
5858
interfaces::repl::ManifestManager as ReplManifestManager, interfaces::ManifestManagerFactory,
5959
scan_from_manifest, Cursor, CursorName, CursorStore, CursorStoreOptions, CursorWitness,
60-
Fragment, FragmentManagerFactory, GarbageCollectionOptions, Limits, LogPosition, LogReader,
61-
LogReaderOptions, LogReaderTrait, LogWriter, LogWriterOptions, LogWriterTrait, Manifest,
62-
ManifestAndWitness, MarkDirty as MarkDirtyTrait, ReplicatedFragmentOptions, Snapshot,
63-
SnapshotCache, SnapshotPointer, StorageWrapper, INTRINSIC_CURSOR,
60+
Fragment, FragmentManagerFactory, FragmentUploadFaultInjector, GarbageCollectionOptions,
61+
Limits, LogPosition, LogReader, LogReaderOptions, LogReaderTrait, LogWriter, LogWriterOptions,
62+
LogWriterTrait, Manifest, ManifestAndWitness, MarkDirty as MarkDirtyTrait,
63+
ReplicatedFragmentOptions, Snapshot, SnapshotCache, SnapshotPointer, StorageWrapper,
64+
INTRINSIC_CURSOR,
65+
};
66+
#[cfg(feature = "faults")]
67+
use wal3::{
68+
FaultInjectingFragmentManagerFactory, FragmentUploadFault, FRAGMENT_UPLOAD_FAULT_LABEL,
6469
};
6570

6671
mod scrub;
@@ -117,6 +122,50 @@ const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";
117122

118123
const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";
119124

125+
/// Adapter that lets the log service's [`FaultRegistry`] act as a
/// [`FragmentUploadFaultInjector`].
///
/// Only compiled when the `faults` feature is enabled.
#[cfg(feature = "faults")]
#[derive(Clone)]
struct LogServiceFragmentUploadFaultInjector {
    /// Shared registry consulted by `fault_for_upload`.
    faults: Arc<FaultRegistry>,
}

#[cfg(feature = "faults")]
impl LogServiceFragmentUploadFaultInjector {
    /// Builds an injector backed by the given shared fault registry.
    fn new(faults: Arc<FaultRegistry>) -> Self {
        Self { faults }
    }
}

#[cfg(feature = "faults")]
impl FragmentUploadFaultInjector for LogServiceFragmentUploadFaultInjector {
    /// Looks up the action registered under [`FRAGMENT_UPLOAD_FAULT_LABEL`] and
    /// translates it into the matching [`FragmentUploadFault`] variant.
    ///
    /// Returns `None` when the registry yields no action for that label.
    fn fault_for_upload(&self) -> Option<FragmentUploadFault> {
        use chroma_faults::FaultActionKind;

        // Bail out early when nothing is registered for the upload label.
        let action = self.faults.action_for_label(FRAGMENT_UPLOAD_FAULT_LABEL)?;
        let fault = match action {
            FaultActionKind::Unavailable => FragmentUploadFault::Unavailable,
            FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay),
        };
        Some(fault)
    }
}
149+
150+
#[cfg(feature = "faults")]
151+
fn maybe_wrap_fragment_manager_factory<F>(
152+
fragment_manager_factory: F,
153+
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
154+
) -> FaultInjectingFragmentManagerFactory<F> {
155+
FaultInjectingFragmentManagerFactory::new(
156+
fragment_manager_factory,
157+
fragment_upload_fault_injector,
158+
)
159+
}
160+
161+
#[cfg(not(feature = "faults"))]
162+
fn maybe_wrap_fragment_manager_factory<F>(
163+
fragment_manager_factory: F,
164+
_fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
165+
) -> F {
166+
fragment_manager_factory
167+
}
168+
120169
// SAFETY(rescrv): There's a test that this produces a valid type.
121170
static STABLE_PREFIX: CursorName = unsafe { CursorName::from_string_unchecked("stable_prefix") };
122171

@@ -224,6 +273,7 @@ struct FactoryCreationContext<'a> {
224273
collection_id: CollectionUuid,
225274
prefix: String,
226275
snapshot_cache: Arc<dyn SnapshotCache>,
276+
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
227277
}
228278

229279
impl<'a> FactoryCreationContext<'a> {
@@ -232,6 +282,7 @@ impl<'a> FactoryCreationContext<'a> {
232282
topology_name: Option<&'a TopologyName>,
233283
collection_id: CollectionUuid,
234284
snapshot_cache: Arc<dyn SnapshotCache>,
285+
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
235286
) -> Self {
236287
let prefix = collection_id.storage_prefix_for_log();
237288
Self {
@@ -240,6 +291,7 @@ impl<'a> FactoryCreationContext<'a> {
240291
collection_id,
241292
prefix,
242293
snapshot_cache,
294+
fragment_upload_fault_injector,
243295
}
244296
}
245297

@@ -390,6 +442,10 @@ impl<'a> FactoryCreationContext<'a> {
390442
region_names,
391443
self.collection_id.0,
392444
);
445+
let fragment_factory = maybe_wrap_fragment_manager_factory(
446+
fragment_factory,
447+
self.fragment_upload_fault_injector.as_ref().map(Arc::clone),
448+
);
393449
let fragment_publisher = fragment_factory.make_publisher().await?;
394450
Ok(wal3::copy(reader, cursor, &fragment_publisher, manifest_factory, cmek).await?)
395451
}
@@ -417,6 +473,10 @@ impl<'a> FactoryCreationContext<'a> {
417473
Arc::new(()),
418474
Arc::clone(&self.snapshot_cache),
419475
);
476+
let fragment_factory = maybe_wrap_fragment_manager_factory(
477+
fragment_factory,
478+
self.fragment_upload_fault_injector.as_ref().map(Arc::clone),
479+
);
420480
let fragment_publisher = fragment_factory.make_publisher().await?;
421481
Ok(wal3::copy(reader, cursor, &fragment_publisher, manifest_factory, cmek).await?)
422482
}
@@ -563,6 +623,7 @@ async fn get_log_from_handle<'a>(
563623
prefix: &str,
564624
mark_dirty: MarkDirty,
565625
snapshot_cache: Arc<dyn SnapshotCache>,
626+
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
566627
cmek: Option<Cmek>,
567628
) -> Result<LogRef<'a>, Error> {
568629
let active = handle.active.lock().await;
@@ -577,6 +638,7 @@ async fn get_log_from_handle<'a>(
577638
prefix,
578639
mark_dirty,
579640
snapshot_cache,
641+
fragment_upload_fault_injector,
580642
cmek,
581643
)
582644
.await
@@ -594,6 +656,7 @@ async fn get_log_from_handle_with_mutex_held<'a>(
594656
prefix: &str,
595657
mark_dirty: MarkDirty,
596658
snapshot_cache: Arc<dyn SnapshotCache>,
659+
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
597660
cmek: Option<Cmek>,
598661
) -> Result<LogRef<'a>, Error> {
599662
if active.log.is_some() {
@@ -640,6 +703,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
640703
region_names,
641704
collection_id.0,
642705
);
706+
let fragment_publisher_factory = maybe_wrap_fragment_manager_factory(
707+
fragment_publisher_factory,
708+
fragment_upload_fault_injector.as_ref().map(Arc::clone),
709+
);
643710
let opened = LogWriter::open_or_initialize(
644711
write_options.clone(),
645712
"log writer",
@@ -690,6 +757,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
690757
mark_dirty_arc,
691758
snapshot_cache,
692759
);
760+
let fragment_publisher_factory = maybe_wrap_fragment_manager_factory(
761+
fragment_publisher_factory,
762+
fragment_upload_fault_injector.as_ref().map(Arc::clone),
763+
);
693764
let opened = LogWriter::open_or_initialize(
694765
write_options.clone(),
695766
"log writer",
@@ -1115,6 +1186,19 @@ impl LogServer {
11151186
.storage)
11161187
}
11171188

1189+
/// Produces the fragment-upload fault injector for this server, if any.
///
/// With the `faults` feature enabled this returns a fresh
/// `LogServiceFragmentUploadFaultInjector` sharing `self.faults`; without
/// the feature it returns `None`.
fn fragment_upload_fault_injector(&self) -> Option<Arc<dyn FragmentUploadFaultInjector>> {
    #[cfg(feature = "faults")]
    {
        let injector = LogServiceFragmentUploadFaultInjector::new(Arc::clone(&self.faults));
        Some(Arc::new(injector))
    }
    #[cfg(not(feature = "faults"))]
    {
        None
    }
}
1201+
11181202
fn snapshot_cache_for_collection(
11191203
&self,
11201204
collection_id: CollectionUuid,
@@ -1155,6 +1239,7 @@ impl LogServer {
11551239
topology_name,
11561240
collection_id,
11571241
snapshot_cache,
1242+
self.fragment_upload_fault_injector(),
11581243
);
11591244
ctx.make_log_reader(&self.config.writer, &self.config.reader)
11601245
.await
@@ -1309,6 +1394,7 @@ impl LogServer {
13091394
&storage_prefix,
13101395
mark_dirty,
13111396
snapshot_cache,
1397+
self.fragment_upload_fault_injector(),
13121398
None, // Offset updates don't use CMEK
13131399
)
13141400
.await
@@ -2089,6 +2175,7 @@ impl LogServer {
20892175
&prefix,
20902176
mark_dirty,
20912177
snapshot_cache,
2178+
self.fragment_upload_fault_injector(),
20922179
cmek,
20932180
)
20942181
.await
@@ -2541,6 +2628,7 @@ impl LogServer {
25412628
topology_name.as_ref(),
25422629
target_collection_id,
25432630
snapshot_cache,
2631+
self.fragment_upload_fault_injector(),
25442632
);
25452633
target_ctx
25462634
.fork_to_target(
@@ -2926,6 +3014,7 @@ impl LogServer {
29263014
&prefix,
29273015
mark_dirty,
29283016
snapshot_cache,
3017+
self.fragment_upload_fault_injector(),
29293018
None, // GC doesn't use CMEK
29303019
)
29313020
.await
@@ -5090,6 +5179,50 @@ mod tests {
50905179
}
50915180
}
50925181

5182+
/// Checks that an injected `Unavailable` fault on the fragment-upload label
/// makes `push_logs` fail with `Code::Unavailable`, and that the same request
/// succeeds with one record once all faults are cleared.
#[cfg(feature = "faults")]
#[tokio::test]
async fn fragment_upload_fault_injection_rejects_then_recovers() {
    let (ctor, dtor) = s3_setup_log_server();
    let log_server = ctor.await;
    let collection_id = CollectionUuid::new();

    // Each call yields a fresh single-record delete request for the collection.
    let build_request = || {
        let record = OperationRecord {
            id: "fault-test".to_string(),
            embedding: None,
            encoding: None,
            metadata: None,
            document: None,
            operation: Operation::Delete,
        };
        PushLogsRequest {
            collection_id: collection_id.to_string(),
            records: vec![record
                .try_into()
                .expect("operation record should convert to proto")],
            cmek: None,
            database_name: "default_database".to_string(),
        }
    };

    // Phase 1: with the fault registered, the push must be rejected.
    let selector =
        chroma_faults::FaultSelectorKind::Label(FRAGMENT_UPLOAD_FAULT_LABEL.to_string());
    log_server
        .faults
        .inject(selector, chroma_faults::FaultActionKind::Unavailable);

    let status = log_server
        .push_logs(Request::new(build_request()))
        .await
        .expect_err("fault injection should reject fragment upload");
    assert_eq!(status.code(), Code::Unavailable);

    // Phase 2: after clearing every fault, the same push must go through.
    log_server.faults.clear_all();

    let pushed = log_server
        .push_logs(Request::new(build_request()))
        .await
        .expect("write should succeed after clearing injected fault");
    assert_eq!(pushed.into_inner().record_count, 1);

    dtor.await;
}
5225+
50935226
async fn validate_log_on_server(
50945227
server: &LogServer,
50955228
db_name: &str,

0 commit comments

Comments
 (0)