Skip to content

Commit 613c61f

Browse files
committed
[ENH] Add feature to the rust log service to proxy the go log service.
Following the rollout ADR, this will encode the table to go classic -> transitioning -> transitioned:

|            | **inactive**  | **active**   |
|------------|---------------|--------------|
| **open**   | classic       | BUG          |
| **sealed** | transitioning | transitioned |
1 parent 573b791 commit 613c61f

File tree

2 files changed

+208
-16
lines changed

2 files changed

+208
-16
lines changed

rust/log-service/src/lib.rs

+173-16
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ use bytes::Bytes;
1010
use chroma_cache::CacheConfig;
1111
use chroma_config::Configurable;
1212
use chroma_error::ChromaError;
13+
use chroma_log::{config::GrpcLogConfig, grpc_log::GrpcLog};
1314
use chroma_storage::config::StorageConfig;
1415
use chroma_storage::Storage;
1516
use chroma_types::chroma_proto::{
16-
log_service_server::LogService, CollectionInfo, GetAllCollectionInfoToCompactRequest,
17-
GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse,
18-
LogRecord, OperationRecord, PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest,
19-
PurgeDirtyForCollectionResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest,
20-
ScoutLogsResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
17+
log_service_client::LogServiceClient, log_service_server::LogService, CollectionInfo,
18+
GetAllCollectionInfoToCompactRequest, GetAllCollectionInfoToCompactResponse,
19+
InspectDirtyLogRequest, InspectDirtyLogResponse, LogRecord, OperationRecord, PullLogsRequest,
20+
PullLogsResponse, PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse,
21+
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse,
22+
UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
2123
};
2224
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
2325
use chroma_types::CollectionUuid;
@@ -201,7 +203,7 @@ async fn get_log_from_handle<'a>(
201203
});
202204
}
203205
tracing::info!("Opening log at {}", prefix);
204-
let opened = LogWriter::open_or_initialize(
206+
let opened = LogWriter::open(
205207
options.clone(),
206208
Arc::clone(storage),
207209
prefix,
@@ -630,11 +632,70 @@ pub struct LogServer {
630632
storage: Arc<Storage>,
631633
open_logs: Arc<StateHashTable<LogKey, LogStub>>,
632634
dirty_log: Arc<LogWriter>,
635+
#[allow(clippy::type_complexity)]
636+
proxy: Option<LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>>,
633637
compacting: tokio::sync::Mutex<()>,
634638
cache: Option<Box<dyn chroma_cache::PersistentCache<String, CachedParquetFragment>>>,
635639
metrics: Metrics,
636640
}
637641

642+
impl LogServer {
643+
fn should_initialize_log(&self, collection: CollectionUuid) -> bool {
644+
todo!();
645+
}
646+
647+
async fn forward_push_logs(
648+
&self,
649+
request: Request<PushLogsRequest>,
650+
) -> Result<Response<PushLogsResponse>, Status> {
651+
todo!();
652+
}
653+
654+
async fn forward_scout_logs(
655+
&self,
656+
request: Request<ScoutLogsRequest>,
657+
) -> Result<Response<ScoutLogsResponse>, Status> {
658+
if let Some(proxy) = self.proxy.as_ref() {
659+
proxy.clone().scout_logs(request).await
660+
} else {
661+
Err(Status::failed_precondition("proxy not initialized"))
662+
}
663+
}
664+
665+
async fn forward_pull_logs(
666+
&self,
667+
request: Request<PullLogsRequest>,
668+
) -> Result<Response<PullLogsResponse>, Status> {
669+
if let Some(proxy) = self.proxy.as_ref() {
670+
proxy.clone().pull_logs(request).await
671+
} else {
672+
Err(Status::failed_precondition("proxy not initialized"))
673+
}
674+
}
675+
676+
async fn forward_update_collection_log_offset(
677+
&self,
678+
request: Request<UpdateCollectionLogOffsetRequest>,
679+
) -> Result<Response<UpdateCollectionLogOffsetResponse>, Status> {
680+
if let Some(proxy) = self.proxy.as_ref() {
681+
proxy.clone().update_collection_log_offset(request).await
682+
} else {
683+
Err(Status::failed_precondition("proxy not initialized"))
684+
}
685+
}
686+
687+
async fn forward_fork_logs(
688+
&self,
689+
request: Request<ForkLogsRequest>,
690+
) -> Result<Response<ForkLogsResponse>, Status> {
691+
if let Some(proxy) = self.proxy.as_ref() {
692+
proxy.clone().fork_logs(request).await
693+
} else {
694+
Err(Status::failed_precondition("proxy not initialized"))
695+
}
696+
}
697+
}
698+
638699
#[async_trait::async_trait]
639700
impl LogService for LogServer {
640701
async fn push_logs(
@@ -662,16 +723,36 @@ impl LogService for LogServer {
662723
collection_id,
663724
dirty_log: Arc::clone(&self.dirty_log),
664725
};
665-
let log = get_log_from_handle(
726+
let log = match get_log_from_handle(
666727
&handle,
667728
&self.config.writer,
668729
&self.storage,
669730
&prefix,
670731
mark_dirty,
671732
)
672733
.await
673-
// TODO(rescrv): better error handling.
674-
.map_err(|err| Status::unknown(err.to_string()))?;
734+
{
735+
Ok(log) => log,
736+
Err(wal3::Error::UninitializedLog) => {
737+
if self.should_initialize_log(collection_id) {
738+
if let Err(err) = LogWriter::initialize(
739+
&self.config.writer,
740+
&self.storage,
741+
&prefix,
742+
"push_logs initializer",
743+
)
744+
.await
745+
{
746+
return Err(Status::unknown(err.to_string()));
747+
}
748+
return Box::pin(self.push_logs(Request::new(push_logs))).await;
749+
}
750+
return self.forward_push_logs(Request::new(push_logs)).await;
751+
}
752+
Err(err) => {
753+
return Err(Status::unknown(err.to_string()));
754+
}
755+
};
675756
let mut messages = Vec::with_capacity(push_logs.records.len());
676757
for record in push_logs.records {
677758
let mut buf = vec![];
@@ -718,6 +799,17 @@ impl LogService for LogServer {
718799
);
719800
let limit_position = match log_reader.maximum_log_position().await {
720801
Ok(limit_position) => limit_position,
802+
Err(wal3::Error::UninitializedLog) => {
803+
// NOTE(rescrv): In this case, what we have is a guarantee that we know the
804+
// log is ours. This only comes from the by-tenant or by-collection alt-log
805+
// shunts. The log is uninitialized, so we know it's not because
806+
// has_manifest().
807+
if self.should_initialize_log(collection_id) {
808+
LogPosition::from_offset(1)
809+
} else {
810+
return self.forward_scout_logs(Request::new(scout_logs)).await;
811+
}
812+
}
721813
Err(err) => {
722814
if err.code() == chroma_error::ErrorCodes::FailedPrecondition {
723815
LogPosition::from_offset(1)
@@ -768,14 +860,18 @@ impl LogService for LogServer {
768860
.await
769861
{
770862
Ok(fragments) => fragments,
771-
Err(err) => {
772-
if let wal3::Error::UninitializedLog = err {
863+
Err(wal3::Error::UninitializedLog) => {
864+
// NOTE(rescrv): Same as with ScoutLogs.
865+
if self.should_initialize_log(collection_id) {
773866
tracing::info!("Uninitialized log for collection {}", collection_id);
774867
return Ok(Response::new(PullLogsResponse { records: vec![] }));
775868
} else {
776-
return Err(Status::new(err.code().into(), err.to_string()));
869+
return self.forward_pull_logs(Request::new(pull_logs)).await;
777870
}
778871
}
872+
Err(err) => {
873+
return Err(Status::new(err.code().into(), err.to_string()));
874+
}
779875
};
780876
let futures = fragments
781877
.iter()
@@ -854,6 +950,37 @@ impl LogService for LogServer {
854950
Arc::clone(&storage),
855951
source_prefix.clone(),
856952
);
953+
if let Err(err) = log_reader.maximum_log_position().await {
954+
match err {
955+
wal3::Error::UninitializedLog => {
956+
// NOTE(rescrv): Same as with ScoutLogs.
957+
if self.should_initialize_log(source_collection_id) {
958+
LogWriter::initialize(
959+
&self.config.writer,
960+
&storage,
961+
&source_prefix,
962+
"fork logs initializer",
963+
)
964+
.await
965+
.map_err(|err| {
966+
Status::new(
967+
err.code().into(),
968+
format!("Failed to initialize log for fork: {err:?}"),
969+
)
970+
})?;
971+
return Box::pin(self.fork_logs(Request::new(request))).await;
972+
} else {
973+
return self.forward_fork_logs(Request::new(request)).await;
974+
}
975+
}
976+
_ => {
977+
return Err(Status::new(
978+
err.code().into(),
979+
format!("Failed to load log: {}", err),
980+
));
981+
}
982+
}
983+
}
857984
let cursors = CursorStore::new(
858985
CursorStoreOptions::default(),
859986
Arc::clone(&storage),
@@ -862,7 +989,6 @@ impl LogService for LogServer {
862989
);
863990
let cursor_name = &COMPACTION;
864991
let witness = cursors.load(cursor_name).await.map_err(|err| {
865-
tracing::info!("FINDME");
866992
Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
867993
})?;
868994
// This is the existing compaction_offset, which is the last record that was compacted.
@@ -879,7 +1005,6 @@ impl LogService for LogServer {
8791005
)
8801006
.await
8811007
.map_err(|err| {
882-
tracing::info!("FINDME");
8831008
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
8841009
})?;
8851010
let log_reader = LogReader::new(
@@ -889,11 +1014,9 @@ impl LogService for LogServer {
8891014
);
8901015
// This is the next record to insert, so we'll have to adjust downwards.
8911016
let max_offset = log_reader.maximum_log_position().await.map_err(|err| {
892-
tracing::info!("FINDME");
8931017
Status::new(err.code().into(), format!("Failed to copy log: {}", err))
8941018
})?;
8951019
if max_offset < offset {
896-
tracing::info!("FINDME");
8971020
return Err(Status::new(
8981021
chroma_error::ErrorCodes::Internal.into(),
8991022
format!("max_offset={:?} < offset={:?}", max_offset, offset),
@@ -1074,6 +1197,26 @@ impl LogService for LogServer {
10741197
request.log_offset
10751198
);
10761199
let storage_prefix = storage_prefix_for_log(collection_id);
1200+
let log_reader = LogReader::new(
1201+
self.config.reader.clone(),
1202+
Arc::clone(&self.storage),
1203+
storage_prefix.clone(),
1204+
);
1205+
1206+
let res = log_reader.maximum_log_position().await;
1207+
if let Err(wal3::Error::UninitializedLog) = res {
1208+
if self.should_initialize_log(collection_id) {
1209+
return Err(Status::failed_precondition(
1210+
"uninitialized log has its cursor updated",
1211+
));
1212+
} else {
1213+
return self
1214+
.forward_update_collection_log_offset(Request::new(request))
1215+
.await;
1216+
}
1217+
}
1218+
res.map_err(|err| Status::unknown(err.to_string()))?;
1219+
10771220
let cursor_name = &COMPACTION;
10781221
let cursor_store = CursorStore::new(
10791222
CursorStoreOptions::default(),
@@ -1416,6 +1559,8 @@ pub struct LogServerConfig {
14161559
pub reinsert_threshold: u64,
14171560
#[serde(default = "LogServerConfig::default_timeout_us")]
14181561
pub timeout_us: u64,
1562+
#[serde(default)]
1563+
pub proxy_to: Option<GrpcLogConfig>,
14191564
}
14201565

14211566
impl LogServerConfig {
@@ -1447,6 +1592,7 @@ impl Default for LogServerConfig {
14471592
record_count_threshold: Self::default_record_count_threshold(),
14481593
reinsert_threshold: Self::default_reinsert_threshold(),
14491594
timeout_us: Self::default_timeout_us(),
1595+
proxy_to: None,
14501596
}
14511597
}
14521598
}
@@ -1484,13 +1630,24 @@ impl Configurable<LogServerConfig> for LogServer {
14841630
.await
14851631
.map_err(|err| -> Box<dyn ChromaError> { Box::new(err) as _ })?;
14861632
let dirty_log = Arc::new(dirty_log);
1633+
let proxy = if let Some(proxy_to) = config.proxy_to.as_ref() {
1634+
match GrpcLog::primary_client_from_config(proxy_to).await {
1635+
Ok(log) => Some(log),
1636+
Err(err) => {
1637+
return Err(err);
1638+
}
1639+
}
1640+
} else {
1641+
None
1642+
};
14871643
let compacting = tokio::sync::Mutex::new(());
14881644
let metrics = Metrics::new(opentelemetry::global::meter("chroma"));
14891645
Ok(Self {
14901646
config: config.clone(),
14911647
open_logs: Arc::new(StateHashTable::default()),
14921648
storage,
14931649
dirty_log,
1650+
proxy,
14941651
compacting,
14951652
cache,
14961653
metrics,

rust/log/src/grpc_log.rs

+35
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ impl Configurable<GrpcLogConfig> for GrpcLog {
172172
my_config: &GrpcLogConfig,
173173
_registry: &Registry,
174174
) -> Result<Self, Box<dyn ChromaError>> {
175+
// NOTE(rescrv): This code is duplicated with primary_client_from_config below. A transient hack.
175176
let host = &my_config.host;
176177
let port = &my_config.port;
177178
let max_encoding_message_size = my_config.max_encoding_message_size;
@@ -209,6 +210,40 @@ impl Configurable<GrpcLogConfig> for GrpcLog {
209210
}
210211

211212
impl GrpcLog {
213+
// NOTE(rescrv) This is a transient hack, so the code duplication is not worth eliminating.
214+
pub async fn primary_client_from_config(
215+
my_config: &GrpcLogConfig,
216+
) -> Result<
217+
LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>,
218+
Box<dyn ChromaError>,
219+
> {
220+
let host = &my_config.host;
221+
let port = &my_config.port;
222+
let max_encoding_message_size = my_config.max_encoding_message_size;
223+
let max_decoding_message_size = my_config.max_decoding_message_size;
224+
let connection_string = format!("http://{}:{}", host, port);
225+
let client_for_conn_str =
226+
|connection_string: String| -> Result<LogServiceClient<_>, Box<dyn ChromaError>> {
227+
tracing::info!("Connecting to log service at {}", connection_string);
228+
let endpoint_res = match Endpoint::from_shared(connection_string) {
229+
Ok(endpoint) => endpoint,
230+
Err(e) => return Err(Box::new(GrpcLogError::FailedToConnect(e))),
231+
};
232+
let endpoint_res = endpoint_res
233+
.connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
234+
.timeout(Duration::from_millis(my_config.request_timeout_ms));
235+
let channel = endpoint_res.connect_lazy();
236+
let channel = ServiceBuilder::new()
237+
.layer(chroma_tracing::GrpcTraceLayer)
238+
.service(channel);
239+
let client = LogServiceClient::new(channel)
240+
.max_encoding_message_size(max_encoding_message_size)
241+
.max_decoding_message_size(max_decoding_message_size);
242+
Ok(client)
243+
};
244+
client_for_conn_str(connection_string)
245+
}
246+
212247
fn client_for(
213248
&mut self,
214249
tenant: &str,

0 commit comments

Comments (0)