Skip to content

Commit 30c5435

Browse files
Add LIMIT clause to fulltext scans to prevent OOM issues (#202)
1 parent 06c047f commit 30c5435

File tree

5 files changed

+73
-22
lines changed

5 files changed

+73
-22
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ jobs:
2424
format:
2525
name: Check format
2626
runs-on: [runner-amd64-large]
27-
continue-on-error: true
2827
steps:
2928
- name: Install stable toolchain
3029
uses: dtolnay/rust-toolchain@stable
@@ -46,7 +45,6 @@ jobs:
4645
clippy:
4746
name: Check clippy
4847
runs-on: [runner-amd64-large]
49-
continue-on-error: true
5048
steps:
5149
- name: Install stable toolchain
5250
uses: dtolnay/rust-toolchain@stable
@@ -68,7 +66,6 @@ jobs:
6866
test:
6967
name: Check tests
7068
runs-on: [runner-amd64-large]
71-
continue-on-error: true
7269
steps:
7370
- name: Install stable toolchain
7471
uses: dtolnay/rust-toolchain@stable
@@ -97,7 +94,6 @@ jobs:
9794
runner: runner-arm64-large
9895
target: aarch64-unknown-linux-gnu
9996
runs-on: ${{ matrix.runner }}
100-
continue-on-error: true
10197
steps:
10298
- name: Install stable toolchain
10399
uses: dtolnay/rust-toolchain@stable
@@ -124,7 +120,6 @@ jobs:
124120
name: Build Docker image
125121
runs-on: [runner-amd64-large-private]
126122
needs: build
127-
continue-on-error: true
128123
steps:
129124
- name: Checkout sources
130125
uses: actions/checkout@v4
@@ -194,7 +189,6 @@ jobs:
194189
name: Benchmark ${{ matrix.description }}
195190
needs: build
196191
runs-on: [ubuntu-latest]
197-
continue-on-error: true
198192
env:
199193
CRUD_BENCH_BATCHES: >-
200194
[

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ pub(crate) struct Args {
181181
"fields": ["number"]
182182
}
183183
},
184-
{ "name": "where_field_fulltext_single", "samples": 100, "projection": "FULL",
184+
{ "name": "where_field_fulltext_single", "samples": 100, "projection": "FULL", "limit": 1000,
185185
"condition": {
186186
"sql": "to_tsvector('english', words) @@ to_tsquery('english', 'hello')",
187187
"mysql": "MATCH(words) AGAINST('hello' IN NATURAL LANGUAGE MODE)",
@@ -194,7 +194,7 @@ pub(crate) struct Args {
194194
"index_type": "fulltext"
195195
}
196196
},
197-
{ "name": "where_field_fulltext_multi_and", "samples": 100, "projection": "FULL",
197+
{ "name": "where_field_fulltext_multi_and", "samples": 100, "projection": "FULL", "limit": 1000,
198198
"condition": {
199199
"sql": "to_tsvector('english', words) @@ to_tsquery('english', 'hello & world')",
200200
"mysql": "MATCH(words) AGAINST('+hello +world' IN NATURAL LANGUAGE MODE)",
@@ -207,7 +207,7 @@ pub(crate) struct Args {
207207
"index_type": "fulltext"
208208
}
209209
},
210-
{ "name": "where_field_fulltext_multi_or", "samples": 100, "projection": "FULL",
210+
{ "name": "where_field_fulltext_multi_or", "samples": 100, "projection": "FULL", "limit": 1000,
211211
"condition": {
212212
"sql": "to_tsvector('english', words) @@ to_tsquery('english', 'foo | bar')",
213213
"mysql": "MATCH(words) AGAINST('foo bar' IN NATURAL LANGUAGE MODE)",

src/neo4j.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl Neo4jClient {
238238
Projection::Id => {
239239
let stm = match fts {
240240
true => format!(
241-
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r {s} {l} RETURN r.id"
241+
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r WITH r {s} {l} RETURN r.id"
242242
),
243243
false => format!("MATCH (r) {c} {s} {l} RETURN r.id"),
244244
};
@@ -253,7 +253,7 @@ impl Neo4jClient {
253253
Projection::Full => {
254254
let stm = match fts {
255255
true => format!(
256-
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r {s} {l} RETURN r"
256+
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r WITH r {s} {l} RETURN r"
257257
),
258258
false => format!("MATCH (r) {c} {s} {l} RETURN r"),
259259
};
@@ -268,7 +268,7 @@ impl Neo4jClient {
268268
Projection::Count => {
269269
let stm = match fts {
270270
true => format!(
271-
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r {s} {l} RETURN count(r) as count"
271+
"CALL db.index.fulltext.queryNodes('{n}', '{c}') YIELD node as r WITH r {s} {l} RETURN count(r) as count"
272272
),
273273
false => format!("MATCH (r) {c} {s} {l} RETURN count(r) as count"),
274274
};

src/sqlite.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use std::cmp::max;
1313
use std::hint::black_box;
1414
use std::sync::Arc;
1515
use std::time::Duration;
16-
use tokio_rusqlite::Connection;
1716
use tokio_rusqlite::types::ToSqlOutput;
1817
use tokio_rusqlite::types::Value;
18+
use tokio_rusqlite::{Connection, rusqlite};
1919

2020
const DATABASE_DIR: &str = "sqlite";
2121

@@ -265,7 +265,7 @@ impl BenchmarkClient for SqliteClient {
265265

266266
impl SqliteClient {
267267
async fn execute_batch(&self, query: Cow<'static, str>) -> Result<()> {
268-
self.conn.call(move |conn| conn.execute_batch(query.as_ref()).map_err(Into::into)).await?;
268+
self.conn.call(move |conn| conn.execute_batch(query.as_ref())).await?;
269269
Ok(())
270270
}
271271

@@ -275,7 +275,7 @@ impl SqliteClient {
275275
params: ToSqlOutput<'static>,
276276
) -> Result<usize> {
277277
self.conn
278-
.call(move |conn| conn.execute(query.as_ref(), [&params]).map_err(Into::into))
278+
.call(move |conn| conn.execute(query.as_ref(), [&params]))
279279
.await
280280
.map_err(Into::into)
281281
}
@@ -286,7 +286,7 @@ impl SqliteClient {
286286
params: Option<ToSqlOutput<'static>>,
287287
) -> Result<Vec<Row>> {
288288
self.conn
289-
.call(move |conn| {
289+
.call(move |conn| -> rusqlite::Result<Vec<Row>> {
290290
let mut stmt = conn.prepare(stmt.as_ref())?;
291291
let mut rows = match params {
292292
Some(params) => stmt.query([&params])?,
@@ -424,7 +424,7 @@ impl SqliteClient {
424424
let conn = self.conn.clone();
425425
let column_defs = self.columns.clone();
426426

427-
conn.call(move |conn| {
427+
conn.call(move |conn| -> rusqlite::Result<()> {
428428
// Store the records to insert
429429
let mut inserts = Vec::with_capacity(key_vals.len());
430430
// Store all parameter values as owned types
@@ -482,7 +482,7 @@ impl SqliteClient {
482482
{
483483
let conn = self.conn.clone();
484484

485-
conn.call(move |conn| {
485+
conn.call(move |conn| -> rusqlite::Result<()> {
486486
// Build the IN clause with positional parameters
487487
let ids = (1..=keys.len()).map(|i| format!("${i}")).collect::<Vec<String>>().join(", ");
488488

@@ -540,7 +540,7 @@ impl SqliteClient {
540540
let conn = self.conn.clone();
541541
let column_defs = self.columns.clone();
542542

543-
conn.call(move |conn| {
543+
conn.call(move |conn| -> rusqlite::Result<()> {
544544
// For SQLite, we'll use multiple UPDATE statements in a transaction
545545
// since SQLite doesn't support UPDATE FROM as elegantly as PostgreSQL
546546

@@ -609,7 +609,7 @@ impl SqliteClient {
609609
{
610610
let conn = self.conn.clone();
611611

612-
conn.call(move |conn| {
612+
conn.call(move |conn| -> rusqlite::Result<()> {
613613
// Build the IN clause with positional parameters
614614
let ids = (1..=keys.len()).map(|i| format!("${i}")).collect::<Vec<String>>().join(", ");
615615

src/surrealdb.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::memory::Config as MemoryConfig;
99
use crate::valueprovider::Columns;
1010
use crate::{Benchmark, Index, KeyType, Projection, Scan};
1111
use anyhow::{Result, bail};
12+
use log::warn;
1213
use serde_json::Value;
1314
use std::time::Duration;
1415
use surrealdb::Surreal;
@@ -258,12 +259,68 @@ impl BenchmarkClient for SurrealDBClient {
258259
}
259260

260261
async fn drop_index(&self, name: &str) -> Result<()> {
262+
// Retry helper closure for handling transient "Resource busy" errors.
263+
//
264+
// ## Why Retry is Necessary
265+
//
266+
// After intensive concurrent scan operations (e.g., 12 clients × 48 threads = 576 tasks),
267+
// each scan creates a READ transaction in SurrealDB that holds a RocksDB snapshot.
268+
// These snapshots capture the database state and provide MVCC (Multi-Version Concurrency Control).
269+
//
270+
// When REMOVE INDEX executes immediately after scans complete:
271+
// 1. Client-side: All Rust futures have finished (via try_join_all)
272+
// 2. Server-side: SurrealDB/RocksDB may still have:
273+
// - Active transaction objects not yet fully released
274+
// - Snapshot references held in memory
275+
// - Deferred cleanup operations in progress
276+
//
277+
// ## The Conflict
278+
//
279+
// REMOVE INDEX runs in a WRITE transaction that needs to:
280+
// - Delete index metadata keys (del_tb_index)
281+
// - Update table definition (put_tb)
282+
// - Clear caches
283+
//
284+
// RocksDB's optimistic transaction engine detects conflicts between:
285+
// - Active READ snapshots from completed scan operations
286+
// - WRITE transaction from REMOVE INDEX trying to modify metadata
287+
//
288+
// This results in: "The query was not executed due to a failed transaction. Resource busy:"
289+
//
290+
// ## Why Retry Works
291+
//
292+
// The error is transient. As transaction objects are dropped and snapshots released,
293+
// the metadata locks become available. The 500ms sleep allows sufficient time for:
294+
// - Async transaction cleanup to complete
295+
// - RocksDB to release internal snapshot references
296+
// - Memory management to finalize deferred operations
297+
//
298+
// Since REMOVE INDEX IF EXISTS is idempotent, retrying is safe and appropriate
299+
// for benchmark scenarios where the goal is reliable completion rather than
300+
// immediate failure on transient resource contention.
301+
let retry = |sql: String| async move {
302+
loop {
303+
match self.db.query(&sql).await?.check() {
304+
Ok(_) => return Ok(()),
305+
Err(e) => {
306+
if e.to_string().eq(
307+
"The query was not executed due to a failed transaction. Resource busy: ",
308+
) {
309+
warn!("Retrying {sql} due to {e}");
310+
sleep(Duration::from_millis(500)).await;
311+
} else {
312+
return Err(e);
313+
}
314+
}
315+
}
316+
}
317+
};
261318
// Remove the index
262319
let sql = format!("REMOVE INDEX IF EXISTS {name} ON TABLE record");
263-
self.db.query(sql).await?.check()?;
320+
retry(sql).await?;
264321
// Remove the analyzer
265322
let sql = format!("REMOVE ANALYZER IF EXISTS {name}");
266-
self.db.query(sql).await?.check()?;
323+
retry(sql).await?;
267324
// All ok
268325
Ok(())
269326
}

0 commit comments

Comments
 (0)