Skip to content

Move monitor_items::Db::get_items into the db_index actor #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/db_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

use crate::ColumnName;
use crate::Embeddings;
use crate::IndexMetadata;
use crate::Key;
use crate::TableName;
Expand All @@ -25,12 +26,23 @@ pub(crate) enum DbIndex {
GetProcessedIds {
tx: oneshot::Sender<anyhow::Result<BoxStream<'static, Result<Key, NextRowError>>>>,
},

/// Request for the rows selected by `Statements::get_items_query` (rows whose
/// `processed` column is `FALSE`). The actor replies on `tx` with the result
/// of `Statements::get_items`: a boxed stream of `(Key, Embeddings)` pairs,
/// or the error that prevented creating that stream.
GetItems {
// The reply type nests a channel, a result and a stream; the clippy
// complexity lint is silenced for this one field instead of adding an alias.
#[allow(clippy::type_complexity)]
tx: oneshot::Sender<
anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>>,
>,
},
}

/// Request helpers for a handle to the `DbIndex` actor.
///
/// Implemented in this file for `mpsc::Sender<DbIndex>`: each method sends the
/// matching `DbIndex` message with a oneshot reply channel and awaits the reply.
pub(crate) trait DbIndexExt {
/// Sends `DbIndex::GetProcessedIds` and returns the stream of keys the actor
/// replies with.
// NOTE(review): presumably these are keys of rows already marked as processed —
// confirm against `get_processed_ids_query`.
async fn get_processed_ids(
&self,
) -> anyhow::Result<BoxStream<'static, Result<Key, NextRowError>>>;

/// Sends `DbIndex::GetItems` and returns the stream of `(Key, Embeddings)`
/// pairs the actor replies with.
///
/// Individual stream items can fail with `NextRowError`; the outer result
/// carries any failure that happened before a stream was obtained.
async fn get_items(
&self,
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>>;
}

impl DbIndexExt for mpsc::Sender<DbIndex> {
Expand All @@ -41,6 +53,14 @@ impl DbIndexExt for mpsc::Sender<DbIndex> {
self.send(DbIndex::GetProcessedIds { tx }).await?;
rx.await?
}

/// Sends a `DbIndex::GetItems` request to the actor and waits for its reply.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the reply channel is closed
/// before an answer arrives, or when the actor itself answers with an error.
async fn get_items(
    &self,
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>> {
    // One-shot channel on which the actor delivers its answer.
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = DbIndex::GetItems { tx: reply_tx };
    self.send(request).await?;
    // `?` handles a dropped reply channel; what remains is the actor's own result.
    reply_rx.await?
}
}

pub(crate) async fn new(
Expand All @@ -67,12 +87,17 @@ async fn process(statements: Arc<Statements>, msg: DbIndex) {
.unwrap_or_else(|_| {
warn!("db_index::process: Db::GetProcessedIds: unable to send response")
}),

DbIndex::GetItems { tx } => tx
.send(statements.get_items().await)
.unwrap_or_else(|_| warn!("db_index::process: Db::GetItems: unable to send response")),
}
}

/// Database session bundled with the statements prepared on it for one index.
struct Statements {
/// Session on which the prepared statements are executed.
session: Arc<Session>,
/// Prepared statement for the `get_processed_ids` query.
st_get_processed_ids: PreparedStatement,
/// Prepared form of `get_items_query`; executed by `get_items`.
st_get_items: PreparedStatement,
}

impl Statements {
Expand All @@ -86,6 +111,15 @@ impl Statements {
.await
.context("get_processed_ids_query")?,

st_get_items: session
.prepare(Self::get_items_query(
&metadata.table_name,
&metadata.key_name,
&metadata.target_name,
))
.await
.context("get_items_query")?,

session,
})
}
Expand All @@ -112,4 +146,26 @@ impl Statements {
.map_ok(|(key,)| (key as u64).into())
.boxed())
}

/// Builds the statement text that selects the `col_id` and `col_emb` columns
/// of every row in `table` whose `processed` column is `FALSE`.
///
/// The table and column names are interpolated directly into the text; the
/// resulting statement has no bind markers.
// NOTE(review): unless `TableName` formats as `keyspace.table`, the table name
// here is not keyspace-qualified — confirm the session has the intended
// keyspace selected.
fn get_items_query(table: &TableName, col_id: &ColumnName, col_emb: &ColumnName) -> String {
format!(
"
SELECT {col_id}, {col_emb}
FROM {table}
WHERE processed = FALSE
"
)
}

/// Executes the prepared `st_get_items` statement and exposes its rows as a
/// boxed stream of `(Key, Embeddings)` pairs.
///
/// Each row is read as `(i64, Vec<f32>)`; the key is cast to `u64` before
/// being converted into a `Key`, and the vector is converted into `Embeddings`.
///
/// # Errors
///
/// Fails when the statement cannot be executed or when the result rows cannot
/// be read as `(i64, Vec<f32>)`. Per-row failures surface later as
/// `NextRowError` items of the stream.
async fn get_items(
    &self,
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>> {
    let pager = self
        .session
        .execute_iter(self.st_get_items.clone(), ())
        .await?;
    let typed_rows = pager.rows_stream::<(i64, Vec<f32>)>()?;
    let items = typed_rows.map_ok(|(id, vector)| ((id as u64).into(), vector.into()));
    Ok(items.boxed())
}
}
42 changes: 7 additions & 35 deletions src/monitor_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/

use crate::Embeddings;
use crate::IndexMetadata;
use crate::Key;
use crate::db_index::DbIndex;
use crate::db_index::DbIndexExt;
use crate::index::Index;
use crate::index::IndexExt;
use anyhow::Context;
use futures::Stream;
use futures::TryStreamExt;
use scylla::client::session::Session;
use scylla::errors::NextRowError;
use scylla::statement::prepared::PreparedStatement;
use std::mem;
use std::sync::Arc;
Expand Down Expand Up @@ -68,7 +65,7 @@ pub(crate) async fn new(
}
}
State::Copy => {
if table_to_index(&db, &index)
if table_to_index(&db, &db_index, &index)
.await
.unwrap_or_else(|err| {
warn!("monitor_items: unable to copy data from table to index: {err}");
Expand All @@ -95,18 +92,13 @@ enum State {

/// Database session bundled with the statements `monitor_items` prepares on it.
struct Db {
/// Session on which the prepared statements are executed.
session: Arc<Session>,
/// Prepared form of `get_items_query`; executed by `get_items`.
st_get_items: PreparedStatement,
/// Prepared form of `reset_items_query`.
st_reset_items: PreparedStatement,
// NOTE(review): presumably the prepared statement that marks rows as
// processed — its query is not visible here; confirm.
st_update_items: PreparedStatement,
}

impl Db {
async fn new(session: Arc<Session>, metadata: IndexMetadata) -> anyhow::Result<Self> {
Ok(Self {
st_get_items: session
.prepare(Self::get_items_query(&metadata))
.await
.context("get_items_query")?,
st_reset_items: session
.prepare(Self::reset_items_query(&metadata))
.await
Expand All @@ -119,30 +111,6 @@ impl Db {
})
}

/// Builds the statement text that selects the key column and the target column
/// of every row in `keyspace.table` (all names taken from `metadata`) whose
/// `processed` column is `FALSE`.
fn get_items_query(metadata: &IndexMetadata) -> String {
format!(
"
SELECT {}, {}
FROM {}.{}
WHERE processed = FALSE
",
metadata.key_name.0,
metadata.target_name.0,
metadata.keyspace_name.0,
metadata.table_name.0
)
}
/// Executes the prepared `st_get_items` statement and returns its rows as a
/// stream of `(Key, Embeddings)` pairs.
///
/// Each row is read as `(i64, Vec<f32>)`; the key is cast to `u64` before
/// being converted into a `Key`, and the vector is converted into `Embeddings`.
async fn get_items(
&self,
) -> anyhow::Result<impl Stream<Item = Result<(Key, Embeddings), NextRowError>>> {
Ok(self
.session
.execute_iter(self.st_get_items.clone(), ())
.await?
.rows_stream::<(i64, Vec<f32>)>()?
.map_ok(|(key, embeddings)| ((key as u64).into(), embeddings.into())))
}

fn reset_items_query(metadata: &IndexMetadata) -> String {
format!(
"
Expand Down Expand Up @@ -199,8 +167,12 @@ async fn reset_items(db: &Arc<Db>, db_index: &Sender<DbIndex>) -> anyhow::Result
}

/// Gets new embeddings from the db and adds them to the index, then marks those embeddings in the db as processed.
async fn table_to_index(db: &Arc<Db>, index: &Sender<Index>) -> anyhow::Result<bool> {
let mut rows = db.get_items().await?;
async fn table_to_index(
db: &Arc<Db>,
db_index: &Sender<DbIndex>,
index: &Sender<Index>,
) -> anyhow::Result<bool> {
let mut rows = db_index.get_items().await?;

// The value was taken from initial benchmarks
const PROCESSED_CHUNK_SIZE: usize = 100;
Expand Down