Skip to content

Commit b6bdad1

Browse files
[storage/qmdb/current] current dbs with variable length values (#2819)
1 parent b37c8f2 commit b6bdad1

20 files changed

Lines changed: 3337 additions & 1251 deletions

File tree

storage/src/qmdb/any/ordered/variable.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,21 @@ impl<E: Storage + Clock + Metrics, K: Array, V: VariableValue, H: Hasher, T: Tra
4141
pub async fn init(
4242
context: E,
4343
cfg: VariableConfig<T, <Operation<K, V> as Read>::Cfg>,
44+
) -> Result<Self, Error> {
45+
Self::init_with_callback(context, cfg, None, |_, _| {}).await
46+
}
47+
48+
/// Initialize the DB, invoking `callback` for each operation processed during recovery.
49+
///
50+
/// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
51+
/// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
52+
/// snapshot is built from the log, `callback` is invoked for each operation with its activity
53+
/// status and previous location (if any).
54+
pub(crate) async fn init_with_callback(
55+
context: E,
56+
cfg: VariableConfig<T, <Operation<K, V> as Read>::Cfg>,
57+
known_inactivity_floor: Option<Location>,
58+
callback: impl FnMut(bool, Option<Location>),
4459
) -> Result<Self, Error> {
4560
let mmr_config = MmrConfig {
4661
journal_partition: cfg.mmr_journal_partition,
@@ -75,7 +90,7 @@ impl<E: Storage + Clock + Metrics, K: Array, V: VariableValue, H: Hasher, T: Tra
7590
}
7691

7792
let index = Index::new(context.with_label("index"), cfg.translator);
78-
let log = Self::init_from_log(index, log, None, |_, _| {}).await?;
93+
let log = Self::init_from_log(index, log, known_inactivity_floor, callback).await?;
7994

8095
Ok(log)
8196
}

storage/src/qmdb/any/unordered/variable/mod.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ impl<E: Storage + Clock + Metrics, K: Array, V: VariableValue, H: Hasher, T: Tra
4242
pub async fn init(
4343
context: E,
4444
cfg: VariableConfig<T, <Operation<K, V> as Read>::Cfg>,
45+
) -> Result<Self, Error> {
46+
Self::init_with_callback(context, cfg, None, |_, _| {}).await
47+
}
48+
49+
/// Initialize the DB, invoking `callback` for each operation processed during recovery.
50+
///
51+
/// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
52+
/// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
53+
/// snapshot is built from the log, `callback` is invoked for each operation with its activity
54+
/// status and previous location (if any).
55+
pub(crate) async fn init_with_callback(
56+
context: E,
57+
cfg: VariableConfig<T, <Operation<K, V> as Read>::Cfg>,
58+
known_inactivity_floor: Option<Location>,
59+
callback: impl FnMut(bool, Option<Location>),
4560
) -> Result<Self, Error> {
4661
let mmr_config = MmrConfig {
4762
journal_partition: cfg.mmr_journal_partition,
@@ -76,13 +91,8 @@ impl<E: Storage + Clock + Metrics, K: Array, V: VariableValue, H: Hasher, T: Tra
7691
log.sync().await?;
7792
}
7893

79-
let log = Self::init_from_log(
80-
Index::new(context.with_label("index"), cfg.translator),
81-
log,
82-
None,
83-
|_, _| {},
84-
)
85-
.await?;
94+
let index = Index::new(context.with_label("index"), cfg.translator);
95+
let log = Self::init_from_log(index, log, known_inactivity_floor, callback).await?;
8696

8797
Ok(log)
8898
}

storage/src/qmdb/benches/fixed/generate.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
use crate::fixed::{
55
gen_random_kv, gen_random_kv_batched, get_any_ordered_fixed, get_any_ordered_variable,
66
get_any_unordered_fixed, get_any_unordered_variable, get_current_ordered_fixed,
7-
get_current_unordered_fixed, Digest, Variant, VARIANTS,
7+
get_current_ordered_variable, get_current_unordered_fixed, get_current_unordered_variable,
8+
Digest, Variant, VARIANTS,
89
};
910
use commonware_runtime::{
1011
benchmarks::{context, tokio},
@@ -118,6 +119,32 @@ fn bench_fixed_generate(c: &mut Criterion) {
118119
.await
119120
.unwrap()
120121
}
122+
Variant::CurrentUnorderedVariable => {
123+
let db =
124+
get_current_unordered_variable(ctx.clone()).await;
125+
test_db(
126+
db,
127+
use_batch,
128+
elements,
129+
operations,
130+
commit_frequency,
131+
)
132+
.await
133+
.unwrap()
134+
}
135+
Variant::CurrentOrderedVariable => {
136+
let db =
137+
get_current_ordered_variable(ctx.clone()).await;
138+
test_db(
139+
db,
140+
use_batch,
141+
elements,
142+
operations,
143+
commit_frequency,
144+
)
145+
.await
146+
.unwrap()
147+
}
121148
};
122149
total_elapsed += duration;
123150
}

storage/src/qmdb/benches/fixed/init.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
use crate::fixed::{
55
any_cfg, current_cfg, gen_random_kv, get_any_ordered_fixed, get_any_ordered_variable,
66
get_any_unordered_fixed, get_any_unordered_variable, get_current_ordered_fixed,
7-
get_current_unordered_fixed, variable_any_cfg, Digest, OCurrentDb, OFixedDb, OVAnyDb,
8-
UCurrentDb, UFixedDb, UVAnyDb, Variant, THREADS, VARIANTS,
7+
get_current_ordered_variable, get_current_unordered_fixed, get_current_unordered_variable,
8+
variable_any_cfg, variable_current_cfg, Digest, OCurrentDb, OFixedDb, OVAnyDb, OVCurrentDb,
9+
UCurrentDb, UFixedDb, UVAnyDb, UVCurrentDb, Variant, THREADS, VARIANTS,
910
};
1011
use commonware_runtime::{
1112
benchmarks::{context, tokio},
@@ -83,6 +84,14 @@ fn bench_fixed_init(c: &mut Criterion) {
8384
let db = get_any_ordered_variable(ctx.clone()).await;
8485
setup_db(db, elements, operations).await;
8586
}
87+
Variant::CurrentUnorderedVariable => {
88+
let db = get_current_unordered_variable(ctx.clone()).await;
89+
setup_db(db, elements, operations).await;
90+
}
91+
Variant::CurrentOrderedVariable => {
92+
let db = get_current_ordered_variable(ctx.clone()).await;
93+
setup_db(db, elements, operations).await;
94+
}
8695
}
8796
});
8897

@@ -102,7 +111,8 @@ fn bench_fixed_init(c: &mut Criterion) {
102111
let pool = ctx.create_pool(THREADS).unwrap();
103112
let any_cfg = any_cfg(pool.clone());
104113
let current_cfg = current_cfg(pool.clone());
105-
let variable_any_cfg = variable_any_cfg(pool);
114+
let variable_any_cfg = variable_any_cfg(pool.clone());
115+
let variable_current_cfg = variable_current_cfg(pool);
106116
let start = Instant::now();
107117
for _ in 0..iters {
108118
match variant {
@@ -144,6 +154,24 @@ fn bench_fixed_init(c: &mut Criterion) {
144154
.unwrap();
145155
assert_ne!(db.op_count(), 0);
146156
}
157+
Variant::CurrentUnorderedVariable => {
158+
let db = UVCurrentDb::init(
159+
ctx.clone(),
160+
variable_current_cfg.clone(),
161+
)
162+
.await
163+
.unwrap();
164+
assert_ne!(db.op_count(), 0);
165+
}
166+
Variant::CurrentOrderedVariable => {
167+
let db = OVCurrentDb::init(
168+
ctx.clone(),
169+
variable_current_cfg.clone(),
170+
)
171+
.await
172+
.unwrap();
173+
assert_ne!(db.op_count(), 0);
174+
}
147175
}
148176
}
149177
start.elapsed()
@@ -179,6 +207,14 @@ fn bench_fixed_init(c: &mut Criterion) {
179207
let db = get_any_ordered_variable(ctx.clone()).await;
180208
db.destroy().await.unwrap();
181209
}
210+
Variant::CurrentUnorderedVariable => {
211+
let db = get_current_unordered_variable(ctx.clone()).await;
212+
db.destroy().await.unwrap();
213+
}
214+
Variant::CurrentOrderedVariable => {
215+
let db = get_current_ordered_variable(ctx.clone()).await;
216+
db.destroy().await.unwrap();
217+
}
182218
}
183219
});
184220
}

storage/src/qmdb/benches/fixed/mod.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ use commonware_storage::{
1717
FixedConfig as AConfig, VariableConfig as VariableAnyConfig,
1818
},
1919
current::{
20-
ordered::fixed::Db as OCurrent, unordered::fixed::Db as UCurrent,
21-
FixedConfig as CConfig,
20+
ordered::{fixed::Db as OCurrent, variable::Db as OVCurrent},
21+
unordered::{fixed::Db as UCurrent, variable::Db as UVCurrent},
22+
FixedConfig as CConfig, VariableConfig as VariableCurrentConfig,
2223
},
2324
store::LogStore,
2425
},
@@ -41,6 +42,8 @@ enum Variant {
4142
AnyOrderedVariable,
4243
CurrentUnorderedFixed,
4344
CurrentOrderedFixed,
45+
CurrentUnorderedVariable,
46+
CurrentOrderedVariable,
4447
}
4548

4649
impl Variant {
@@ -52,17 +55,21 @@ impl Variant {
5255
Self::AnyOrderedVariable => "any::ordered::variable",
5356
Self::CurrentUnorderedFixed => "current::unordered::fixed",
5457
Self::CurrentOrderedFixed => "current::ordered::fixed",
58+
Self::CurrentUnorderedVariable => "current::unordered::variable",
59+
Self::CurrentOrderedVariable => "current::ordered::variable",
5560
}
5661
}
5762
}
5863

59-
const VARIANTS: [Variant; 6] = [
64+
const VARIANTS: [Variant; 8] = [
6065
Variant::AnyUnorderedFixed,
6166
Variant::AnyOrderedFixed,
6267
Variant::AnyUnorderedVariable,
6368
Variant::AnyOrderedVariable,
6469
Variant::CurrentUnorderedFixed,
6570
Variant::CurrentOrderedFixed,
71+
Variant::CurrentUnorderedVariable,
72+
Variant::CurrentOrderedVariable,
6673
];
6774

6875
const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(50_000);
@@ -96,6 +103,8 @@ type OVAnyDb = OVariable<Context, Digest, Digest, Sha256, EightCap>;
96103

97104
type UCurrentDb = UCurrent<Context, Digest, Digest, Sha256, EightCap, CHUNK_SIZE>;
98105
type OCurrentDb = OCurrent<Context, Digest, Digest, Sha256, EightCap, CHUNK_SIZE>;
106+
type UVCurrentDb = UVCurrent<Context, Digest, Digest, Sha256, EightCap, CHUNK_SIZE>;
107+
type OVCurrentDb = OVCurrent<Context, Digest, Digest, Sha256, EightCap, CHUNK_SIZE>;
99108

100109
/// Configuration for any QMDB.
101110
fn any_cfg(pool: ThreadPool) -> AConfig<EightCap> {
@@ -147,6 +156,25 @@ fn variable_any_cfg(pool: ThreadPool) -> VariableAnyConfig<EightCap, ()> {
147156
}
148157
}
149158

159+
/// Configuration for variable current QMDB.
160+
fn variable_current_cfg(pool: ThreadPool) -> VariableCurrentConfig<EightCap, ()> {
161+
VariableCurrentConfig::<EightCap, ()> {
162+
mmr_journal_partition: format!("journal_{PARTITION_SUFFIX}"),
163+
mmr_metadata_partition: format!("metadata_{PARTITION_SUFFIX}"),
164+
mmr_items_per_blob: ITEMS_PER_BLOB,
165+
mmr_write_buffer: WRITE_BUFFER_SIZE,
166+
log_partition: format!("log_journal_{PARTITION_SUFFIX}"),
167+
log_codec_config: (),
168+
log_items_per_blob: ITEMS_PER_BLOB,
169+
log_write_buffer: WRITE_BUFFER_SIZE,
170+
log_compression: None,
171+
bitmap_metadata_partition: format!("bitmap_metadata_{PARTITION_SUFFIX}"),
172+
translator: EightCap,
173+
thread_pool: Some(pool),
174+
buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
175+
}
176+
}
177+
150178
/// Get an unordered fixed Any QMDB instance in clean state.
151179
async fn get_any_unordered_fixed(ctx: Context) -> UFixedDb {
152180
let pool = ctx.clone().create_pool(THREADS).unwrap();
@@ -193,6 +221,24 @@ async fn get_current_ordered_fixed(ctx: Context) -> OCurrentDb {
193221
.unwrap()
194222
}
195223

224+
/// Get an unordered variable current QMDB instance.
225+
async fn get_current_unordered_variable(ctx: Context) -> UVCurrentDb {
226+
let pool = ctx.clone().create_pool(THREADS).unwrap();
227+
let variable_current_cfg = variable_current_cfg(pool);
228+
UVCurrent::<_, _, _, Sha256, EightCap, CHUNK_SIZE>::init(ctx, variable_current_cfg)
229+
.await
230+
.unwrap()
231+
}
232+
233+
/// Get an ordered variable current QMDB instance.
234+
async fn get_current_ordered_variable(ctx: Context) -> OVCurrentDb {
235+
let pool = ctx.clone().create_pool(THREADS).unwrap();
236+
let variable_current_cfg = variable_current_cfg(pool);
237+
OVCurrent::<_, _, _, Sha256, EightCap, CHUNK_SIZE>::init(ctx, variable_current_cfg)
238+
.await
239+
.unwrap()
240+
}
241+
196242
/// Generate a large db with random data. The function seeds the db with exactly `num_elements`
197243
/// elements by inserting them in order, each with a new random value. Then, it performs
198244
/// `num_operations` over these elements, each selected uniformly at random for each operation. The

storage/src/qmdb/benches/variable/generate.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
//! that supports variable-size values.
33
44
use crate::variable::{
5-
gen_random_kv, gen_random_kv_batched, get_any_ordered, get_any_unordered, Digest, Variant,
6-
VARIANTS,
5+
gen_random_kv, gen_random_kv_batched, get_any_ordered, get_any_unordered, get_current_ordered,
6+
get_current_unordered, Digest, Variant, VARIANTS,
77
};
88
use commonware_runtime::{
99
benchmarks::{context, tokio},
@@ -70,6 +70,30 @@ fn bench_variable_generate(c: &mut Criterion) {
7070
.await
7171
.unwrap()
7272
}
73+
Variant::CurrentUnordered => {
74+
let db = get_current_unordered(ctx.clone()).await;
75+
test_db(
76+
db,
77+
use_batch,
78+
elements,
79+
operations,
80+
commit_frequency,
81+
)
82+
.await
83+
.unwrap()
84+
}
85+
Variant::CurrentOrdered => {
86+
let db = get_current_ordered(ctx.clone()).await;
87+
test_db(
88+
db,
89+
use_batch,
90+
elements,
91+
operations,
92+
commit_frequency,
93+
)
94+
.await
95+
.unwrap()
96+
}
7397
};
7498
total_elapsed += elapsed;
7599
}

0 commit comments

Comments
 (0)