diff --git a/go/pkg/log/server/server.go b/go/pkg/log/server/server.go
index bcbd36a8e3e..c514eaf055f 100644
--- a/go/pkg/log/server/server.go
+++ b/go/pkg/log/server/server.go
@@ -54,16 +54,18 @@ func (s *logServer) ScoutLogs(ctx context.Context, req *logservicepb.ScoutLogsRe
 	if err != nil {
 		return
 	}
+	var start int64
 	var limit int64
-	_, limit, err = s.lr.GetBoundsForCollection(ctx, collectionID.String())
+	start, limit, err = s.lr.GetBoundsForCollection(ctx, collectionID.String())
 	if err != nil {
 		return
 	}
 	// +1 to convert from the (] bound to a [) bound.
 	res = &logservicepb.ScoutLogsResponse{
+		FirstUncompactedRecordOffset: int64(start + 1),
 		FirstUninsertedRecordOffset: int64(limit + 1),
 	}
-	trace_log.Info("Scouted Logs", zap.Int64("limit", int64(limit + 1)), zap.String("collectionId", req.CollectionId))
+	trace_log.Info("Scouted Logs", zap.Int64("start", int64(start + 1)), zap.Int64("limit", int64(limit + 1)), zap.String("collectionId", req.CollectionId))
 	return
 }
 
diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto
index 6e6ceaa76b9..d270b5189b3 100644
--- a/idl/chromadb/proto/logservice.proto
+++ b/idl/chromadb/proto/logservice.proto
@@ -24,6 +24,8 @@ message ScoutLogsResponse {
   reserved 1;
   // The next record to insert will have this offset.
   int64 first_uninserted_record_offset = 2;
+  // The oldest record on the log will have this offset.
+  int64 first_uncompacted_record_offset = 3;
 }
 
 message PullLogsRequest {
diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs
index 724713ae851..cc254f2abea 100644
--- a/rust/log-service/src/lib.rs
+++ b/rust/log-service/src/lib.rs
@@ -10,14 +10,16 @@ use bytes::Bytes;
 use chroma_cache::CacheConfig;
 use chroma_config::Configurable;
 use chroma_error::ChromaError;
+use chroma_log::{config::GrpcLogConfig, grpc_log::GrpcLog};
 use chroma_storage::config::StorageConfig;
 use chroma_storage::Storage;
 use chroma_types::chroma_proto::{
-    log_service_server::LogService, CollectionInfo, GetAllCollectionInfoToCompactRequest,
-    GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse,
-    LogRecord, OperationRecord, PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest,
-    PurgeDirtyForCollectionResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest,
-    ScoutLogsResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
+    log_service_client::LogServiceClient, log_service_server::LogService, CollectionInfo,
+    GetAllCollectionInfoToCompactRequest, GetAllCollectionInfoToCompactResponse,
+    InspectDirtyLogRequest, InspectDirtyLogResponse, LogRecord, OperationRecord, PullLogsRequest,
+    PullLogsResponse, PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse,
+    PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse,
+    UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
 };
 use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
 use chroma_types::CollectionUuid;
@@ -201,7 +203,7 @@ async fn get_log_from_handle<'a>(
         });
     }
     tracing::info!("Opening log at {}", prefix);
-    let opened = LogWriter::open_or_initialize(
+    let opened = LogWriter::open(
        options.clone(),
        Arc::clone(storage),
        prefix,
@@ -630,11 +632,173 @@ pub struct LogServer {
     storage: Arc<Storage>,
     open_logs: Arc<StateHashTable<LogKey, LogStub>>,
     dirty_log: Arc<LogWriter>,
+    #[allow(clippy::type_complexity)]
+    proxy: Option<LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>>,
     compacting: tokio::sync::Mutex<()>,
     cache: Option<Box<dyn chroma_cache::PersistentCache<String, CachedParquetFragment>>>,
     metrics: Metrics,
 }
 
+impl LogServer {
+    async fn effectuate_log_transfer(
+        &self,
+        collection_id: CollectionUuid,
+        mut proxy: LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>,
+    ) -> Result<(), Status> {
+        let scout_request = Request::new(ScoutLogsRequest {
+            collection_id: collection_id.to_string(),
+        });
+        let scout_resp = proxy.clone().scout_logs(scout_request).await?.into_inner();
+        let start = scout_resp.first_uncompacted_record_offset as u64;
+        let limit = scout_resp.first_uninserted_record_offset as u64;
+        const STEP: u64 = 100;
+        let num_steps = (limit.saturating_sub(start) + STEP - 1) / STEP;
+        let actual_steps = (0..num_steps)
+            .map(|x| {
+                (
+                    start + x * STEP,
+                    std::cmp::min(start + x * STEP + STEP, limit),
+                )
+            })
+            .collect::<Vec<_>>();
+        let pull_logs_reqs = actual_steps
+            .iter()
+            .cloned()
+            .map(|(start, limit)| PullLogsRequest {
+                collection_id: collection_id.to_string(),
+                start_from_offset: start as i64 - 1,
+                // SAFETY(rescrv): STEP fits a i32.
+                batch_size: (limit - start) as i32,
+                end_timestamp: i64::MAX,
+            });
+        let mut responses = vec![];
+        for req in pull_logs_reqs {
+            let resp = proxy.pull_logs(Request::new(req)).await?.into_inner();
+            responses.push(resp);
+        }
+        let mut records = vec![];
+        for ((start, limit), resp) in
+            std::iter::zip(actual_steps.into_iter(), responses.into_iter())
+        {
+            for (expect, (idx, record)) in
+                std::iter::zip(start..limit, resp.records.into_iter().enumerate())
+            {
+                if expect != idx as u64 {
+                    return Err(Status::data_loss(format!(
+                        "expected log position {expect} but got {idx}"
+                    )));
+                }
+                if (record.log_offset as u64).wrapping_add(1) != expect {
+                    return Err(Status::data_loss(format!(
+                        "expected log position {expect} but got {}",
+                        (record.log_offset as u64).wrapping_add(1)
+                    )));
+                }
+                records.push(record);
+            }
+        }
+        let record_bytes = records
+            .into_iter()
+            .map(|record| -> Result<Vec<u8>, Status> {
+                let mut buf = vec![];
+                record
+                    .encode(&mut buf)
+                    .map_err(|err| Status::internal(err.to_string()))?;
+                Ok(buf)
+            })
+            .collect::<Result<Vec<_>, Status>>()?;
+        let prefix = storage_prefix_for_log(collection_id);
+        let mark_dirty = MarkDirty {
+            collection_id,
+            dirty_log: Arc::clone(&self.dirty_log),
+        };
+        LogWriter::bootstrap(
+            &self.config.writer,
+            &self.storage,
+            &prefix,
+            "effectuate log transfer",
+            mark_dirty,
+            LogPosition::from_offset(start),
+            record_bytes,
+        )
+        .await
+        .map_err(|err| {
+            Status::new(
+                err.code().into(),
+                format!("failed to effectuate log transfer: {err:?}"),
+            )
+        })
+    }
+
+    async fn forward_push_logs(
+        &self,
+        collection_id: CollectionUuid,
+        request: Request<PushLogsRequest>,
+    ) -> Result<Response<PushLogsResponse>, Status> {
+        let request = request.into_inner();
+        if let Some(proxy) = self.proxy.as_ref() {
+            let resp = proxy
+                .clone()
+                .push_logs(Request::new(request.clone()))
+                .await?
+                .into_inner();
+            if resp.log_is_sealed {
+                self.effectuate_log_transfer(collection_id, proxy.clone())
+                    .await?;
+                self.push_logs(Request::new(request)).await
+            } else {
+                Ok(Response::new(resp))
+            }
+        } else {
+            Err(Status::failed_precondition("proxy not initialized"))
+        }
+    }
+
+    async fn forward_scout_logs(
+        &self,
+        request: Request<ScoutLogsRequest>,
+    ) -> Result<Response<ScoutLogsResponse>, Status> {
+        if let Some(proxy) = self.proxy.as_ref() {
+            proxy.clone().scout_logs(request).await
+        } else {
+            Err(Status::failed_precondition("proxy not initialized"))
+        }
+    }
+
+    async fn forward_pull_logs(
+        &self,
+        request: Request<PullLogsRequest>,
+    ) -> Result<Response<PullLogsResponse>, Status> {
+        if let Some(proxy) = self.proxy.as_ref() {
+            proxy.clone().pull_logs(request).await
+        } else {
+            Err(Status::failed_precondition("proxy not initialized"))
+        }
+    }
+
+    async fn forward_update_collection_log_offset(
+        &self,
+        request: Request<UpdateCollectionLogOffsetRequest>,
+    ) -> Result<Response<UpdateCollectionLogOffsetResponse>, Status> {
+        if let Some(proxy) = self.proxy.as_ref() {
+            proxy.clone().update_collection_log_offset(request).await
+        } else {
+            Err(Status::failed_precondition("proxy not initialized"))
+        }
+    }
+
+    async fn forward_fork_logs(
+        &self,
+        request: Request<ForkLogsRequest>,
+    ) -> Result<Response<ForkLogsResponse>, Status> {
+        if let Some(proxy) = self.proxy.as_ref() {
+            proxy.clone().fork_logs(request).await
+        } else {
+            Err(Status::failed_precondition("proxy not initialized"))
+        }
+    }
+}
+
 #[async_trait::async_trait]
 impl LogService for LogServer {
     async fn push_logs(
@@ -662,7 +826,7 @@ impl LogService for LogServer {
             collection_id,
             dirty_log: Arc::clone(&self.dirty_log),
         };
-        let log = get_log_from_handle(
+        let log = match get_log_from_handle(
             &handle,
             &self.config.writer,
             &self.storage,
@@ -670,8 +834,17 @@ impl LogService for LogServer {
             mark_dirty,
         )
         .await
-        // TODO(rescrv): better error handling.
-        .map_err(|err| Status::unknown(err.to_string()))?;
+        {
+            Ok(log) => log,
+            Err(wal3::Error::UninitializedLog) => {
+                return self
+                    .forward_push_logs(collection_id, Request::new(push_logs))
+                    .await;
+            }
+            Err(err) => {
+                return Err(Status::unknown(err.to_string()));
+            }
+        };
         let mut messages = Vec::with_capacity(push_logs.records.len());
         for record in push_logs.records {
             let mut buf = vec![];
@@ -716,18 +889,25 @@ impl LogService for LogServer {
             Arc::clone(&self.storage),
             prefix,
         );
-        let limit_position = match log_reader.maximum_log_position().await {
-            Ok(limit_position) => limit_position,
+        let (start_position, limit_position) = match log_reader.manifest().await {
+            Ok(Some(manifest)) => (
+                manifest.minimum_log_position(),
+                manifest.maximum_log_position(),
+            ),
+            Ok(None) | Err(wal3::Error::UninitializedLog) => {
+                return self.forward_scout_logs(Request::new(scout_logs)).await;
+            }
             Err(err) => {
-                if err.code() == chroma_error::ErrorCodes::FailedPrecondition {
-                    LogPosition::from_offset(1)
-                } else {
-                    return Err(Status::new(err.code().into(), err.to_string()));
-                }
+                return Err(Status::new(
+                    err.code().into(),
+                    format!("could not scout logs: {err:?}"),
+                ));
             }
         };
+        let start_offset = start_position.offset() as i64;
         let limit_offset = limit_position.offset() as i64;
         Ok(Response::new(ScoutLogsResponse {
+            first_uncompacted_record_offset: start_offset,
             first_uninserted_record_offset: limit_offset,
         }))
     }
@@ -768,13 +948,11 @@ impl LogService for LogServer {
         .await
         {
             Ok(fragments) => fragments,
+            Err(wal3::Error::UninitializedLog) => {
+                return self.forward_pull_logs(Request::new(pull_logs)).await;
+            }
             Err(err) => {
-                if let wal3::Error::UninitializedLog = err {
-                    tracing::info!("Uninitialized log for collection {}", collection_id);
-                    return Ok(Response::new(PullLogsResponse { records: vec![] }));
-                } else {
-                    return Err(Status::new(err.code().into(), err.to_string()));
-                }
+                return Err(Status::new(err.code().into(), err.to_string()));
             }
         };
         let futures = fragments
@@ -854,6 +1032,19 @@ impl LogService for LogServer {
             Arc::clone(&storage),
             source_prefix.clone(),
         );
+        if let Err(err) = log_reader.maximum_log_position().await {
+            match err {
+                wal3::Error::UninitializedLog => {
+                    return self.forward_fork_logs(Request::new(request)).await;
+                }
+                _ => {
+                    return Err(Status::new(
+                        err.code().into(),
+                        format!("Failed to load log: {}", err),
+                    ));
+                }
+            }
+        }
         let cursors = CursorStore::new(
             CursorStoreOptions::default(),
             Arc::clone(&storage),
@@ -862,7 +1053,6 @@ impl LogService for LogServer {
         );
         let cursor_name = &COMPACTION;
         let witness = cursors.load(cursor_name).await.map_err(|err| {
-            tracing::info!("FINDME");
             Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
         })?;
         // This is the existing compaction_offset, which is the last record that was compacted.
@@ -879,7 +1069,6 @@ impl LogService for LogServer {
         )
         .await
         .map_err(|err| {
-            tracing::info!("FINDME");
             Status::new(err.code().into(), format!("Failed to copy log: {}", err))
         })?;
         let log_reader = LogReader::new(
@@ -889,11 +1078,9 @@ impl LogService for LogServer {
         );
         // This is the next record to insert, so we'll have to adjust downwards.
         let max_offset = log_reader.maximum_log_position().await.map_err(|err| {
-            tracing::info!("FINDME");
             Status::new(err.code().into(), format!("Failed to copy log: {}", err))
         })?;
         if max_offset < offset {
-            tracing::info!("FINDME");
             return Err(Status::new(
                 chroma_error::ErrorCodes::Internal.into(),
                 format!("max_offset={:?} < offset={:?}", max_offset, offset),
@@ -1074,6 +1261,20 @@ impl LogService for LogServer {
             request.log_offset
         );
         let storage_prefix = storage_prefix_for_log(collection_id);
+        let log_reader = LogReader::new(
+            self.config.reader.clone(),
+            Arc::clone(&self.storage),
+            storage_prefix.clone(),
+        );
+
+        let res = log_reader.maximum_log_position().await;
+        if let Err(wal3::Error::UninitializedLog) = res {
+            return self
+                .forward_update_collection_log_offset(Request::new(request))
+                .await;
+        }
+        res.map_err(|err| Status::unknown(err.to_string()))?;
+
         let cursor_name = &COMPACTION;
         let cursor_store = CursorStore::new(
             CursorStoreOptions::default(),
@@ -1416,6 +1617,8 @@ pub struct LogServerConfig {
     pub reinsert_threshold: u64,
     #[serde(default = "LogServerConfig::default_timeout_us")]
     pub timeout_us: u64,
+    #[serde(default)]
+    pub proxy_to: Option<GrpcLogConfig>,
 }
 
 impl LogServerConfig {
@@ -1447,6 +1650,7 @@ impl Default for LogServerConfig {
             record_count_threshold: Self::default_record_count_threshold(),
             reinsert_threshold: Self::default_reinsert_threshold(),
             timeout_us: Self::default_timeout_us(),
+            proxy_to: None,
         }
     }
 }
@@ -1484,6 +1688,16 @@ impl Configurable<LogServerConfig> for LogServer {
             .await
             .map_err(|err| -> Box<dyn ChromaError> { Box::new(err) as _ })?;
         let dirty_log = Arc::new(dirty_log);
+        let proxy = if let Some(proxy_to) = config.proxy_to.as_ref() {
+            match GrpcLog::primary_client_from_config(proxy_to).await {
+                Ok(log) => Some(log),
+                Err(err) => {
+                    return Err(err);
+                }
+            }
+        } else {
+            None
+        };
         let compacting = tokio::sync::Mutex::new(());
         let metrics = Metrics::new(opentelemetry::global::meter("chroma"));
         Ok(Self {
@@ -1491,6 +1705,7 @@ impl Configurable<LogServerConfig> for LogServer {
             open_logs: Arc::new(StateHashTable::default()),
             storage,
             dirty_log,
+            proxy,
             compacting,
             cache,
             metrics,
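Note on the chunked backfill in effectuate_log_transfer above: it splits the half-open range [start, limit) reported by the proxied ScoutLogs into windows of at most STEP records before issuing PullLogs calls. The sketch below is illustrative only and not part of the patch (the function chunk_bounds and the values in main are assumptions made for the example); it isolates that arithmetic, including the ceiling division that keeps a trailing partial window.

```rust
/// Illustrative sketch, not part of the patch: mirrors the chunking arithmetic
/// used by effectuate_log_transfer when it pulls records from the proxied log.
fn chunk_bounds(start: u64, limit: u64, step: u64) -> Vec<(u64, u64)> {
    // Ceiling division so a trailing partial window is still fetched.
    let num_steps = (limit.saturating_sub(start) + step - 1) / step;
    (0..num_steps)
        .map(|x| (start + x * step, std::cmp::min(start + x * step + step, limit)))
        .collect()
}

fn main() {
    // 250 records starting at offset 1, pulled 100 at a time.
    assert_eq!(
        chunk_bounds(1, 251, 100),
        vec![(1, 101), (101, 201), (201, 251)]
    );
}
```

Each resulting (start, limit) pair then becomes a PullLogsRequest with start_from_offset = start - 1 and batch_size = limit - start, matching the conversion done in the patch.
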
diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs
index c3b8a3e28bf..30c5298bfb8 100644
--- a/rust/log/src/grpc_log.rs
+++ b/rust/log/src/grpc_log.rs
@@ -172,6 +172,7 @@ impl Configurable<GrpcLogConfig> for GrpcLog {
         my_config: &GrpcLogConfig,
         _registry: &Registry,
     ) -> Result<Self, Box<dyn ChromaError>> {
+        // NOTE(rescrv): This code is duplicated with primary_client_from_config below. A transient hack.
         let host = &my_config.host;
         let port = &my_config.port;
         let max_encoding_message_size = my_config.max_encoding_message_size;
@@ -209,6 +210,40 @@ impl Configurable<GrpcLogConfig> for GrpcLog {
 }
 
 impl GrpcLog {
+    // NOTE(rescrv) This is a transient hack, so the code duplication is not worth eliminating.
+    pub async fn primary_client_from_config(
+        my_config: &GrpcLogConfig,
+    ) -> Result<
+        LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>,
+        Box<dyn ChromaError>,
+    > {
+        let host = &my_config.host;
+        let port = &my_config.port;
+        let max_encoding_message_size = my_config.max_encoding_message_size;
+        let max_decoding_message_size = my_config.max_decoding_message_size;
+        let connection_string = format!("http://{}:{}", host, port);
+        let client_for_conn_str =
+            |connection_string: String| -> Result<LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>, Box<dyn ChromaError>> {
+                tracing::info!("Connecting to log service at {}", connection_string);
+                let endpoint_res = match Endpoint::from_shared(connection_string) {
+                    Ok(endpoint) => endpoint,
+                    Err(e) => return Err(Box::new(GrpcLogError::FailedToConnect(e))),
+                };
+                let endpoint_res = endpoint_res
+                    .connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
+                    .timeout(Duration::from_millis(my_config.request_timeout_ms));
+                let channel = endpoint_res.connect_lazy();
+                let channel = ServiceBuilder::new()
+                    .layer(chroma_tracing::GrpcTraceLayer)
+                    .service(channel);
+                let client = LogServiceClient::new(channel)
+                    .max_encoding_message_size(max_encoding_message_size)
+                    .max_decoding_message_size(max_decoding_message_size);
+                Ok(client)
+            };
+        client_for_conn_str(connection_string)
+    }
+
     fn client_for(
         &mut self,
         tenant: &str,
diff --git a/rust/wal3/src/manifest.rs b/rust/wal3/src/manifest.rs
index 318cbcfbc0b..bf8dd0b6290 100644
--- a/rust/wal3/src/manifest.rs
+++ b/rust/wal3/src/manifest.rs
@@ -276,7 +276,7 @@ impl Snapshot {
             },
             (Some(f), None) => f,
             (None, Some(s)) => s,
-            (None, None) => LogPosition::default(),
+            (None, None) => LogPosition::from_offset(1),
         }
     }
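A closing note on the offset semantics the Go change and the new proto field rely on: GetBoundsForCollection returns a (] interval, and the +1 in ScoutLogs shifts both ends so the response carries [) bounds, where first_uncompacted_record_offset is the oldest record still on the log and first_uninserted_record_offset is the offset the next pushed record will take. The sketch below is illustrative only and not from the patch (the type ScoutedBounds, the method backlog, and the sample offsets are assumptions for the example); it shows how a caller can derive the backlog size from the two fields, which is the same quantity effectuate_log_transfer computes as limit.saturating_sub(start) before chunking.

```rust
/// Illustrative sketch, not part of the patch: models the [) semantics of
/// ScoutLogsResponse after the +1 conversion performed by the Go server.
struct ScoutedBounds {
    /// Oldest record still on the log (first_uncompacted_record_offset).
    first_uncompacted: i64,
    /// Offset the next pushed record will receive (first_uninserted_record_offset).
    first_uninserted: i64,
}

impl ScoutedBounds {
    /// Records that are durable on the log but not yet compacted away.
    fn backlog(&self) -> i64 {
        self.first_uninserted - self.first_uncompacted
    }
}

fn main() {
    // Offsets 42 through 51 are still on the log; 52 is the next insert.
    let bounds = ScoutedBounds { first_uncompacted: 42, first_uninserted: 52 };
    assert_eq!(bounds.backlog(), 10);
}
```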