Skip to content

Commit 9409b40

Browse files
committed
Move a modify_indexes::Db::update_items_count into a db actor
This is a part of #3. The change moves update_items_count method from modify_indexes into a db actor. Next paches will move other methods into a db actor.
1 parent 05f8f91 commit 9409b40

File tree

4 files changed

+69
-60
lines changed

4 files changed

+69
-60
lines changed

src/db.rs

+51
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::Dimensions;
99
use crate::ExpansionAdd;
1010
use crate::ExpansionSearch;
1111
use crate::IndexId;
12+
use crate::IndexItemsCount;
1213
use crate::IndexMetadata;
1314
use crate::IndexVersion;
1415
use crate::KeyspaceName;
@@ -64,6 +65,11 @@ pub(crate) enum Db {
6465
#[allow(clippy::type_complexity)]
6566
tx: oneshot::Sender<anyhow::Result<Option<(Connectivity, ExpansionAdd, ExpansionSearch)>>>,
6667
},
68+
69+
UpdateItemsCount {
70+
id: IndexId,
71+
items_count: IndexItemsCount,
72+
},
6773
}
6874

6975
pub(crate) trait DbExt {
@@ -90,6 +96,12 @@ pub(crate) trait DbExt {
9096
&self,
9197
id: IndexId,
9298
) -> anyhow::Result<Option<(Connectivity, ExpansionAdd, ExpansionSearch)>>;
99+
100+
async fn update_items_count(
101+
&self,
102+
id: IndexId,
103+
items_count: IndexItemsCount,
104+
) -> anyhow::Result<()>;
93105
}
94106

95107
impl DbExt for mpsc::Sender<Db> {
@@ -151,6 +163,15 @@ impl DbExt for mpsc::Sender<Db> {
151163
self.send(Db::GetIndexParams { id, tx }).await?;
152164
rx.await?
153165
}
166+
167+
async fn update_items_count(
168+
&self,
169+
id: IndexId,
170+
items_count: IndexItemsCount,
171+
) -> anyhow::Result<()> {
172+
self.send(Db::UpdateItemsCount { id, items_count }).await?;
173+
Ok(())
174+
}
154175
}
155176

156177
pub(crate) async fn new(db_session: Arc<Session>) -> anyhow::Result<mpsc::Sender<Db>> {
@@ -207,6 +228,13 @@ async fn process(statements: Arc<Statements>, msg: Db) {
207228
Db::GetIndexParams { id, tx } => tx
208229
.send(statements.get_index_params(id).await)
209230
.unwrap_or_else(|_| warn!("db::process: Db::GetIndexParams: unable to send response")),
231+
232+
Db::UpdateItemsCount { id, items_count } => {
233+
statements
234+
.update_items_count(id, items_count)
235+
.await
236+
.unwrap_or_else(|err| warn!("db::process: Db::UpdateItemsCount: {err}"));
237+
}
210238
}
211239
}
212240

@@ -232,6 +260,7 @@ struct Statements {
232260
st_get_index_target_type: PreparedStatement,
233261
re_get_index_target_type: Regex,
234262
st_get_index_params: PreparedStatement,
263+
st_update_items_count: PreparedStatement,
235264
}
236265

237266
impl Statements {
@@ -265,6 +294,11 @@ impl Statements {
265294
.await
266295
.context("ST_GET_INDEX_PARAMS")?,
267296

297+
st_update_items_count: session
298+
.prepare(Self::ST_UPDATE_ITEMS_COUNT)
299+
.await
300+
.context("ST_UPDATE_ITEMS_COUNT")?,
301+
268302
session,
269303
})
270304
}
@@ -410,4 +444,21 @@ impl Statements {
410444
.try_next()
411445
.await?)
412446
}
447+
448+
const ST_UPDATE_ITEMS_COUNT: &str = "
449+
UPDATE vector_benchmark.vector_indexes
450+
SET indexed_elements_count = ?
451+
WHERE id = ?
452+
";
453+
454+
async fn update_items_count(
455+
&self,
456+
id: IndexId,
457+
items_count: IndexItemsCount,
458+
) -> anyhow::Result<()> {
459+
self.session
460+
.execute_unpaged(&self.st_update_items_count, (items_count, id))
461+
.await?;
462+
Ok(())
463+
}
413464
}

src/engine.rs

-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ pub(crate) async fn new(
106106

107107
let Ok(index_actor) = index::new(
108108
id.clone(),
109-
modify_actor.clone(),
110109
db.clone(),
111110
metadata.dimensions,
112111
metadata.connectivity,

src/index.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6+
use crate::db::Db;
7+
use crate::db::DbExt;
68
use crate::Connectivity;
79
use crate::Dimensions;
810
use crate::Distance;
@@ -13,25 +15,22 @@ use crate::IndexId;
1315
use crate::IndexItemsCount;
1416
use crate::Key;
1517
use crate::Limit;
16-
use crate::db::Db;
17-
use crate::modify_indexes::ModifyIndexes;
18-
use crate::modify_indexes::ModifyIndexesExt;
1918
use anyhow::anyhow;
2019
use std::num::NonZeroUsize;
21-
use std::sync::Arc;
22-
use std::sync::RwLock;
2320
use std::sync::atomic::AtomicU32;
2421
use std::sync::atomic::AtomicUsize;
2522
use std::sync::atomic::Ordering;
23+
use std::sync::Arc;
24+
use std::sync::RwLock;
2625
use tokio::sync::mpsc;
2726
use tokio::sync::oneshot;
2827
use tokio::time;
29-
use tracing::Instrument;
3028
use tracing::debug;
3129
use tracing::debug_span;
3230
use tracing::error;
3331
use tracing::info;
3432
use tracing::warn;
33+
use tracing::Instrument;
3534
use usearch::IndexOptions;
3635
use usearch::ScalarKind;
3736

@@ -89,8 +88,7 @@ impl IndexExt for mpsc::Sender<Index> {
8988

9089
pub(crate) fn new(
9190
id: IndexId,
92-
modify_actor: mpsc::Sender<ModifyIndexes>,
93-
_db: mpsc::Sender<Db>,
91+
db: mpsc::Sender<Db>,
9492
dimensions: Dimensions,
9593
connectivity: Connectivity,
9694
expansion_add: ExpansionAdd,
@@ -119,9 +117,11 @@ pub(crate) fn new(
119117
async move {
120118
let mut items_count_db = IndexItemsCount(0);
121119
let items_count = Arc::new(AtomicU32::new(0));
122-
modify_actor
123-
.update_items_count(id.clone(), items_count_db)
124-
.await;
120+
121+
db.update_items_count(id.clone(), items_count_db)
122+
.await
123+
.unwrap_or_else(|err| warn!("index::new: unable update items count: {err}"));
124+
125125
let mut housekeeping_interval = time::interval(time::Duration::from_secs(1));
126126
let idx_lock = Arc::new(RwLock::new(()));
127127
let counter_add = Arc::new(AtomicUsize::new(0));
@@ -131,7 +131,7 @@ pub(crate) fn new(
131131
tokio::select! {
132132
_ = housekeeping_interval.tick() => {
133133
housekeeping(
134-
&modify_actor,
134+
&db,
135135
id.clone(),
136136
&mut items_count_db,
137137
&items_count,
@@ -181,7 +181,7 @@ pub(crate) fn new(
181181
}
182182

183183
async fn housekeeping(
184-
modify_actor: &mpsc::Sender<ModifyIndexes>,
184+
db: &mpsc::Sender<Db>,
185185
id: IndexId,
186186
items_count_db: &mut IndexItemsCount,
187187
items_count: &AtomicU32,
@@ -200,7 +200,9 @@ async fn housekeeping(
200200
if items != items_count_db.0 {
201201
debug!("housekeeping update items count: {items_count_db}",);
202202
items_count_db.0 = items;
203-
modify_actor.update_items_count(id, *items_count_db).await;
203+
db.update_items_count(id, *items_count_db)
204+
.await
205+
.unwrap_or_else(|err| warn!("index::housekeeping: unable update items count: {err}"));
204206
}
205207
}
206208

src/modify_indexes.rs

+2-45
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6-
use crate::IndexId;
7-
use crate::IndexItemsCount;
86
use crate::db;
7+
use crate::IndexId;
98
use anyhow::Context;
109
use scylla::client::session::Session;
1110
use scylla::statement::prepared::PreparedStatement;
@@ -15,29 +14,14 @@ use tokio::sync::mpsc::Sender;
1514
use tracing::warn;
1615

1716
pub(crate) enum ModifyIndexes {
18-
UpdateItemsCount {
19-
id: IndexId,
20-
items_count: IndexItemsCount,
21-
},
22-
Del {
23-
id: IndexId,
24-
},
17+
Del { id: IndexId },
2518
}
2619

2720
pub(crate) trait ModifyIndexesExt {
28-
async fn update_items_count(&self, id: IndexId, items_count: IndexItemsCount);
2921
async fn del(&self, id: IndexId);
3022
}
3123

3224
impl ModifyIndexesExt for Sender<ModifyIndexes> {
33-
async fn update_items_count(&self, id: IndexId, items_count: IndexItemsCount) {
34-
self.send(ModifyIndexes::UpdateItemsCount { id, items_count })
35-
.await
36-
.unwrap_or_else(|err| {
37-
warn!("ModifyIndexesExt::update_items_count: unable to send request: {err}")
38-
});
39-
}
40-
4125
async fn del(&self, id: IndexId) {
4226
self.send(ModifyIndexes::Del { id })
4327
.await
@@ -58,12 +42,6 @@ pub(crate) async fn new(
5842
tokio::spawn(async move {
5943
while let Some(msg) = rx.recv().await {
6044
match msg {
61-
ModifyIndexes::UpdateItemsCount { id, items_count } => db
62-
.update_items_count(id, items_count)
63-
.await
64-
.unwrap_or_else(|err| {
65-
warn!("modify_indexes: unable to update items count for index in db: {err}")
66-
}),
6745
ModifyIndexes::Del { id } => db.remove_index(id).await.unwrap_or_else(|err| {
6846
warn!("modify_indexes: unable to remove index from db: {err}")
6947
}),
@@ -75,17 +53,12 @@ pub(crate) async fn new(
7553

7654
struct Db {
7755
session: Arc<Session>,
78-
st_update_items_count: PreparedStatement,
7956
st_remove_index: PreparedStatement,
8057
}
8158

8259
impl Db {
8360
async fn new(session: Arc<Session>) -> anyhow::Result<Self> {
8461
Ok(Self {
85-
st_update_items_count: session
86-
.prepare(Self::UPDATE_ITEMS_COUNT)
87-
.await
88-
.context("UPDATE_ITEMS_COUNT")?,
8962
st_remove_index: session
9063
.prepare(Self::REMOVE_INDEX)
9164
.await
@@ -94,22 +67,6 @@ impl Db {
9467
})
9568
}
9669

97-
const UPDATE_ITEMS_COUNT: &str = "
98-
UPDATE vector_benchmark.vector_indexes
99-
SET indexed_elements_count = ?
100-
WHERE id = ?
101-
";
102-
async fn update_items_count(
103-
&self,
104-
id: IndexId,
105-
items_count: IndexItemsCount,
106-
) -> anyhow::Result<()> {
107-
self.session
108-
.execute_unpaged(&self.st_update_items_count, (items_count, id))
109-
.await?;
110-
Ok(())
111-
}
112-
11370
const REMOVE_INDEX: &str = "DELETE FROM vector_benchmark.vector_indexes WHERE id = ?";
11471
async fn remove_index(&self, id: IndexId) -> anyhow::Result<()> {
11572
self.session

0 commit comments

Comments
 (0)