Skip to content

Commit 708173b

Browse files
committed
fix: stabilize expanded compaction correctness randomized lane
1 parent 2a25935 commit 708173b

File tree

4 files changed

+235
-21
lines changed

4 files changed

+235
-21
lines changed

.github/workflows/rust.yml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,6 @@ jobs:
6868
name: compaction correctness (optional expanded randomized sst+reopen)
6969
runs-on: ubuntu-latest
7070
needs: build
71-
continue-on-error: true
7271
env:
7372
TONBO_COMPACTION_MODEL_SST: "1"
7473
TONBO_COMPACTION_REOPEN: "1"

src/db/tests/core/compaction_correctness.rs

Lines changed: 110 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -278,11 +278,11 @@ impl ScenarioHarness {
278278
}
279279

280280
async fn compact_l0(
281-
&self,
281+
&mut self,
282282
sst_ids: Vec<u64>,
283283
target_level: u32,
284284
) -> Result<crate::compaction::executor::CompactionOutcome, Box<dyn std::error::Error>> {
285-
let start_id = 10_000;
285+
let start_id = self.next_sst_id.max(10_000);
286286
let outcome = compact_merge_l0(
287287
self.db.inner().as_ref(),
288288
sst_ids,
@@ -291,6 +291,14 @@ impl ScenarioHarness {
291291
start_id,
292292
)
293293
.await?;
294+
let next_generated_id = outcome
295+
.add_ssts
296+
.iter()
297+
.map(|entry| entry.sst_id().raw())
298+
.max()
299+
.map(|id| id.saturating_add(1))
300+
.unwrap_or_else(|| start_id.saturating_add(1));
301+
self.next_sst_id = self.next_sst_id.max(next_generated_id);
294302
Ok(outcome)
295303
}
296304

@@ -636,6 +644,7 @@ struct ModelRunner {
636644
oracle: MvccOracle,
637645
l0_ssts: Vec<u64>,
638646
active_snapshot_ts: Option<u64>,
647+
active_snapshot: Option<TxSnapshot>,
639648
allow_reopen: bool,
640649
eager_flush: bool,
641650
allow_sst: bool,
@@ -659,6 +668,7 @@ impl ModelRunner {
659668
oracle: MvccOracle::default(),
660669
l0_ssts: Vec::new(),
661670
active_snapshot_ts: None,
671+
active_snapshot: None,
662672
allow_reopen,
663673
eager_flush,
664674
allow_sst,
@@ -673,6 +683,11 @@ impl ModelRunner {
673683
}
674684
}
675685

686+
fn clear_active_snapshot(&mut self) {
687+
self.active_snapshot_ts = None;
688+
self.active_snapshot = None;
689+
}
690+
676691
async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
677692
for _ in 0..MODEL_OPS_PER_SEED {
678693
let op_kind = self.pick_op();
@@ -703,6 +718,7 @@ impl ModelRunner {
703718
async fn apply_op(&mut self, op_kind: OpKind) -> Result<(), Box<dyn std::error::Error>> {
704719
match op_kind {
705720
OpKind::Put => {
721+
self.clear_active_snapshot();
706722
let key = self.pick_key();
707723
let value = self.rng.next_i64(10_000);
708724
self.trace.push(Op::Put {
@@ -720,6 +736,7 @@ impl ModelRunner {
720736
}
721737
}
722738
OpKind::Delete => {
739+
self.clear_active_snapshot();
723740
let key = self.pick_key();
724741
self.trace.push(Op::Delete { key: key.clone() });
725742
self.harness.ingest_delete(&key, &mut self.oracle).await?;
@@ -731,6 +748,7 @@ impl ModelRunner {
731748
}
732749
}
733750
OpKind::Flush => {
751+
self.clear_active_snapshot();
734752
self.trace.push(Op::Flush);
735753
if self.allow_sst {
736754
if let Some(sst_id) = self.harness.try_flush_immutables_to_l0().await? {
@@ -764,18 +782,7 @@ impl ModelRunner {
764782
if outcome.add_ssts.is_empty() {
765783
return Err("compaction produced no output sst".into());
766784
}
767-
let snapshot_ts = match self.active_snapshot_ts {
768-
Some(ts) => ts,
769-
None => {
770-
let snapshot = self.harness.db.begin_snapshot().await?;
771-
snapshot.read_view().read_ts().get()
772-
}
773-
};
774-
let snapshot = self
775-
.harness
776-
.db
777-
.snapshot_at(Timestamp::new(snapshot_ts))
778-
.await?;
785+
let (snapshot, snapshot_ts) = self.read_snapshot().await?;
779786
let ctx = self.failure_context(Some(snapshot_ts));
780787
assert_oracle_matches(
781788
"model_based_compaction",
@@ -836,6 +843,7 @@ impl ModelRunner {
836843
let snapshot = self.harness.db.begin_snapshot().await?;
837844
let snapshot_ts = snapshot.read_view().read_ts().get();
838845
self.active_snapshot_ts = Some(snapshot_ts);
846+
self.active_snapshot = Some(snapshot.clone());
839847
self.trace.push(Op::Snapshot { snapshot_ts });
840848
let ctx = self.failure_context(Some(snapshot_ts));
841849
assert_oracle_matches(
@@ -868,15 +876,23 @@ impl ModelRunner {
868876
Some(&ctx),
869877
)
870878
.await?;
879+
self.active_snapshot = Some(snapshot);
880+
} else {
881+
self.active_snapshot = None;
871882
}
872883
}
873884
}
874885
Ok(())
875886
}
876887

877888
async fn read_snapshot(&mut self) -> Result<(TxSnapshot, u64), Box<dyn std::error::Error>> {
878-
if let Some(ts) = self.active_snapshot_ts {
889+
if let Some(snapshot) = self.active_snapshot.as_ref()
890+
&& let Some(ts) = self.active_snapshot_ts
891+
{
892+
Ok((snapshot.clone(), ts))
893+
} else if let Some(ts) = self.active_snapshot_ts {
879894
let snapshot = self.harness.db.snapshot_at(Timestamp::new(ts)).await?;
895+
self.active_snapshot = Some(snapshot.clone());
880896
Ok((snapshot, ts))
881897
} else {
882898
let snapshot = self.harness.db.begin_snapshot().await?;
@@ -1236,9 +1252,6 @@ async fn compaction_correctness_reopen_snapshot_durability()
12361252
let snapshot_ts = snapshot.read_view().read_ts().get();
12371253
assert_oracle_matches(scenario, snapshot_ts, &snapshot, &oracle, &harness.db, None).await?;
12381254

1239-
// Note: avoid compaction here because some SSTs can be missing Parquet page indexes,
1240-
// and the compaction read path currently requires them. Re-enable compaction coverage
1241-
// once page index emission is consistent for all SSTs.
12421255
harness.reopen().await?;
12431256
let reopened_snapshot = harness.db.snapshot_at(Timestamp::new(snapshot_ts)).await?;
12441257
assert_oracle_matches(
@@ -1254,6 +1267,85 @@ async fn compaction_correctness_reopen_snapshot_durability()
12541267
Ok(())
12551268
}
12561269

1270+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1271+
async fn compaction_correctness_delete_only_sst_reopen_and_compact()
1272+
-> Result<(), Box<dyn std::error::Error>> {
1273+
let scenario = "delete_only_sst_reopen_and_compact";
1274+
let mut harness = ScenarioHarness::new("compaction-correctness-delete-only-sst").await?;
1275+
let mut oracle = MvccOracle::default();
1276+
1277+
let _ts0 = harness.ingest_delete("k01", &mut oracle).await?;
1278+
let sst0 = harness.flush_immutables_to_l0().await?;
1279+
1280+
let _ts1 = harness.ingest_put("k02", 20, &mut oracle).await?;
1281+
let sst1 = harness.flush_immutables_to_l0().await?;
1282+
1283+
let snapshot = harness.db.begin_snapshot().await?;
1284+
let snapshot_ts = snapshot.read_view().read_ts().get();
1285+
assert_oracle_matches(scenario, snapshot_ts, &snapshot, &oracle, &harness.db, None).await?;
1286+
assert_range_matches(
1287+
scenario,
1288+
snapshot_ts,
1289+
&snapshot,
1290+
&oracle,
1291+
&harness.db,
1292+
"k01",
1293+
"k02",
1294+
None,
1295+
)
1296+
.await?;
1297+
1298+
harness.reopen().await?;
1299+
let reopened = harness.db.snapshot_at(Timestamp::new(snapshot_ts)).await?;
1300+
assert_oracle_matches(scenario, snapshot_ts, &reopened, &oracle, &harness.db, None).await?;
1301+
assert_range_matches(
1302+
scenario,
1303+
snapshot_ts,
1304+
&reopened,
1305+
&oracle,
1306+
&harness.db,
1307+
"k01",
1308+
"k02",
1309+
None,
1310+
)
1311+
.await?;
1312+
1313+
let outcome = harness.compact_l0(vec![sst0, sst1], 1).await?;
1314+
assert_eq!(
1315+
outcome.remove_ssts.len(),
1316+
2,
1317+
"scenario={scenario} expected compaction to remove 2 SSTs"
1318+
);
1319+
assert!(
1320+
!outcome.add_ssts.is_empty(),
1321+
"scenario={scenario} expected compaction to add SSTs"
1322+
);
1323+
1324+
let post_snapshot = harness.db.snapshot_at(Timestamp::new(snapshot_ts)).await?;
1325+
assert_oracle_matches(
1326+
scenario,
1327+
snapshot_ts,
1328+
&post_snapshot,
1329+
&oracle,
1330+
&harness.db,
1331+
None,
1332+
)
1333+
.await?;
1334+
assert_range_matches(
1335+
scenario,
1336+
snapshot_ts,
1337+
&post_snapshot,
1338+
&oracle,
1339+
&harness.db,
1340+
"k01",
1341+
"k02",
1342+
None,
1343+
)
1344+
.await?;
1345+
1346+
Ok(())
1347+
}
1348+
12571349
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
12581350
async fn compaction_correctness_iterator_seek_stability() -> Result<(), Box<dyn std::error::Error>>
12591351
{

src/db/tests/core/scan.rs

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -217,6 +217,44 @@ async fn plan_scan_missing_page_indexes_is_error() {
217217
);
218218
}
219219

220+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
221+
async fn plan_scan_empty_sst_without_page_indexes_is_allowed() {
222+
let schema = Arc::new(Schema::new(vec![
223+
Field::new("id", DataType::Utf8, false),
224+
Field::new("v", DataType::Int32, false),
225+
]));
226+
let db = db_with_schema(schema.clone()).await;
227+
228+
let sst_root = Path::from("scan-empty-no-page-index");
229+
db.fs.create_dir_all(&sst_root).await.expect("create dir");
230+
let data_path = sst_root.child("000.parquet");
231+
let empty_batch = rows_with_commit_ts(0, 0, Timestamp::MIN.get());
232+
write_parquet_data_missing_page_index(Arc::clone(&db.fs), data_path.clone(), empty_batch).await;
233+
234+
let sst_entry = SstEntry::new(SsTableId::new(12), None, None, data_path, None);
235+
db.manifest
236+
.apply_version_edits(
237+
db.manifest_table,
238+
&[VersionEdit::AddSsts {
239+
level: 0,
240+
entries: vec![sst_entry],
241+
}],
242+
)
243+
.await
244+
.expect("add sst");
245+
246+
let snapshot = db.begin_snapshot().await.expect("snapshot");
247+
let plan = snapshot
248+
.plan_scan(&db, &Expr::True, None, None)
249+
.await
250+
.expect("empty sst without page indexes should plan");
251+
assert_eq!(plan.sst_selections.len(), 1);
252+
253+
let stream = db.execute_scan(plan).await.expect("execute");
254+
let batches = stream.try_collect::<Vec<_>>().await.expect("collect");
255+
assert!(collect_ids(&batches).is_empty());
256+
}
257+
220258
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
221259
async fn plan_scan_prunes_sst_row_groups_and_pages() {
222260
let schema = Arc::new(Schema::new(vec![

src/ondisk/sstable.rs

Lines changed: 87 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1487,6 +1487,36 @@ pub(crate) fn validate_page_indexes(
14871487
metadata: &ParquetMetaData,
14881488
) -> Result<(), SsTableError> {
14891489
let path = path.to_string();
1490+
let row_groups = metadata.num_row_groups();
1491+
1492+
// Empty SST data files (e.g. delete-only runs) have no pages, so parquet metadata may omit
1493+
// page-index sections entirely. Treat that representation as valid.
1494+
if row_groups == 0 {
1495+
if let Some(column_index) = metadata.column_index()
1496+
&& !column_index.is_empty()
1497+
{
1498+
return Err(SsTableError::MissingPageIndex {
1499+
path: path.clone(),
1500+
reason: format!(
1501+
"column index row group count mismatch: expected {row_groups}, got {}",
1502+
column_index.len()
1503+
),
1504+
});
1505+
}
1506+
if let Some(offset_index) = metadata.offset_index()
1507+
&& !offset_index.is_empty()
1508+
{
1509+
return Err(SsTableError::MissingPageIndex {
1510+
path: path.clone(),
1511+
reason: format!(
1512+
"offset index row group count mismatch: expected {row_groups}, got {}",
1513+
offset_index.len()
1514+
),
1515+
});
1516+
}
1517+
return Ok(());
1518+
}
1519+
14901520
let column_index = metadata
14911521
.column_index()
14921522
.ok_or_else(|| SsTableError::MissingPageIndex {
@@ -1499,8 +1529,6 @@ pub(crate) fn validate_page_indexes(
14991529
path: path.clone(),
15001530
reason: "offset index missing".to_string(),
15011531
})?;
1502-
1503-
let row_groups = metadata.num_row_groups();
15041532
if column_index.len() != row_groups {
15051533
return Err(SsTableError::MissingPageIndex {
15061534
path: path.clone(),
@@ -2393,4 +2421,61 @@ mod tests {
23932421
other => panic!("unexpected error: {other:?}"),
23942422
}
23952423
}
2424+
2425+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2426+
async fn open_parquet_empty_without_page_indexes_is_allowed() {
2427+
use arrow_array::StringArray;
2428+
2429+
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
2430+
let values: ArrayRef = Arc::new(StringArray::from(Vec::<&str>::new()));
2431+
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![values]).expect("batch");
2432+
2433+
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
2434+
let tempdir = tempfile::tempdir().expect("tempdir");
2435+
let path = Path::from(
2436+
tempdir
2437+
.path()
2438+
.join("empty-no-page-index.parquet")
2439+
.to_string_lossy()
2440+
.to_string(),
2441+
);
2442+
let file = fs
2443+
.open_options(
2444+
&path,
2445+
OpenOptions::default()
2446+
.create(true)
2447+
.write(true)
2448+
.truncate(true),
2449+
)
2450+
.await
2451+
.expect("open file");
2452+
2453+
let properties = WriterProperties::builder()
2454+
.set_statistics_enabled(EnabledStatistics::None)
2455+
.set_offset_index_disabled(true)
2456+
.build();
2457+
let mut writer = AsyncArrowWriter::try_new(
2458+
AsyncWriter::new(file, NoopExecutor),
2459+
Arc::clone(&schema),
2460+
Some(properties),
2461+
)
2462+
.expect("writer");
2463+
writer.write(&batch).await.expect("write");
2464+
writer.close().await.expect("close");
2465+
2466+
let (mut stream, stream_schema) = open_parquet_stream_with_schema(
2467+
Arc::clone(&fs),
2468+
path,
2469+
None,
2470+
None,
2471+
None,
2472+
None,
2473+
NoopExecutor,
2474+
)
2475+
.await
2476+
.expect("empty parquet without page indexes should be readable");
2477+
2478+
assert_eq!(stream_schema.fields().len(), 1);
2479+
assert!(stream.next().await.is_none());
2480+
}
23962481
}

0 commit comments

Comments (0)