Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/db/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ pub struct DbBuilder<S = Unconfigured> {
state: S,
compaction_options: Option<CompactionOptions>,
minor_compaction: Option<MinorCompactionOptions>,
seal_policy: Option<Arc<dyn crate::inmem::policy::SealPolicy + Send + Sync>>,
}

/// Error returned when building a [`DB`] through [`DbBuilder`].
Expand Down Expand Up @@ -779,6 +780,7 @@ impl DbBuilder<Unconfigured> {
state: Unconfigured,
compaction_options: None,
minor_compaction: Some(MinorCompactionOptions::default()),
seal_policy: None,
}
}

Expand All @@ -799,6 +801,7 @@ impl DbBuilder<Unconfigured> {
state: StorageConfig::new(fs, root, DurabilityClass::Volatile),
compaction_options: self.compaction_options,
minor_compaction: self.minor_compaction,
seal_policy: self.seal_policy,
})
}

Expand All @@ -822,6 +825,7 @@ impl DbBuilder<Unconfigured> {
state,
compaction_options: self.compaction_options,
minor_compaction: self.minor_compaction,
seal_policy: self.seal_policy,
})
}

Expand All @@ -839,6 +843,7 @@ impl DbBuilder<Unconfigured> {
state: StorageConfig::new(fs, root, DurabilityClass::Durable),
compaction_options: self.compaction_options,
minor_compaction: self.minor_compaction,
seal_policy: self.seal_policy,
})
}

Expand Down Expand Up @@ -935,6 +940,21 @@ where
self
}

/// Set the memtable sealing policy.
///
/// The seal policy controls when the mutable memtable is frozen into an
/// immutable segment. For example, `BatchesThreshold { batches: 1 }` seals
/// after every ingested batch, which is useful for low-latency flush in
/// WASM or S3-backed deployments.
#[must_use]
pub fn with_seal_policy(
mut self,
policy: Arc<dyn crate::inmem::policy::SealPolicy + Send + Sync>,
) -> Self {
self.seal_policy = Some(policy);
self
}

#[allow(clippy::arc_with_non_send_sync)]
fn build_minor_compaction_state(
layout: &StorageLayout<FS>,
Expand Down Expand Up @@ -1027,6 +1047,7 @@ where
state,
compaction_options,
minor_compaction,
seal_policy,
} = self;
let manifest_init = ManifestBootstrap::new(&layout);
let file_ids = FileIdGenerator::default();
Expand Down Expand Up @@ -1104,6 +1125,9 @@ where
);

inner.minor_compaction = minor_compaction_state;
if let Some(policy) = seal_policy {
inner.set_seal_policy(policy);
}
if let Some(options) = compaction_options.as_ref() {
inner.l0_backpressure = options.backpressure().cloned();
inner.cas_backoff = options.cas_backoff_config().clone();
Expand Down Expand Up @@ -1623,6 +1647,7 @@ where
state,
compaction_options,
minor_compaction,
seal_policy,
} = self;
state.prepare().await?;
let layout = state.layout()?;
Expand Down Expand Up @@ -1694,6 +1719,9 @@ where
.await
.map_err(DbBuildError::Mode)?;
inner.minor_compaction = minor_compaction_state;
if let Some(ref policy) = seal_policy {
inner.set_seal_policy(Arc::clone(policy));
}
if let Some(options) = compaction_options.as_ref() {
inner.l0_backpressure = options.backpressure().cloned();
inner.cas_backoff = options.cas_backoff_config().clone();
Expand Down Expand Up @@ -1733,6 +1761,7 @@ where
state,
compaction_options,
minor_compaction,
seal_policy,
}
.build_with_layout(executor, layout)
.await
Expand Down
26 changes: 19 additions & 7 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use std::{
Arc, Mutex, MutexGuard,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
time::Duration,
};

use aisle::Pruner;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef};
use fusio::{
DynFs,
executor::{Executor, Timer},
executor::{Executor, Instant as ExecInstant, Timer},
mem::fs::InMemoryFs,
};
use futures::lock::Mutex as AsyncMutex;
Expand Down Expand Up @@ -131,12 +131,12 @@ struct L0Stats {

struct L0StatsCache {
stats: L0Stats,
last_refresh: Instant,
last_refresh: ExecInstant,
valid: bool,
}

impl L0StatsCache {
fn new(now: Instant) -> Self {
fn new(now: ExecInstant) -> Self {
Self {
stats: L0Stats {
file_count: 0,
Expand All @@ -147,15 +147,15 @@ impl L0StatsCache {
}
}

fn is_fresh(&self, now: Instant, refresh_interval: Duration) -> bool {
fn is_fresh(&self, now: ExecInstant, refresh_interval: Duration) -> bool {
self.valid && now.saturating_duration_since(self.last_refresh) < refresh_interval
}

/// Return a copy of the currently cached statistics.
///
/// NOTE(review): callers should pair this with `is_fresh` — the snapshot
/// itself carries no freshness information.
fn snapshot(&self) -> L0Stats {
self.stats
}

fn update(&mut self, now: Instant, stats: L0Stats) {
fn update(&mut self, now: ExecInstant, stats: L0Stats) {
self.stats = stats;
self.last_refresh = now;
self.valid = true;
Expand Down Expand Up @@ -329,6 +329,19 @@ where
DbBuilder::new(config)
}

/// Set the sealing policy.
///
/// The swap only succeeds while this handle holds the sole reference to
/// the shared DB state. Returns `true` if the policy was updated, or
/// `false` if the DB handle is shared (the inner `Arc` has other
/// references) and cannot be mutated.
pub fn set_seal_policy(&mut self, policy: Arc<dyn SealPolicy + Send + Sync>) -> bool {
    match Arc::get_mut(&mut self.inner) {
        Some(inner) => {
            inner.set_seal_policy(policy);
            true
        }
        None => false,
    }
}

/// Begin a read-only snapshot for queries.
pub async fn begin_snapshot(&self) -> Result<TxSnapshot, SnapshotError> {
self.inner.begin_snapshot().await
Expand Down Expand Up @@ -1194,7 +1207,6 @@ where
}

/// Set or replace the sealing policy used by this DB.
#[cfg(test)]
pub fn set_seal_policy(&mut self, policy: Arc<dyn SealPolicy + Send + Sync>) {
self.policy = policy;
}
Expand Down
1 change: 1 addition & 0 deletions src/db/tests/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ mod ingest;
mod metadata;
mod recovery;
mod scan;
mod seal_policy;
mod wal;
mod wal_pruning;
153 changes: 153 additions & 0 deletions src/db/tests/core/seal_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use fusio::{executor::NoopExecutor, mem::fs::InMemoryFs};
use typed_arrow_dyn::{DynCell, DynRow};

use crate::{
db::{DB, DbBuilder, Expr, ScalarValue},
inmem::policy::BatchesThreshold,
mode::DynModeConfig,
test::build_batch,
};

type TestDb = DB<InMemoryFs, NoopExecutor>;

/// Two-column test schema: a non-nullable Utf8 key column `id` and a
/// non-nullable Int32 value column `v`.
fn test_schema() -> Arc<Schema> {
    let key_field = Field::new("id", DataType::Utf8, false);
    let value_field = Field::new("v", DataType::Int32, false);
    Arc::new(Schema::new(vec![key_field, value_field]))
}

/// Flatten `(id, v)` pairs out of a slice of record batches.
///
/// Column 0 must downcast to `StringArray` and column 1 to `Int32Array`
/// (panics otherwise, matching the test fixtures). Entries where either
/// column is null are skipped.
fn extract_rows(batches: &[RecordBatch]) -> Vec<(String, i32)> {
    batches
        .iter()
        .flat_map(|batch| {
            let ids = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("id col");
            let vals = batch
                .column(1)
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("v col");
            // Collect eagerly so the borrows of `ids`/`vals` end here.
            ids.iter()
                .zip(vals.iter())
                .filter_map(|(id, v)| Some((id?.to_string(), v?)))
                .collect::<Vec<_>>()
        })
        .collect()
}

#[tokio::test(flavor = "current_thread")]
async fn set_seal_policy_succeeds_on_exclusive_handle() {
    // A freshly created DB is the sole holder of its inner Arc, so the
    // policy swap must succeed.
    let config = DynModeConfig::from_key_name(test_schema(), "id").expect("config");
    let mut db: TestDb = DB::new(config, Arc::new(NoopExecutor)).await.expect("db");

    let policy = Arc::new(BatchesThreshold { batches: 1 });
    assert!(
        db.set_seal_policy(policy),
        "set_seal_policy should succeed on an exclusive Arc"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn set_seal_policy_fails_on_shared_handle() {
    let config = DynModeConfig::from_key_name(test_schema(), "id").expect("config");
    let mut db: TestDb = DB::new(config, Arc::new(NoopExecutor)).await.expect("db");

    // Hold a second reference to the inner state to simulate a shared
    // handle; the binding keeps it alive across the call so
    // `Arc::get_mut` inside `set_seal_policy` must fail.
    let _extra_handle = db.inner().clone();

    let policy = Arc::new(BatchesThreshold { batches: 1 });
    assert!(
        !db.set_seal_policy(policy),
        "set_seal_policy should fail when Arc has multiple references"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn set_seal_policy_activates_sealing() {
    let schema = test_schema();
    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("config");
    let mut db: TestDb = DB::new(config, Arc::new(NoopExecutor)).await.expect("db");

    // Seal after every ingested batch.
    db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));

    let row = DynRow(vec![
        Some(DynCell::Str("k1".into())),
        Some(DynCell::I32(1)),
    ]);
    let batch = build_batch(schema, vec![row]).expect("batch");
    db.ingest(batch).await.expect("ingest");

    assert!(
        db.inner().num_immutable_segments() >= 1,
        "sealing should trigger after ingest with BatchesThreshold {{ batches: 1 }}"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn builder_with_seal_policy_applies_policy() {
    let schema = test_schema();
    // Configure sealing via the builder instead of mutating the handle.
    let policy = Arc::new(BatchesThreshold { batches: 1 });
    let db: TestDb = DbBuilder::from_schema_key_name(schema.clone(), "id")
        .expect("config")
        .in_memory("seal-policy-builder-test")
        .expect("in memory config")
        .with_seal_policy(policy)
        .open_with_executor(Arc::new(NoopExecutor))
        .await
        .expect("db");

    let row = DynRow(vec![
        Some(DynCell::Str("k1".into())),
        Some(DynCell::I32(10)),
    ]);
    db.ingest(build_batch(schema, vec![row]).expect("batch"))
        .await
        .expect("ingest");

    assert!(
        db.inner().num_immutable_segments() >= 1,
        "seal policy set via builder should trigger sealing"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn builder_with_seal_policy_data_remains_readable() {
let schema = test_schema();
let db: TestDb = DbBuilder::from_schema_key_name(schema.clone(), "id")
.expect("config")
.in_memory("seal-policy-read-test")
.expect("in memory config")
.with_seal_policy(Arc::new(BatchesThreshold { batches: 1 }))
.open_with_executor(Arc::new(NoopExecutor))
.await
.expect("db");

let rows = vec![
DynRow(vec![
Some(DynCell::Str("a".into())),
Some(DynCell::I32(1)),
]),
DynRow(vec![
Some(DynCell::Str("b".into())),
Some(DynCell::I32(2)),
]),
];
let batch = build_batch(schema, rows).expect("batch");
db.ingest(batch).await.expect("ingest");

// Data should still be readable even after being sealed into immutable segments.
let predicate = Expr::gt("v", ScalarValue::from(0i64));
let batches = db.scan().filter(predicate).collect().await.expect("scan");
let rows = extract_rows(&batches);
assert_eq!(rows.len(), 2);
assert!(rows.iter().any(|(id, _)| id == "a"));
assert!(rows.iter().any(|(id, _)| id == "b"));
}
29 changes: 29 additions & 0 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,13 @@ pub(crate) fn validate_page_indexes(
metadata: &ParquetMetaData,
) -> Result<(), SsTableError> {
let path = path.to_string();

// Empty files (0 row groups) don't have page indexes - this is valid
// when the data batch is empty but deletes exist in the sidecar
if metadata.num_row_groups() == 0 {
return Ok(());
}

let column_index = metadata
.column_index()
.ok_or_else(|| SsTableError::MissingPageIndex {
Expand Down Expand Up @@ -2201,4 +2208,26 @@ mod tests {
other => panic!("unexpected error: {other:?}"),
}
}

#[test]
fn validate_page_indexes_accepts_zero_row_groups() {
    use parquet::{
        file::metadata::{FileMetaData, ParquetMetaData},
        schema::types::{SchemaDescriptor, Type as SchemaType},
    };

    // Build metadata for an empty file: a bare group schema and zero row
    // groups (empty row-group vec), so no page indexes exist.
    let root = SchemaType::group_type_builder("schema")
        .build()
        .expect("parquet schema");
    let descriptor = Arc::new(SchemaDescriptor::new(Arc::new(root)));
    let file_meta = FileMetaData::new(2, 0, None, None, descriptor, None);
    let metadata = ParquetMetaData::new(file_meta, vec![]);

    let empty_path = Path::from("test/empty.parquet");
    let result = validate_page_indexes(&empty_path, &metadata);
    assert!(
        result.is_ok(),
        "validate_page_indexes should accept files with 0 row groups"
    );
}
}
Loading
Loading