Skip to content

Commit b5f1c70

Browse files
committed
Move monitor_items::Db::get_items into the db index actor
This is part of #3. The change moves the get_items method from monitor_items into the db index actor. Subsequent patches will move the other methods into the db index actor.
1 parent 1561282 commit b5f1c70

File tree

2 files changed

+63
-35
lines changed

2 files changed

+63
-35
lines changed

src/db_index.rs

+56
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55

66
use crate::ColumnName;
7+
use crate::Embeddings;
78
use crate::IndexMetadata;
89
use crate::Key;
910
use crate::TableName;
@@ -25,12 +26,23 @@ pub(crate) enum DbIndex {
2526
GetProcessedIds {
2627
tx: oneshot::Sender<anyhow::Result<BoxStream<'static, Result<Key, NextRowError>>>>,
2728
},
29+
30+
GetItems {
31+
#[allow(clippy::type_complexity)]
32+
tx: oneshot::Sender<
33+
anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>>,
34+
>,
35+
},
2836
}
2937

3038
pub(crate) trait DbIndexExt {
3139
async fn get_processed_ids(
3240
&self,
3341
) -> anyhow::Result<BoxStream<'static, Result<Key, NextRowError>>>;
42+
43+
async fn get_items(
44+
&self,
45+
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>>;
3446
}
3547

3648
impl DbIndexExt for mpsc::Sender<DbIndex> {
@@ -41,6 +53,14 @@ impl DbIndexExt for mpsc::Sender<DbIndex> {
4153
self.send(DbIndex::GetProcessedIds { tx }).await?;
4254
rx.await?
4355
}
56+
57+
async fn get_items(
58+
&self,
59+
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>> {
60+
let (tx, rx) = oneshot::channel();
61+
self.send(DbIndex::GetItems { tx }).await?;
62+
rx.await?
63+
}
4464
}
4565

4666
pub(crate) async fn new(
@@ -67,12 +87,17 @@ async fn process(statements: Arc<Statements>, msg: DbIndex) {
6787
.unwrap_or_else(|_| {
6888
warn!("db_index::process: Db::GetProcessedIds: unable to send response")
6989
}),
90+
91+
DbIndex::GetItems { tx } => tx
92+
.send(statements.get_items().await)
93+
.unwrap_or_else(|_| warn!("db_index::process: Db::GetItems: unable to send response")),
7094
}
7195
}
7296

7397
struct Statements {
7498
session: Arc<Session>,
7599
st_get_processed_ids: PreparedStatement,
100+
st_get_items: PreparedStatement,
76101
}
77102

78103
impl Statements {
@@ -86,6 +111,15 @@ impl Statements {
86111
.await
87112
.context("get_processed_ids_query")?,
88113

114+
st_get_items: session
115+
.prepare(Self::get_items_query(
116+
&metadata.table_name,
117+
&metadata.key_name,
118+
&metadata.target_name,
119+
))
120+
.await
121+
.context("get_items_query")?,
122+
89123
session,
90124
})
91125
}
@@ -112,4 +146,26 @@ impl Statements {
112146
.map_ok(|(key,)| (key as u64).into())
113147
.boxed())
114148
}
149+
150+
fn get_items_query(table: &TableName, col_id: &ColumnName, col_emb: &ColumnName) -> String {
151+
format!(
152+
"
153+
SELECT {col_id}, {col_emb}
154+
FROM {table}
155+
WHERE processed = FALSE
156+
"
157+
)
158+
}
159+
160+
async fn get_items(
161+
&self,
162+
) -> anyhow::Result<BoxStream<'static, Result<(Key, Embeddings), NextRowError>>> {
163+
Ok(self
164+
.session
165+
.execute_iter(self.st_get_items.clone(), ())
166+
.await?
167+
.rows_stream::<(i64, Vec<f32>)>()?
168+
.map_ok(|(key, embeddings)| ((key as u64).into(), embeddings.into()))
169+
.boxed())
170+
}
115171
}

src/monitor_items.rs

+7-35
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,15 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6-
use crate::Embeddings;
76
use crate::IndexMetadata;
87
use crate::Key;
98
use crate::db_index::DbIndex;
109
use crate::db_index::DbIndexExt;
1110
use crate::index::Index;
1211
use crate::index::IndexExt;
1312
use anyhow::Context;
14-
use futures::Stream;
1513
use futures::TryStreamExt;
1614
use scylla::client::session::Session;
17-
use scylla::errors::NextRowError;
1815
use scylla::statement::prepared::PreparedStatement;
1916
use std::mem;
2017
use std::sync::Arc;
@@ -68,7 +65,7 @@ pub(crate) async fn new(
6865
}
6966
}
7067
State::Copy => {
71-
if table_to_index(&db, &index)
68+
if table_to_index(&db, &db_index, &index)
7269
.await
7370
.unwrap_or_else(|err| {
7471
warn!("monitor_items: unable to copy data from table to index: {err}");
@@ -95,18 +92,13 @@ enum State {
9592

9693
struct Db {
9794
session: Arc<Session>,
98-
st_get_items: PreparedStatement,
9995
st_reset_items: PreparedStatement,
10096
st_update_items: PreparedStatement,
10197
}
10298

10399
impl Db {
104100
async fn new(session: Arc<Session>, metadata: IndexMetadata) -> anyhow::Result<Self> {
105101
Ok(Self {
106-
st_get_items: session
107-
.prepare(Self::get_items_query(&metadata))
108-
.await
109-
.context("get_items_query")?,
110102
st_reset_items: session
111103
.prepare(Self::reset_items_query(&metadata))
112104
.await
@@ -119,30 +111,6 @@ impl Db {
119111
})
120112
}
121113

122-
fn get_items_query(metadata: &IndexMetadata) -> String {
123-
format!(
124-
"
125-
SELECT {}, {}
126-
FROM {}.{}
127-
WHERE processed = FALSE
128-
",
129-
metadata.key_name.0,
130-
metadata.target_name.0,
131-
metadata.keyspace_name.0,
132-
metadata.table_name.0
133-
)
134-
}
135-
async fn get_items(
136-
&self,
137-
) -> anyhow::Result<impl Stream<Item = Result<(Key, Embeddings), NextRowError>>> {
138-
Ok(self
139-
.session
140-
.execute_iter(self.st_get_items.clone(), ())
141-
.await?
142-
.rows_stream::<(i64, Vec<f32>)>()?
143-
.map_ok(|(key, embeddings)| ((key as u64).into(), embeddings.into())))
144-
}
145-
146114
fn reset_items_query(metadata: &IndexMetadata) -> String {
147115
format!(
148116
"
@@ -199,8 +167,12 @@ async fn reset_items(db: &Arc<Db>, db_index: &Sender<DbIndex>) -> anyhow::Result
199167
}
200168

201169
/// Get new embeddings from db and add to the index. Then mark embeddings in db as processed
202-
async fn table_to_index(db: &Arc<Db>, index: &Sender<Index>) -> anyhow::Result<bool> {
203-
let mut rows = db.get_items().await?;
170+
async fn table_to_index(
171+
db: &Arc<Db>,
172+
db_index: &Sender<DbIndex>,
173+
index: &Sender<Index>,
174+
) -> anyhow::Result<bool> {
175+
let mut rows = db_index.get_items().await?;
204176

205177
// The value was taken from initial benchmarks
206178
const PROCESSED_CHUNK_SIZE: usize = 100;

0 commit comments

Comments
 (0)