diff --git a/src/db_index.rs b/src/db_index.rs index da9211e..4c0cbd9 100644 --- a/src/db_index.rs +++ b/src/db_index.rs @@ -4,6 +4,7 @@ */ use crate::ColumnName; +use crate::Embeddings; use crate::IndexMetadata; use crate::Key; use crate::TableName; @@ -25,12 +26,23 @@ pub(crate) enum DbIndex { GetProcessedIds { tx: oneshot::Sender>>>, }, + + GetItems { + #[allow(clippy::type_complexity)] + tx: oneshot::Sender< + anyhow::Result>>, + >, + }, } pub(crate) trait DbIndexExt { async fn get_processed_ids( &self, ) -> anyhow::Result>>; + + async fn get_items( + &self, + ) -> anyhow::Result>>; } impl DbIndexExt for mpsc::Sender { @@ -41,6 +53,14 @@ impl DbIndexExt for mpsc::Sender { self.send(DbIndex::GetProcessedIds { tx }).await?; rx.await? } + + async fn get_items( + &self, + ) -> anyhow::Result>> { + let (tx, rx) = oneshot::channel(); + self.send(DbIndex::GetItems { tx }).await?; + rx.await? + } } pub(crate) async fn new( @@ -67,12 +87,17 @@ async fn process(statements: Arc, msg: DbIndex) { .unwrap_or_else(|_| { warn!("db_index::process: Db::GetProcessedIds: unable to send response") }), + + DbIndex::GetItems { tx } => tx + .send(statements.get_items().await) + .unwrap_or_else(|_| warn!("db_index::process: Db::GetItems: unable to send response")), } } struct Statements { session: Arc, st_get_processed_ids: PreparedStatement, + st_get_items: PreparedStatement, } impl Statements { @@ -86,6 +111,15 @@ impl Statements { .await .context("get_processed_ids_query")?, + st_get_items: session + .prepare(Self::get_items_query( + &metadata.table_name, + &metadata.key_name, + &metadata.target_name, + )) + .await + .context("get_items_query")?, + session, }) } @@ -112,4 +146,26 @@ impl Statements { .map_ok(|(key,)| (key as u64).into()) .boxed()) } + + fn get_items_query(table: &TableName, col_id: &ColumnName, col_emb: &ColumnName) -> String { + format!( + " + SELECT {col_id}, {col_emb} + FROM {table} + WHERE processed = FALSE + " + ) + } + + async fn get_items( + &self, + ) -> anyhow::Result>> { + Ok(self + .session + .execute_iter(self.st_get_items.clone(), ()) + .await? + .rows_stream::<(i64, Vec)>()? + .map_ok(|(key, embeddings)| ((key as u64).into(), embeddings.into())) + .boxed()) + } } diff --git a/src/monitor_items.rs b/src/monitor_items.rs index 1c062db..4485b21 100644 --- a/src/monitor_items.rs +++ b/src/monitor_items.rs @@ -3,7 +3,6 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ -use crate::Embeddings; use crate::IndexMetadata; use crate::Key; use crate::db_index::DbIndex; @@ -11,10 +10,8 @@ use crate::db_index::DbIndexExt; use crate::index::Index; use crate::index::IndexExt; use anyhow::Context; -use futures::Stream; use futures::TryStreamExt; use scylla::client::session::Session; -use scylla::errors::NextRowError; use scylla::statement::prepared::PreparedStatement; use std::mem; use std::sync::Arc; @@ -68,7 +65,7 @@ pub(crate) async fn new( } } State::Copy => { - if table_to_index(&db, &index) + if table_to_index(&db, &db_index, &index) .await .unwrap_or_else(|err| { warn!("monitor_items: unable to copy data from table to index: {err}"); @@ -95,7 +92,6 @@ enum State { struct Db { session: Arc, - st_get_items: PreparedStatement, st_reset_items: PreparedStatement, st_update_items: PreparedStatement, } @@ -103,10 +99,6 @@ struct Db { impl Db { async fn new(session: Arc, metadata: IndexMetadata) -> anyhow::Result { Ok(Self { - st_get_items: session - .prepare(Self::get_items_query(&metadata)) - .await - .context("get_items_query")?, st_reset_items: session .prepare(Self::reset_items_query(&metadata)) .await @@ -119,30 +111,6 @@ impl Db { }) } - fn get_items_query(metadata: &IndexMetadata) -> String { - format!( - " - SELECT {}, {} - FROM {}.{} - WHERE processed = FALSE - ", - metadata.key_name.0, - metadata.target_name.0, - metadata.keyspace_name.0, - metadata.table_name.0 - ) - } - async fn get_items( - &self, - ) -> anyhow::Result>> { - Ok(self - .session - .execute_iter(self.st_get_items.clone(), ()) - .await? - .rows_stream::<(i64, Vec)>()? - .map_ok(|(key, embeddings)| ((key as u64).into(), embeddings.into()))) - } - fn reset_items_query(metadata: &IndexMetadata) -> String { format!( " @@ -199,8 +167,12 @@ async fn reset_items(db: &Arc, db_index: &Sender) -> anyhow::Result } /// Get new embeddings from db and add to the index. Then mark embeddings in db as processed -async fn table_to_index(db: &Arc, index: &Sender) -> anyhow::Result { - let mut rows = db.get_items().await?; +async fn table_to_index( + db: &Arc, + db_index: &Sender, + index: &Sender, +) -> anyhow::Result { + let mut rows = db_index.get_items().await?; // The value was taken from initial benchmarks const PROCESSED_CHUNK_SIZE: usize = 100;