Skip to content

Commit 0cd1a6c

Browse files
committed
perf(store): Shard sqlite
1 parent b7c479f commit 0cd1a6c

9 files changed

+343
-51
lines changed

benches/store_bench.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
55
use rand::Rng;
66
use taskbroker::{
77
store::inflight_activation::{
8-
InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig,
8+
InflightActivationShard, InflightActivationStatus, InflightActivationStoreConfig,
99
},
1010
test_utils::{generate_temp_filename, make_activations},
1111
};
@@ -23,9 +23,10 @@ async fn process_activations(num_activations: u32, num_workers: u32) {
2323
generate_temp_filename()
2424
};
2525
let store = Arc::new(
26-
InflightActivationStore::new(
26+
InflightActivationShard::new(
2727
&url,
2828
InflightActivationStoreConfig {
29+
sharding_factor: 8,
2930
max_processing_attempts: 1,
3031
},
3132
)
@@ -80,7 +81,7 @@ fn store_bench(c: &mut Criterion) {
8081
let num_activations: u32 = 4_096;
8182
let num_workers = 64;
8283

83-
c.benchmark_group("bench_InflightActivationStore")
84+
c.benchmark_group("bench_InflightActivationShard")
8485
.sample_size(256)
8586
.throughput(criterion::Throughput::Elements(num_activations.into()))
8687
.bench_function("process_activations", |b| {

src/config.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ pub struct Config {
7575
/// The path to the sqlite database
7676
pub db_path: String,
7777

78+
/// The number of physical files to shard the database by
79+
pub db_sharding_factor: u8,
80+
7881
/// The maximum number of pending records that can be
7982
/// in the InflightTaskStore (sqlite)
8083
pub max_pending_count: usize,
@@ -115,7 +118,8 @@ impl Default for Config {
115118
kafka_auto_commit_interval_ms: 5000,
116119
kafka_auto_offset_reset: "latest".to_owned(),
117120
kafka_send_timeout_ms: 500,
118-
db_path: "./taskbroker-inflight.sqlite".to_owned(),
121+
db_path: "./taskbroker-inflight".to_owned(),
122+
db_sharding_factor: 8,
119123
max_pending_count: 2048,
120124
max_pending_buffer_count: 128,
121125
max_processing_attempts: 5,
@@ -197,7 +201,7 @@ mod tests {
197201
assert_eq!(config.log_format, LogFormat::Text);
198202
assert_eq!(config.grpc_port, 50051);
199203
assert_eq!(config.kafka_topic, "task-worker");
200-
assert_eq!(config.db_path, "./taskbroker-inflight.sqlite");
204+
assert_eq!(config.db_path, "./taskbroker-inflight");
201205
assert_eq!(config.max_pending_count, 2048);
202206
}
203207

@@ -284,7 +288,7 @@ mod tests {
284288
assert_eq!(config.log_filter, "error");
285289
assert_eq!(config.kafka_topic, "task-worker".to_owned());
286290
assert_eq!(config.kafka_deadletter_topic, "task-worker-dlq".to_owned());
287-
assert_eq!(config.db_path, "./taskbroker-inflight.sqlite".to_owned());
291+
assert_eq!(config.db_path, "./taskbroker-inflight".to_owned());
288292
assert_eq!(config.max_pending_count, 2048);
289293
assert_eq!(config.max_processing_attempts, 5);
290294
assert_eq!(

src/grpc/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use std::time::Instant;
99
use tonic::{Request, Response, Status};
1010

1111
use crate::store::inflight_activation::{
12-
InflightActivation, InflightActivationStatus, InflightActivationStore,
12+
InflightActivation, InflightActivationShard, InflightActivationStatus,
1313
};
1414
use tracing::{error, instrument};
1515

1616
pub struct TaskbrokerServer {
17-
pub store: Arc<InflightActivationStore>,
17+
pub store: Arc<InflightActivationShard>,
1818
}
1919

2020
#[tonic::async_trait]

src/kafka/inflight_activation_writer.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tracing::{debug, instrument};
66

77
use crate::{
88
config::Config,
9-
store::inflight_activation::{InflightActivation, InflightActivationStore},
9+
store::inflight_activation::{InflightActivation, InflightActivationShard},
1010
};
1111

1212
use super::consumer::{
@@ -31,12 +31,12 @@ impl ActivationWriterConfig {
3131

3232
pub struct InflightActivationWriter {
3333
config: ActivationWriterConfig,
34-
store: Arc<InflightActivationStore>,
34+
store: Arc<InflightActivationShard>,
3535
batch: Option<Vec<InflightActivation>>,
3636
}
3737

3838
impl InflightActivationWriter {
39-
pub fn new(store: Arc<InflightActivationStore>, config: ActivationWriterConfig) -> Self {
39+
pub fn new(store: Arc<InflightActivationShard>, config: ActivationWriterConfig) -> Self {
4040
Self {
4141
config,
4242
store,
@@ -87,14 +87,13 @@ impl Reducer for InflightActivationWriter {
8787
.min_by_key(|item| item.timestamp())
8888
.unwrap();
8989

90-
let res = self.store.store(take(&mut self.batch).unwrap()).await?;
90+
let rows_affected = self.store.store(take(&mut self.batch).unwrap()).await?;
9191
metrics::histogram!("consumer.inflight_activation_writer.insert_lag")
9292
.record(lag.num_seconds() as f64);
93-
metrics::counter!("consumer.inflight_activation_writer.stored")
94-
.increment(res.rows_affected);
93+
metrics::counter!("consumer.inflight_activation_writer.stored").increment(rows_affected);
9594
debug!(
9695
"Inserted {:?} entries with max lag: {:?}s",
97-
res.rows_affected,
96+
rows_affected,
9897
lag.num_seconds()
9998
);
10099

src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use taskbroker::logging;
2828
use taskbroker::metrics;
2929
use taskbroker::processing_strategy;
3030
use taskbroker::store::inflight_activation::{
31-
InflightActivationStore, InflightActivationStoreConfig,
31+
InflightActivationShard, InflightActivationStoreConfig,
3232
};
3333
use taskbroker::{Args, get_version};
3434

@@ -57,7 +57,7 @@ async fn main() -> Result<(), Error> {
5757
logging::init(logging::LoggingConfig::from_config(&config));
5858
metrics::init(metrics::MetricsConfig::from_config(&config));
5959
let store = Arc::new(
60-
InflightActivationStore::new(
60+
InflightActivationShard::new(
6161
&config.db_path,
6262
InflightActivationStoreConfig::from_config(&config),
6363
)

0 commit comments

Comments
 (0)