Skip to content

Commit 43628b4

Browse files
authored
feat: multi-thread reads and batched writes (#56)
Write batching and read multi-threading improves throughput by 5-18x, depending on workload type.
1 parent 3ef711a commit 43628b4

File tree

11 files changed

+593
-291
lines changed

11 files changed

+593
-291
lines changed

Cargo.lock

Lines changed: 1 addition & 97 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ env_logger = "0.10.0"
3131
futures = "0.3.28"
3232
hex = "0.4"
3333
hyper = { version = "0.14", features = ["client"] }
34-
hyper-proxy = "0.9.1"
34+
hyper-proxy = { version = "0.9.1", default-features = false }
3535
log = "0.4.20"
3636
num-bigint = "0.4"
3737
prost = "0.11"
@@ -44,6 +44,7 @@ serde_json = "1.0.107"
4444
tempfile = "3"
4545
thiserror = "1"
4646
tokio = { version = "1.33.0", features = ["full"] }
47+
tokio-stream = "0.1"
4748
tokio-util = "0.7"
4849
url = "2"
4950
uuid = { version = "1.4.1", features = ["v4", "serde"] }

denokv/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ authors.workspace = true
1111
path = "main.rs"
1212
name = "denokv"
1313

14+
[features]
15+
default = ["bundled-sqlite"]
16+
bundled-sqlite = ["rusqlite/bundled"]
17+
1418
[dependencies]
1519
anyhow.workspace = true
1620
aws-config.workspace = true

denokv/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ pub struct ServeOptions {
3939
#[clap(long, conflicts_with = "read_only")]
4040
pub sync_from_s3: bool,
4141

42+
/// Atomic write batch timeout. Batching is disabled if this is not set.
43+
#[clap(long, env = "DENO_KV_ATOMIC_WRITE_BATCH_TIMEOUT_MS")]
44+
pub atomic_write_batch_timeout_ms: Option<u64>,
45+
46+
/// Number of worker threads.
47+
#[clap(long, env = "DENO_KV_NUM_WORKERS", default_value = "1")]
48+
pub num_workers: usize,
49+
4250
#[command(flatten)]
4351
pub replica: ReplicaOptions,
4452
}

denokv/main.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use denokv_proto::SnapshotReadOptions;
4747
use denokv_sqlite::Connection;
4848
use denokv_sqlite::Sqlite;
4949
use denokv_sqlite::SqliteBackendError;
50+
use denokv_sqlite::SqliteConfig;
5051
use denokv_sqlite::SqliteNotifier;
5152
use denokv_timemachine::backup_source_s3::DatabaseBackupSourceS3;
5253
use denokv_timemachine::backup_source_s3::DatabaseBackupSourceS3Config;
@@ -212,11 +213,18 @@ async fn run_serve(
212213

213214
let path = Path::new(&config.sqlite_path);
214215
let read_only = options.read_only || options.sync_from_s3;
215-
let (sqlite, path) = open_sqlite(path, read_only)?;
216+
let sqlite_config = SqliteConfig {
217+
batch_timeout: options
218+
.atomic_write_batch_timeout_ms
219+
.map(std::time::Duration::from_millis),
220+
num_workers: options.num_workers,
221+
};
222+
let sqlite = open_sqlite(path, read_only, sqlite_config.clone())?;
216223
info!(
217-
"Opened{} database at {}",
224+
"Opened{} database at {}. Batch timeout: {:?}",
218225
if read_only { " read only" } else { "" },
219-
path,
226+
path.to_string_lossy(),
227+
sqlite_config.batch_timeout,
220228
);
221229

222230
let access_token = options.access_token.as_str();
@@ -274,7 +282,7 @@ async fn run_sync(
274282
let proxy_uri = https_proxy.parse().unwrap();
275283
let proxy = Proxy::new(Intercept::All, proxy_uri);
276284
let connector = HttpConnector::new();
277-
ProxyConnector::from_proxy(connector, proxy).unwrap()
285+
ProxyConnector::from_proxy_unsecured(connector, proxy)
278286
};
279287
let hyper_client =
280288
aws_smithy_client::hyper_ext::Adapter::builder().build(proxy);
@@ -327,20 +335,27 @@ async fn run_sync(
327335
fn open_sqlite(
328336
path: &Path,
329337
read_only: bool,
330-
) -> Result<(Sqlite, String), anyhow::Error> {
338+
config: SqliteConfig,
339+
) -> Result<Sqlite, anyhow::Error> {
331340
let flags = if read_only {
332341
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX
333342
} else {
334343
OpenFlags::SQLITE_OPEN_READ_WRITE
335344
| OpenFlags::SQLITE_OPEN_CREATE
336345
| OpenFlags::SQLITE_OPEN_NO_MUTEX
337346
};
338-
let conn = Connection::open_with_flags(path, flags)?;
339347
let notifier = SqliteNotifier::default();
340-
let rng: Box<_> = Box::new(rand::rngs::StdRng::from_entropy());
341-
let path = conn.path().unwrap().to_owned();
342-
let sqlite = Sqlite::new(conn, notifier, rng)?;
343-
Ok((sqlite, path))
348+
let sqlite = Sqlite::new(
349+
|| {
350+
Ok((
351+
Connection::open_with_flags(path, flags)?,
352+
Box::new(rand::rngs::StdRng::from_entropy()),
353+
))
354+
},
355+
notifier,
356+
config,
357+
)?;
358+
Ok(sqlite)
344359
}
345360

346361
#[axum::debug_handler]

npm/napi/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
[package]
44
authors = []
55
edition = "2021"
6-
name = "deno-kv-napi"
7-
version = "0.0.0" # never published
6+
name = "deno-kv-napi"
7+
version = "0.0.0" # never published
88

99
[lib]
1010
crate-type = ["cdylib"]
1111

1212
[dependencies]
1313
futures = "0.3.28"
14-
napi = {version="2", features=["async","anyhow"]}
14+
napi = { version = "2", features = ["async", "anyhow"] }
1515
napi-derive = "2"
1616
denokv_sqlite = { path = "../../sqlite" }
1717
denokv_proto = { path = "../../proto" }
@@ -21,7 +21,7 @@ tokio = { version = "1.33.0", features = ["full"] }
2121
anyhow = "1"
2222
prost = "0.11"
2323
once_cell = "1.18.0"
24-
rusqlite = "0.29.0"
24+
rusqlite = { version = "0.29.0", features = ["bundled"] }
2525

2626
[build-dependencies]
2727
napi-build = "2"

npm/napi/src/lib.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use denokv_proto::WatchKeyOutput;
1010
use denokv_sqlite::Connection;
1111
use denokv_sqlite::Sqlite;
1212
use denokv_sqlite::SqliteBackendError;
13+
use denokv_sqlite::SqliteConfig;
1314
use denokv_sqlite::SqliteMessageHandle;
1415
use denokv_sqlite::SqliteNotifier;
1516
use futures::stream::BoxStream;
@@ -38,14 +39,11 @@ pub fn open(path: String, in_memory: Option<bool>, debug: bool) -> Result<u32> {
3839
} else {
3940
OpenFlags::default()
4041
};
41-
let conn = Connection::open_with_flags(Path::new(&path), flags)
42-
.map_err(anyhow::Error::from)?;
43-
conn
44-
.pragma_update(None, "journal_mode", "wal")
45-
.map_err(anyhow::Error::from)?;
46-
let rng = Box::new(rand::rngs::StdRng::from_entropy());
47-
48-
let opened_path = conn.path().map(|p| p.to_string());
42+
43+
let opened_path = Connection::open_with_flags(Path::new(&path), flags)
44+
.map_err(anyhow::Error::from)?
45+
.path()
46+
.map(|p| p.to_string());
4947
let notifier = match &opened_path {
5048
Some(opened_path) if !opened_path.is_empty() => NOTIFIERS_MAP
5149
.lock()
@@ -56,7 +54,20 @@ pub fn open(path: String, in_memory: Option<bool>, debug: bool) -> Result<u32> {
5654
_ => SqliteNotifier::default(),
5755
};
5856

59-
let sqlite = Sqlite::new(conn, notifier, rng)?;
57+
let sqlite = Sqlite::new(
58+
|| {
59+
Ok((
60+
Connection::open_with_flags(Path::new(&path), flags)
61+
.map_err(anyhow::Error::from)?,
62+
Box::new(rand::rngs::StdRng::from_entropy()),
63+
))
64+
},
65+
notifier,
66+
SqliteConfig {
67+
num_workers: 1,
68+
batch_timeout: None,
69+
},
70+
)?;
6071

6172
let db_id = DB_ID.fetch_add(1, Ordering::Relaxed);
6273
DBS.lock().unwrap().insert(db_id, sqlite);

0 commit comments

Comments
 (0)